Introduction

Workers are the core execution engines in Apalis that process tasks from your chosen backend. They continuously poll for available tasks, execute your service functions, and handle the complete task lifecycle. Understanding how to create, configure, and control workers is essential for building robust task processing systems.

Basic Worker Concepts

A worker in Apalis consists of:

  • Name: A unique identifier for the worker instance
  • Backend: The storage system where tasks are queued (Memory, PostgreSQL, Redis, etc.)
  • Service Function: The handler that processes individual tasks
  • Configuration: Settings for concurrency, retries, timeouts, and more
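
The examples below all use a send_email handler processing an Email task. A minimal, hypothetical sketch of that pair might look like the following; it is simplified to synchronous, std-only Rust, whereas real apalis handlers are async and use the library's own error types:

```rust
// Hypothetical sketch of the task type and handler used in the examples
// below. In a real apalis setup `send_email` would be an async function
// registered via `build(send_email)`; this sync version only illustrates
// the shape of a service function.
#[derive(Debug, Clone)]
pub struct Email {
    pub to: String,
    pub subject: String,
    pub body: String,
}

// Processes a single Email task, returning Err for invalid input so the
// backend can record the failure (and possibly retry).
pub fn send_email(task: &Email) -> Result<(), String> {
    if !task.to.contains('@') {
        return Err(format!("invalid recipient address: {}", task.to));
    }
    // Real delivery (SMTP, an HTTP API, ...) would happen here.
    println!("sending '{}' to {}", task.subject, task.to);
    Ok(())
}
```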

Creating Workers

Apalis provides two primary methods for running workers, each suited to different use cases and control requirements.

Method 1: Direct Execution with run()

The run() method returns a future that drives the worker; awaiting it blocks until the worker completes or is stopped. This is the simplest approach for straightforward task processing:

let future_worker = WorkerBuilder::new("future-worker")
    .backend(storage)
    .on_event(|ctx, event| {
        println!("Worker '{}' event: {:?}", ctx.name(), event);
    })
    .build(send_email)
    .run();

Method 2: Event Stream with stream()

The stream() method returns an event stream that allows you to monitor worker events in real-time and maintain more control over the worker lifecycle:

let worker = WorkerBuilder::new("streaming-worker")
    .backend(storage)
    .on_event(|ctx, event| {
        println!("Worker '{}' received event: {:?}", ctx.name(), event);
    })
    .build(send_email);
    
// Get event stream instead of blocking
let mut event_stream = worker.stream();

let stream_worker = async move {
    
    // Process events as they occur
    while let Some(event_result) = event_stream.next().await {
        match event_result {
            Ok(event) => {
                println!("Stream event: {:?}", event);
                
                // You can react to specific events
                match event {
                    // Handle specific event types if needed
                    _ => {}
                }
            }
            Err(e) => {
                eprintln!("Worker error: {:?}", e);
                break;
            }
        }
    }
};

Worker Lifecycle

Stopping Workers

Workers can be stopped in several ways:

1. Programmatic Stopping

Use the WorkerContext to stop the worker from within a task:

async fn controlled_task(
    task: Email,
    worker: WorkerContext,
) -> Result<(), TaskError> {
    // Process the task
    process_business_logic(&task).await?;
    
    // Stop worker based on some condition
    if should_stop_processing(&task) {
        worker.stop()?;
    }
    
    Ok(())
}

More on this is covered in the complete lifecycle section.

2. External Signals

Workers automatically respond to common termination signals:

let worker = WorkerBuilder::new("signal-worker")
    .backend(storage)
    .build(send_email)
    .run_until(tokio::signal::ctrl_c());

run_until accepts any Future; when that future resolves, the worker begins its shutdown process.
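
The semantics can be sketched without apalis at all: a worker loop keeps pulling work until its shutdown signal arrives. In this hypothetical std-only model, a sentinel message on a channel stands in for the resolved future:

```rust
use std::sync::mpsc;

// A std-only sketch of the `run_until` idea: the loop drains tasks until
// the shutdown signal arrives, then finishes gracefully. Apalis itself
// accepts any real Future; `Msg::Shutdown` stands in for it resolving.
enum Msg {
    Task(u32),
    Shutdown,
}

// Drains tasks until Shutdown is received; returns how many were processed.
fn run_until_shutdown(rx: mpsc::Receiver<Msg>) -> u32 {
    let mut processed = 0;
    while let Ok(msg) = rx.recv() {
        match msg {
            Msg::Task(_) => processed += 1,
            Msg::Shutdown => break, // begin graceful shutdown
        }
    }
    processed
}
```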

3. Pausing and Resuming Workers

WorkerContext offers a way to pause and resume workers. By default you don't have access to the context outside the task function, but in some cases you may want to pass the worker context into other parts of your application.

let mut ctx = WorkerContext::new::<Service>("timeout-worker");
let worker = WorkerBuilder::new("timeout-worker")
    .backend(storage)
    .build(send_email)
    .run_with_ctx(&mut ctx);

// Use ctx to pause the worker
ctx.pause()?;

// Later, resume processing
ctx.resume()?;
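
Pause/resume can also be modeled in plain std Rust as a gate the worker loop checks before each task. The PauseGate type below is purely illustrative, not an apalis API, but it shows the blocking behavior a paused worker exhibits:

```rust
use std::sync::{Condvar, Mutex};

// A std-only sketch of pause/resume semantics. A worker loop calls
// `wait_if_paused` before each task; `pause`/`resume` play the role that
// `WorkerContext::pause`/`resume` play in apalis.
pub struct PauseGate {
    paused: Mutex<bool>,
    cv: Condvar,
}

impl PauseGate {
    pub fn new() -> Self {
        PauseGate { paused: Mutex::new(false), cv: Condvar::new() }
    }

    pub fn pause(&self) {
        *self.paused.lock().unwrap() = true;
    }

    pub fn resume(&self) {
        *self.paused.lock().unwrap() = false;
        self.cv.notify_all();
    }

    // Called by the worker before each task; blocks while paused.
    pub fn wait_if_paused(&self) {
        let mut paused = self.paused.lock().unwrap();
        while *paused {
            paused = self.cv.wait(paused).unwrap();
        }
    }
}
```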

Event Monitoring

Monitor worker lifecycle events for observability:

let worker = WorkerBuilder::new("monitored-worker")
    .backend(storage)
    .on_event(|ctx, event| {
        match event {
            Event::Start => {
                println!("Worker '{}' started", ctx.name());
            }
            Event::Stop => {
                println!("Worker '{}' stopped", ctx.name());
            }
            Event::Error(error) => {
                eprintln!("Error in worker '{}': {:?}", ctx.name(), error);
            }
            _ => {

            }
        }
    })
    .build(send_email);
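
Beyond logging, event callbacks are a natural place to update metrics. The sketch below is std-only and hypothetical: it defines a local EventKind enum (mirroring the variants above) and a counter map of the kind you might update from an on_event listener:

```rust
use std::collections::HashMap;

// Hypothetical event kinds mirroring the Start/Stop/Error variants shown
// above, defined locally so the sketch is self-contained.
#[derive(Debug, Hash, PartialEq, Eq)]
enum EventKind {
    Start,
    Stop,
    Error,
}

// Simple per-kind counters, as you might feed into a metrics system.
#[derive(Default)]
struct WorkerMetrics {
    counts: HashMap<EventKind, u64>,
}

impl WorkerMetrics {
    // Record one occurrence of an event; call this from the event listener.
    fn record(&mut self, kind: EventKind) {
        *self.counts.entry(kind).or_insert(0) += 1;
    }

    fn count(&self, kind: &EventKind) -> u64 {
        self.counts.get(kind).copied().unwrap_or(0)
    }
}
```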

Putting It All Together

main.rs
use apalis::prelude::*;
use examples::*;
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<(), BoxDynError> {
    let storage = MemoryStorage::new();
    let future_worker = WorkerBuilder::new("future-worker")
        .backend(storage)
        .on_event(|ctx, event| {
            println!("Worker '{}' event: {:?}", ctx.name(), event);
        })
        .build(send_email)
        .run();

    let storage = MemoryStorage::new();
    let streaming_worker = WorkerBuilder::new("streaming-worker")
        .backend(storage)
        .on_event(|ctx, event| {
            println!("Worker '{}' received event: {:?}", ctx.name(), event);
        })
        .build(send_email);

    // Get an event stream instead of blocking
    let mut event_stream = streaming_worker.stream();

    let stream_worker = async move {
        // Process events as they occur
        while let Some(event_result) = event_stream.next().await {
            match event_result {
                Ok(event) => {
                    println!("Stream event: {:?}", event);
                }
                Err(e) => {
                    eprintln!("Worker error: {:?}", e);
                    break;
                }
            }
        }
    };

    let storage = MemoryStorage::new();
    let monitored_worker = WorkerBuilder::new("monitored-worker")
        .backend(storage)
        .on_event(|ctx, event| match event {
            Event::Start => {
                println!("Worker '{}' started", ctx.name());
            }
            Event::Stop => {
                println!("Worker '{}' stopped", ctx.name());
            }
            Event::Error(error) => {
                eprintln!("Error in worker '{}': {:?}", ctx.name(), error);
            }
            _ => {}
        })
        .build(send_email);

    let storage = MemoryStorage::new();
    let signal_worker = WorkerBuilder::new("signal-worker")
        .backend(storage)
        .build(send_email)
        .run_until(tokio::signal::ctrl_c());

    // Drive all workers concurrently; Ctrl-C resolves the signal worker's
    // shutdown future.
    let _ = futures::join!(
        future_worker,
        stream_worker,
        monitored_worker.run(),
        signal_worker
    );

    Ok(())
}

Best Practices

Worker Naming

  • Use descriptive names that indicate the worker's purpose
  • Include environment or instance identifiers for production deployments
  • Consider using prefixes for different application components
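
One lightweight way to follow these conventions is a small naming helper. The function below is purely a suggestion, not part of apalis:

```rust
// Hypothetical helper encoding component, environment, and instance into a
// worker name, e.g. "email-prod-1".
fn worker_name(component: &str, env: &str, instance: u32) -> String {
    format!("{component}-{env}-{instance}")
}
```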

Resource Management

  • Share expensive resources (database connections, HTTP clients) through Data<T>
  • Implement proper cleanup in your task handlers
  • Monitor resource usage and set appropriate limits
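
The Arc-based sketch below illustrates the sharing pattern that Data&lt;T&gt; provides in apalis: construct the expensive resource once, then hand out cheap handle clones to every handler. The HttpClient type here is hypothetical:

```rust
use std::sync::Arc;

// A std-only stand-in for an expensive shared resource (connection pool,
// HTTP client, ...). In apalis you would expose this to handlers via
// `Data<T>`; a plain `Arc` shows the underlying idea.
struct HttpClient {
    base_url: String,
}

// Build the client once; callers clone the cheap Arc handle, not the client.
fn make_shared_client(base_url: &str) -> Arc<HttpClient> {
    Arc::new(HttpClient { base_url: base_url.to_string() })
}
```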

Error Handling

  • Always handle errors gracefully in your task functions
  • Use the event system to monitor and log errors
  • Implement appropriate retry strategies for transient failures
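
For transient failures handled inside your own code (as opposed to the backend's retry layer), a bounded retry loop with exponential backoff is a common strategy. This std-only sketch is illustrative; retry_with_backoff is an assumed name, not an apalis API:

```rust
use std::time::Duration;

// Retries `op` up to `max_attempts` times, sleeping with an exponentially
// growing delay between failures. Returns the last error if all fail.
fn retry_with_backoff<T, E>(
    max_attempts: u32,
    mut op: impl FnMut() -> Result<T, E>,
) -> Result<T, E> {
    let mut delay = Duration::from_millis(10);
    let mut last_err = None;
    for attempt in 0..max_attempts {
        match op() {
            Ok(value) => return Ok(value),
            Err(e) => {
                last_err = Some(e);
                if attempt + 1 < max_attempts {
                    std::thread::sleep(delay); // back off before retrying
                    delay *= 2;                // exponential growth
                }
            }
        }
    }
    Err(last_err.expect("max_attempts must be > 0"))
}
```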

Graceful Shutdown

  • Always implement signal handling for production deployments
  • Allow tasks to complete before forcing shutdown
  • Save state or progress information before stopping

Monitoring and Observability

  • Use event listeners to integrate with monitoring systems
  • Track task processing metrics
  • Log important worker lifecycle events

Conclusion

Workers are the heart of your Apalis task processing system. Whether you choose the simple run() approach for straightforward processing or the stream() method for advanced control, understanding worker lifecycle management is crucial for building reliable background task systems. The flexible configuration options allow you to adapt workers to your specific requirements while maintaining robust error handling and monitoring capabilities.