Introduction
Workers are the core execution engines in Apalis that process tasks from your chosen backend. They continuously poll for available tasks, execute your service functions, and handle the complete task lifecycle. Understanding how to create, configure, and control workers is essential for building robust task processing systems.
Basic Worker Concepts
A worker in Apalis consists of:
- Name: A unique identifier for the worker instance
- Backend: The storage system where tasks are queued (Memory, PostgreSQL, Redis, etc.)
- Service Function: The handler that processes individual tasks
- Configuration: Settings for concurrency, retries, timeouts, and more
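To make the four parts concrete, here is a minimal std-only sketch of how they fit together. It is a toy model and not Apalis's API: `ToyWorker`, `WorkerConfig`, `max_tasks`, and `example_run` are hypothetical names, and an in-memory `VecDeque` stands in for a real backend.

```rust
use std::collections::VecDeque;

/// Hypothetical settings; Apalis exposes its own configuration options.
struct WorkerConfig {
    /// Upper bound on the number of tasks handled in one run of this model.
    max_tasks: usize,
}

/// Toy worker: a name, a queue standing in for the backend,
/// a closure standing in for the service function, and a config.
struct ToyWorker<T, F>
where
    F: FnMut(T) -> Result<(), String>,
{
    name: String,
    backend: VecDeque<T>,
    service: F,
    config: WorkerConfig,
}

impl<T, F> ToyWorker<T, F>
where
    F: FnMut(T) -> Result<(), String>,
{
    /// Polls the queue until it is empty or `max_tasks` is reached.
    /// Returns the (succeeded, failed) task counts.
    fn run(&mut self) -> (usize, usize) {
        let (mut ok, mut failed) = (0, 0);
        while ok + failed < self.config.max_tasks {
            let Some(task) = self.backend.pop_front() else { break };
            match (self.service)(task) {
                Ok(()) => ok += 1,
                Err(err) => {
                    eprintln!("worker '{}' task failed: {}", self.name, err);
                    failed += 1;
                }
            }
        }
        (ok, failed)
    }
}

/// Runs the toy worker over three recipients, one of which is invalid.
fn example_run() -> (usize, usize) {
    let mut worker = ToyWorker {
        name: "email-worker".to_string(),
        backend: VecDeque::from(vec!["a@example.com", "", "b@example.com"]),
        service: |to: &str| {
            if to.is_empty() {
                Err("missing recipient".to_string())
            } else {
                Ok(())
            }
        },
        config: WorkerConfig { max_tasks: 10 },
    };
    worker.run()
}
```

In this model a failed task is only counted and logged; retries and timeouts, which the real configuration covers, are left out to keep the sketch short.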
Creating Workers
Apalis provides two primary methods for running workers, each suited to different use cases and control requirements.
Method 1: Direct Execution with run()
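The run() method starts the worker and keeps it running until it completes or is stopped. In the example below the result is stored in future_worker, so the worker makes progress once that future is awaited. This is the simplest approach for straightforward task processing: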
let future_worker = WorkerBuilder::new("future-worker")
    .backend(storage)
    .on_event(|ctx, event| {
        println!("Worker '{}' event: {:?}", ctx.name(), event);
    })
    .build(send_email)
    .run();
Method 2: Event Stream with stream()
The stream() method returns an event stream that allows you to monitor worker events in real-time and maintain more control over the worker lifecycle:
let worker = WorkerBuilder::new("streaming-worker")
    .backend(storage)
    .on_event(|ctx, event| {
        println!("Worker '{}' received event: {:?}", ctx.name(), event);
    })
    .build(send_email);
// Get event stream instead of blocking
let mut event_stream = worker.stream();
let stream_worker = async move {
    // Process events as they occur
    while let Some(event_result) = event_stream.next().await {
        match event_result {
            Ok(event) => {
                println!("Stream event: {:?}", event);
                // You can react to specific events
                match event {
                    // Handle specific event types if needed
                    _ => {}
                }
            }
            Err(e) => {
                eprintln!("Worker error: {:?}", e);
                break;
            }
        }
    }
};
Worker Lifecycle
Stopping Workers
Workers can be stopped in several ways:
1. Programmatic Stopping
Use the WorkerContext to stop the worker from within a task:
async fn controlled_task(
    task: Email,
    worker: WorkerContext,
) -> Result<(), TaskError> {
    // Process the task
    process_business_logic(&task).await?;
    // Stop worker based on some condition
    if should_stop_processing(&task) {
        worker.stop()?;
    }
    Ok(())
}
The complete lifecycle section covers this in more detail.
2. External Signals
Workers can shut down in response to termination signals such as Ctrl+C when the signal future is passed to run_until:
let worker = WorkerBuilder::new("signal-worker")
    .backend(storage)
    .build(send_email)
    .run_until(tokio::signal::ctrl_c());
run_until accepts any Future; when that future resolves, the worker begins shutting down.
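The "run until a signal fires" idea can be illustrated without an async runtime. The sketch below is a std-only model, not Apalis code: it swaps the Future for an `mpsc` channel, and `run_until_signal` and `example_shutdown` are hypothetical names. It also assumes the task in progress is allowed to finish before shutdown, which is a choice of this model and not something confirmed here about Apalis.

```rust
use std::collections::VecDeque;
use std::sync::mpsc::{self, Receiver, TryRecvError};

/// Processes tasks until the queue is empty or a shutdown message arrives.
/// The signal is only checked between tasks, so the task in progress always finishes.
/// Returns the tasks that were left unprocessed.
fn run_until_signal<T>(
    mut queue: VecDeque<T>,
    shutdown: &Receiver<()>,
    mut handle: impl FnMut(T),
) -> VecDeque<T> {
    loop {
        match shutdown.try_recv() {
            // A message or a dropped sender both count as a shutdown request.
            Ok(()) | Err(TryRecvError::Disconnected) => break,
            Err(TryRecvError::Empty) => {}
        }
        match queue.pop_front() {
            Some(task) => handle(task),
            None => break,
        }
    }
    queue
}

/// Simulates a shutdown request arriving while task 2 is being handled.
/// Returns (processed tasks, tasks left in the queue).
fn example_shutdown() -> (Vec<u32>, Vec<u32>) {
    let (tx, rx) = mpsc::channel();
    let mut processed = Vec::new();
    let remaining = run_until_signal(VecDeque::from(vec![1, 2, 3, 4]), &rx, |task| {
        processed.push(task);
        if task == 2 {
            tx.send(()).unwrap();
        }
    });
    (processed, remaining.into_iter().collect())
}
```

Because the check happens between tasks, task 2 completes and tasks 3 and 4 stay in the queue for a later run.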
3. Pausing and Resuming Workers
WorkerContext also lets you pause and resume a worker. By default, the context is only available inside the task function, but in some cases you may want to create it yourself and pass it to other parts of your application.
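As a rough mental model of a context handle that can be shared outside the task function, the std-only sketch below keeps the worker state in an atomic that every clone of the handle can read and change. It does not describe how Apalis implements WorkerContext; `ToyContext`, its methods, and the three state constants are hypothetical.

```rust
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::Arc;

const RUNNING: u8 = 0;
const PAUSED: u8 = 1;
const STOPPED: u8 = 2;

/// Hypothetical stand-in for a worker context: a cloneable handle over shared state.
#[derive(Clone)]
struct ToyContext {
    state: Arc<AtomicU8>,
}

impl ToyContext {
    fn new() -> Self {
        Self { state: Arc::new(AtomicU8::new(RUNNING)) }
    }

    /// Succeeds only if the worker is currently running.
    fn pause(&self) -> Result<(), &'static str> {
        self.transition(RUNNING, PAUSED, "worker is not running")
    }

    /// Succeeds only if the worker is currently paused.
    fn resume(&self) -> Result<(), &'static str> {
        self.transition(PAUSED, RUNNING, "worker is not paused")
    }

    /// Stopping is final in this model: neither pause nor resume succeeds afterwards.
    fn stop(&self) {
        self.state.store(STOPPED, Ordering::SeqCst);
    }

    fn is_running(&self) -> bool {
        self.state.load(Ordering::SeqCst) == RUNNING
    }

    /// Atomically moves from `from` to `to`, or reports `err` if the state differs.
    fn transition(&self, from: u8, to: u8, err: &'static str) -> Result<(), &'static str> {
        self.state
            .compare_exchange(from, to, Ordering::SeqCst, Ordering::SeqCst)
            .map(|_| ())
            .map_err(|_| err)
    }
}

/// Pauses and resumes through a clone, observing the state through the original.
fn example_pause_resume() -> Vec<bool> {
    let ctx = ToyContext::new();
    let handle = ctx.clone(); // e.g. handed to another part of the application
    let mut seen = vec![ctx.is_running()];
    handle.pause().unwrap();
    seen.push(ctx.is_running());
    handle.resume().unwrap();
    seen.push(ctx.is_running());
    seen
}
```

With Apalis, the corresponding calls go through WorkerContext: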
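// `Service` and `self.name` stand in for your service type and worker name
let mut ctx = WorkerContext::new::<Service>(&self.name);
let worker = WorkerBuilder::new("timeout-worker")
    .backend(storage)
    .build(send_email)
    .run_with_ctx(&mut ctx);
// Pause the worker through the context
ctx.pause()?;
// Later, resume it
ctx.resume()?;
Event Monitoring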
Monitor worker lifecycle events for observability:
let worker = WorkerBuilder::new("monitored-worker")
    .backend(storage)
    .on_event(|ctx, event| {
        match event {
            Event::Start => {
                println!("Worker '{}' started", ctx.name());
            }
            Event::Stop => {
                println!("Worker '{}' stopped", ctx.name());
            }
            Event::Error(error) => {
                eprintln!("Error in worker '{}': {:?}", ctx.name(), error);
            }
            _ => {}
        }
    })
    .build(send_email);
Putting it all together
use apalis::prelude::*;
use examples::*;
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<(), BoxDynError> {
    // Method 1: direct execution with run()
    let mut storage = MemoryStorage::new();
    let future_worker = WorkerBuilder::new("future-worker")
        .backend(storage)
        .on_event(|ctx, event| {
            println!("Worker '{}' event: {:?}", ctx.name(), event);
        })
        .build(send_email)
        .run();

    // Method 2: event stream with stream()
    let mut storage = MemoryStorage::new();
    let worker = WorkerBuilder::new("streaming-worker")
        .backend(storage)
        .on_event(|ctx, event| {
            println!("Worker '{}' received event: {:?}", ctx.name(), event);
        })
        .build(send_email);
    // Get event stream instead of blocking
    let mut event_stream = worker.stream();
    let stream_worker = async move {
        // Process events as they occur
        while let Some(event_result) = event_stream.next().await {
            match event_result {
                Ok(event) => {
                    println!("Stream event: {:?}", event);
                    // You can react to specific events
                    match event {
                        // Handle specific event types if needed
                        _ => {}
                    }
                }
                Err(e) => {
                    eprintln!("Worker error: {:?}", e);
                    break;
                }
            }
        }
    };

    // Event monitoring with on_event
    let mut storage = MemoryStorage::new();
    let worker = WorkerBuilder::new("monitored-worker")
        .backend(storage)
        .on_event(|ctx, event| {
            match event {
                Event::Start => {
                    println!("Worker '{}' started", ctx.name());
                }
                Event::Stop => {
                    println!("Worker '{}' stopped", ctx.name());
                }
                Event::Error(error) => {
                    eprintln!("Error in worker '{}': {:?}", ctx.name(), error);
                }
                _ => {}
            }
        })
        .build(send_email);

    // Shutdown on Ctrl+C with run_until()
    let mut storage = MemoryStorage::new();
    let worker = WorkerBuilder::new("signal-worker")
        .backend(storage)
        .build(send_email)
        .run_until(tokio::signal::ctrl_c());

    // Note: `stream_worker` is a lazy future and does nothing until it is
    // awaited or spawned; the same presumably applies to the values
    // returned by run() and run_until() above.
    Ok(())
}
Best Practices
Worker Naming
- Use descriptive names that indicate the worker's purpose
- Include environment or instance identifiers for production deployments
- Consider using prefixes for different application components
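One way to apply these naming guidelines consistently is a small helper that every service uses to build its worker names. The sketch below is only an illustration; `worker_name` and the `<component>-<environment>-<instance>` scheme are assumptions and not an Apalis convention.

```rust
/// Builds a worker name of the form `<component>-<environment>-<instance>`.
fn worker_name(component: &str, environment: &str, instance: u32) -> String {
    format!("{component}-{environment}-{instance}")
}
```

For example, `worker_name("email", "prod", 3)` produces `email-prod-3`, which could then be passed to `WorkerBuilder::new`.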
Resource Management
- Share expensive resources (database connections, HTTP clients) through Data<T>
- Implement proper cleanup in your task handlers
- Monitor resource usage and set appropriate limits
Error Handling
- Always handle errors gracefully in your task functions
- Use the event system to monitor and log errors
- Implement appropriate retry strategies for transient failures
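To illustrate what a retry strategy for transient failures can look like, here is a std-only sketch of capped exponential backoff. It is not Apalis's retry mechanism; `TaskFailure`, `backoff_delay`, and `retry` are hypothetical names, and the split into transient and permanent errors is an assumption of the example.

```rust
use std::time::Duration;

/// Hypothetical error type separating failures worth retrying from permanent ones.
#[derive(Debug, PartialEq)]
enum TaskFailure {
    Transient(String),
    Permanent(String),
}

/// Delay before retry number `attempt` (0-based): base * 2^attempt, capped at `max`.
fn backoff_delay(base: Duration, max: Duration, attempt: u32) -> Duration {
    let factor = 2u32.saturating_pow(attempt);
    base.saturating_mul(factor).min(max)
}

/// Calls `op` until it succeeds, fails permanently, or `max_attempts` calls were made.
/// `op` is always called at least once. `sleep` is injected so the caller decides
/// how to wait (thread sleep, async timer, or a no-op in tests).
fn retry<T>(
    max_attempts: u32,
    base: Duration,
    max: Duration,
    mut sleep: impl FnMut(Duration),
    mut op: impl FnMut() -> Result<T, TaskFailure>,
) -> Result<T, TaskFailure> {
    let mut attempt = 0;
    loop {
        match op() {
            Ok(value) => return Ok(value),
            // Retry only transient failures, and only while attempts remain.
            Err(TaskFailure::Transient(_)) if attempt + 1 < max_attempts => {
                sleep(backoff_delay(base, max, attempt));
                attempt += 1;
            }
            // Permanent failures and exhausted retries are returned to the caller.
            Err(err) => return Err(err),
        }
    }
}
```

Passing `std::thread::sleep` as `sleep` gives a blocking variant; inside an async task handler an async timer would be the better fit, which is why the waiting step is left to the caller.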
Graceful Shutdown
- Always implement signal handling for production deployments
- Allow tasks to complete before forcing shutdown
- Save state or progress information before stopping
Monitoring and Observability
- Use event listeners to integrate with monitoring systems
- Track task processing metrics
- Log important worker lifecycle events
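A simple way to turn lifecycle events into metrics is to tally them in a small struct that an event listener updates. The sketch below is std-only and uses a hypothetical `ToyEvent` enum that mirrors the Start, Stop, and Error variants from the Event Monitoring section; `WorkerMetrics` and `record` are illustrative names and not part of Apalis.

```rust
/// Hypothetical event type mirroring the Start/Stop/Error variants used for monitoring.
#[derive(Debug)]
enum ToyEvent {
    Start,
    Stop,
    Error(String),
    Other,
}

/// Counters that a monitoring system could scrape or export.
#[derive(Debug, Default, PartialEq)]
struct WorkerMetrics {
    starts: u64,
    stops: u64,
    errors: u64,
    last_error: Option<String>,
}

impl WorkerMetrics {
    /// Updates the counters for one lifecycle event; unknown events are ignored.
    fn record(&mut self, event: &ToyEvent) {
        match event {
            ToyEvent::Start => self.starts += 1,
            ToyEvent::Stop => self.stops += 1,
            ToyEvent::Error(message) => {
                self.errors += 1;
                self.last_error = Some(message.clone());
            }
            ToyEvent::Other => {}
        }
    }
}
```

In a real worker the listener may be called from another thread, so the metrics would likely need to sit behind a Mutex or use atomic counters; the sketch leaves that out for brevity.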
Conclusion
Workers are the heart of your Apalis task processing system. Whether you choose the simple run() approach for straightforward processing or the stream() method for advanced control, understanding worker lifecycle management is crucial for building reliable background task systems. The flexible configuration options allow you to adapt workers to your specific requirements while maintaining robust error handling and monitoring capabilities.