Worker lifecycle

Last edited 4 minutes ago.

On this page

The WorkerContext in Apalis manages the internal lifecycle of workers automatically. While you don't directly control these transitions, understanding how they work helps you build robust task handlers that respond appropriately to lifecycle events.

Worker States

Workers progress through distinct internal states:

Pending → Running ⇄ Paused → Stopped
  • Pending: Initial state when created but not yet started
  • Running: Active state processing tasks from the backend
  • Paused: Temporarily suspended, no new tasks accepted
  • Stopped: Final state during graceful shutdown

Accessing Worker Context

The WorkerContext is automatically injected into your task handlers:

async fn email_task(
    email_data: EmailTask,
    worker: WorkerContext,
) -> Result<(), EmailError> {
    println!("Processing in worker: {}", worker.name());
    println!("Current task count: {}", worker.task_count());
    
    send_email(&email_data).await?;
    
    // Conditionally trigger shutdown
    if should_shutdown(&email_data) {
        worker.stop()?;
    }
    
    Ok(())
}

State Inspection

Check worker state within your tasks:

async fn state_aware_task(
    task: MyTask,
    worker: WorkerContext,
) -> Result<(), TaskError> {
    // Adapt behavior based on worker state
    if worker.is_shutting_down() {
        return handle_quick_shutdown(task).await;
    }
    
    if worker.task_count() > 50 {
        return handle_lightweight(task).await;
    }
    
    handle_normal(task).await
}

Lifecycle Events

Monitor worker lifecycle through event listeners:

let worker = WorkerBuilder::new("lifecycle-worker")
    .backend(storage)
    .on_event(|ctx, event| {
        match event {
            Event::Start => {
                println!("🚀 Worker '{}' started", ctx.name());
            }
            Event::Stop => {
                println!("🛑 Worker '{}' stopped, tasks: {}", 
                        ctx.name(), ctx.task_count());
            }
            Event::Task(task_event) => {
                match task_event {
                    TaskEvent::Completed => {
                        println!("✅ Task completed, remaining: {}", 
                                ctx.task_count());
                    }
                    TaskEvent::Failed(error) => {
                        println!("❌ Task failed: {:?}", error);
                    }
                    _ => {}
                }
            }
            _ => {}
        }
    })
    .build(my_task_handler);

Graceful Shutdown

Handle shutdown gracefully in long-running tasks:

async fn long_running_task(
    task: LongTask,
    worker: WorkerContext,
) -> Result<(), TaskError> {
    for (i, step) in task.steps.iter().enumerate() {
        // Check for shutdown before each step
        if worker.is_shutting_down() {
            println!("Saving progress at step {}", i);
            save_progress(&task, i).await?;
            return Ok(());
        }
        
        process_step(step).await?;
    }
    
    Ok(())
}

Triggering Shutdown

Trigger shutdown from within tasks based on conditions:

async fn monitoring_task(
    task: MonitoringTask,
    worker: WorkerContext,
) -> Result<(), TaskError> {
    let result = process_task(&task).await?;
    
    // Trigger shutdown based on conditions
    if result.should_shutdown() {
        println!("Triggering shutdown for worker '{}'", worker.name());
        worker.stop()?;
    }
    
    Ok(())
}

Advanced Patterns

Adaptive Processing

async fn adaptive_task(
    task: AdaptiveTask,
    worker: WorkerContext,
) -> Result<(), TaskError> {
    let strategy = if worker.is_shutting_down() {
        ProcessingStrategy::Fast
    } else if worker.task_count() > 20 {
        ProcessingStrategy::Batch
    } else {
        ProcessingStrategy::Thorough
    };
    
    process_with_strategy(task, strategy).await
}

Health Monitoring

#[derive(Clone)]
struct WorkerMetrics {
    tasks_processed: Arc<AtomicUsize>,
    tasks_failed: Arc<AtomicUsize>,
}

let worker = WorkerBuilder::new("monitored-worker")
    .backend(storage)
    .data(WorkerMetrics::new())
    .on_event(move |ctx, event| {
        match event {
            Event::Task(TaskEvent::Completed) => {
                metrics.tasks_processed.fetch_add(1, Ordering::Relaxed);
            }
            Event::Task(TaskEvent::Failed(_)) => {
                metrics.tasks_failed.fetch_add(1, Ordering::Relaxed);
            }
            Event::Stop => {
                let processed = metrics.tasks_processed.load(Ordering::Relaxed);
                let failed = metrics.tasks_failed.load(Ordering::Relaxed);
                println!("Final stats - Processed: {}, Failed: {}", processed, failed);
            }
            _ => {}
        }
    })
    .build(monitored_task);

Best Practices

Regular State Checking

async fn best_practice_task(
    task: MyTask,
    worker: WorkerContext,
) -> Result<(), TaskError> {
    // Check state at key points
    if worker.is_shutting_down() {
        return Ok(()); // Exit gracefully
    }
    
    // Log high load situations
    if worker.task_count() > 10 {
        println!("High load: {} tasks in worker '{}'", 
                worker.task_count(), worker.name());
    }
    
    process_task(task).await
}

Error Context

async fn error_aware_task(
    task: ErrorProneTask,
    worker: WorkerContext,
) -> Result<(), TaskError> {
    match process_risky_task(task).await {
        Ok(result) => Ok(result),
        Err(e) => {
            eprintln!("Task failed in worker '{}': {:?}", worker.name(), e);
            
            if e.is_critical() {
                worker.stop()?; // Trigger shutdown for critical errors
            }
            
            Err(e)
        }
    }
}

Key Methods

  • worker.name() - Get worker identifier
  • worker.is_ready() - Check if ready for new tasks
  • worker.is_running() - Check if in running state
  • worker.is_shutting_down() - Check if shutdown initiated
  • worker.task_count() - Get current active task count
  • worker.stop() - Trigger graceful shutdown

Conclusion

Worker lifecycle management in Apalis is automatic, but understanding states and patterns helps you build robust task handlers. Use the WorkerContext to inspect worker state, respond to lifecycle events, and ensure graceful handling of various operational scenarios.