Worker lifecycle
The WorkerContext in Apalis manages the internal lifecycle of workers automatically. While you don't directly control these transitions, understanding how they work helps you build robust task handlers that respond appropriately to lifecycle events.
Worker States
Workers progress through distinct internal states:
Pending → Running ⇄ Paused → Stopped
- Pending: Initial state when created but not yet started
- Running: Active state processing tasks from the backend
- Paused: Temporarily suspended, no new tasks accepted
- Stopped: Final state during graceful shutdown
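The transition rules above can be sketched as a small state machine. The enum and `can_transition_to` below are an illustrative model for intuition only, not part of the Apalis API; the `Running → Stopped` edge is assumed from the description of graceful shutdown:

```rust
/// Illustrative model of the worker state machine (not the Apalis API).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum WorkerState {
    Pending,
    Running,
    Paused,
    Stopped,
}

impl WorkerState {
    /// Whether a single transition from `self` to `next` is allowed.
    fn can_transition_to(self, next: WorkerState) -> bool {
        use WorkerState::*;
        matches!(
            (self, next),
            (Pending, Running)       // start
                | (Running, Paused)  // pause
                | (Paused, Running)  // resume
                | (Running, Stopped) // graceful shutdown (assumed)
                | (Paused, Stopped)  // shutdown while paused
        )
    }
}

fn main() {
    assert!(WorkerState::Pending.can_transition_to(WorkerState::Running));
    assert!(WorkerState::Running.can_transition_to(WorkerState::Paused));
    assert!(WorkerState::Paused.can_transition_to(WorkerState::Running));
    // Stopped is final: nothing is reachable from it.
    assert!(!WorkerState::Stopped.can_transition_to(WorkerState::Running));
    println!("all transitions check out");
}
```

Note that `Stopped` is terminal: once a worker reaches it, no further transitions occur.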
Accessing Worker Context
The WorkerContext is automatically injected into your task handlers:
```rust
async fn email_task(
    email_data: EmailTask,
    worker: WorkerContext,
) -> Result<(), EmailError> {
    println!("Processing in worker: {}", worker.name());
    println!("Current task count: {}", worker.task_count());

    send_email(&email_data).await?;

    // Conditionally trigger shutdown
    if should_shutdown(&email_data) {
        worker.stop()?;
    }
    Ok(())
}
```

State Inspection
Check worker state within your tasks:
```rust
async fn state_aware_task(
    task: MyTask,
    worker: WorkerContext,
) -> Result<(), TaskError> {
    // Adapt behavior based on worker state
    if worker.is_shutting_down() {
        return handle_quick_shutdown(task).await;
    }
    if worker.task_count() > 50 {
        return handle_lightweight(task).await;
    }
    handle_normal(task).await
}
```

Lifecycle Events
Monitor worker lifecycle through event listeners:
```rust
let worker = WorkerBuilder::new("lifecycle-worker")
    .backend(storage)
    .on_event(|ctx, event| {
        match event {
            Event::Start => {
                println!("🚀 Worker '{}' started", ctx.name());
            }
            Event::Stop => {
                println!("🛑 Worker '{}' stopped, tasks: {}",
                    ctx.name(), ctx.task_count());
            }
            Event::Task(task_event) => {
                match task_event {
                    TaskEvent::Completed => {
                        println!("✅ Task completed, remaining: {}",
                            ctx.task_count());
                    }
                    TaskEvent::Failed(error) => {
                        println!("❌ Task failed: {:?}", error);
                    }
                    _ => {}
                }
            }
            _ => {}
        }
    })
    .build(my_task_handler);
```

Graceful Shutdown
Handle shutdown gracefully in long-running tasks:
```rust
async fn long_running_task(
    task: LongTask,
    worker: WorkerContext,
) -> Result<(), TaskError> {
    for (i, step) in task.steps.iter().enumerate() {
        // Check for shutdown before each step
        if worker.is_shutting_down() {
            println!("Saving progress at step {}", i);
            save_progress(&task, i).await?;
            return Ok(());
        }
        process_step(step).await?;
    }
    Ok(())
}
```

Triggering Shutdown
Trigger shutdown from within tasks based on conditions:
```rust
async fn monitoring_task(
    task: MonitoringTask,
    worker: WorkerContext,
) -> Result<(), TaskError> {
    let result = process_task(&task).await?;

    // Trigger shutdown based on conditions
    if result.should_shutdown() {
        println!("Triggering shutdown for worker '{}'", worker.name());
        worker.stop()?;
    }
    Ok(())
}
```

Advanced Patterns
Adaptive Processing
```rust
async fn adaptive_task(
    task: AdaptiveTask,
    worker: WorkerContext,
) -> Result<(), TaskError> {
    let strategy = if worker.is_shutting_down() {
        ProcessingStrategy::Fast
    } else if worker.task_count() > 20 {
        ProcessingStrategy::Batch
    } else {
        ProcessingStrategy::Thorough
    };
    process_with_strategy(task, strategy).await
}
```

Health Monitoring
```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

#[derive(Clone)]
struct WorkerMetrics {
    tasks_processed: Arc<AtomicUsize>,
    tasks_failed: Arc<AtomicUsize>,
}

impl WorkerMetrics {
    fn new() -> Self {
        Self {
            tasks_processed: Arc::new(AtomicUsize::new(0)),
            tasks_failed: Arc::new(AtomicUsize::new(0)),
        }
    }
}

// Keep one handle for the event listener; the clones share the same counters.
let metrics = WorkerMetrics::new();

let worker = WorkerBuilder::new("monitored-worker")
    .backend(storage)
    .data(metrics.clone())
    .on_event(move |ctx, event| {
        match event {
            Event::Task(TaskEvent::Completed) => {
                metrics.tasks_processed.fetch_add(1, Ordering::Relaxed);
            }
            Event::Task(TaskEvent::Failed(_)) => {
                metrics.tasks_failed.fetch_add(1, Ordering::Relaxed);
            }
            Event::Stop => {
                let processed = metrics.tasks_processed.load(Ordering::Relaxed);
                let failed = metrics.tasks_failed.load(Ordering::Relaxed);
                println!("Final stats - Processed: {}, Failed: {}", processed, failed);
            }
            _ => {}
        }
    })
    .build(monitored_task);
```

Best Practices
Regular State Checking
```rust
async fn best_practice_task(
    task: MyTask,
    worker: WorkerContext,
) -> Result<(), TaskError> {
    // Check state at key points
    if worker.is_shutting_down() {
        return Ok(()); // Exit gracefully
    }

    // Log high load situations
    if worker.task_count() > 10 {
        println!("High load: {} tasks in worker '{}'",
            worker.task_count(), worker.name());
    }
    process_task(task).await
}
```

Error Context
```rust
async fn error_aware_task(
    task: ErrorProneTask,
    worker: WorkerContext,
) -> Result<(), TaskError> {
    match process_risky_task(task).await {
        Ok(result) => Ok(result),
        Err(e) => {
            eprintln!("Task failed in worker '{}': {:?}", worker.name(), e);
            if e.is_critical() {
                worker.stop()?; // Trigger shutdown for critical errors
            }
            Err(e)
        }
    }
}
```

Key Methods
- `worker.name()` - Get worker identifier
- `worker.is_ready()` - Check if ready for new tasks
- `worker.is_running()` - Check if in running state
- `worker.is_shutting_down()` - Check if shutdown initiated
- `worker.task_count()` - Get current active task count
- `worker.stop()` - Trigger graceful shutdown
Conclusion
Worker lifecycle management in Apalis is automatic, but understanding states and patterns helps you build robust task handlers. Use the WorkerContext to inspect worker state, respond to lifecycle events, and ensure graceful handling of various operational scenarios.