Introduction

Apalis provides flexible ways to define task handlers that process your background tasks. You can pass in any type that implements IntoWorkerService, such as:

  1. Async functions with dependency injection
  2. A type that implements the tower::Service trait
  3. Workflow: Sequential workflows
  4. DagFlow: DAG workflows

Workflows are composed of multiple services, each of which can be an async function or a tower service. The rest of this page covers those two handler types; the same principles apply to workflows.

Async Function

The simplest way to define a task handler in Apalis is using async functions. This approach is perfect for straightforward task processing where you need clean, readable code with minimal boilerplate.

Basic Usage

#[derive(Clone)]
struct State {
    // .. client: Arc<MailClient>
}

async fn send_email(email: Email, state: Data<State>) -> Result<(), BoxDynError> {
    // Process the email task
    println!("Sending email to {} with subject: {}", email.to, email.subject);
    Ok(())
}

Key Features

Async function services offer several powerful capabilities:

  • Automatic Argument Extraction: Functions can accept up to 16 additional arguments beyond the core request, with each argument automatically extracted using the FromRequest trait
  • Flexible Parameter Types: Extract various types including request data, application state, headers, and custom extractors
  • Response Conversion: Function outputs are automatically converted to proper responses using the IntoResponse trait
  • Compile-time Safety: All argument types are captured at compile time through generics, ensuring static dispatch and type safety
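
To make the extraction idea concrete, here is a minimal, synchronous sketch of how a handler trait can be implemented for functions of different arities. Every name in it (Request, FromRequest, Handler, run, greet) is a hypothetical stand-in chosen for illustration and is not Apalis's real definition; the actual traits are async and more general.

```rust
// Simplified model of extractor-based handlers (hypothetical names throughout).

/// A task's arguments plus context that extractors can read from.
pub struct Request {
    pub args: String,
    pub task_id: u64,
}

/// Types that can be built from a request.
pub trait FromRequest: Sized {
    fn from_request(req: &Request) -> Result<Self, String>;
}

/// Example extractor: pulls the task id out of the request.
pub struct TaskId(pub u64);

impl FromRequest for TaskId {
    fn from_request(req: &Request) -> Result<Self, String> {
        Ok(TaskId(req.task_id))
    }
}

/// The `Args` parameter lets the trait be implemented once per arity
/// without the implementations overlapping.
pub trait Handler<Args> {
    fn call(&self, req: Request) -> Result<String, String>;
}

// Arity 1: only the task arguments.
impl<F> Handler<(String,)> for F
where
    F: Fn(String) -> Result<String, String>,
{
    fn call(&self, req: Request) -> Result<String, String> {
        self(req.args)
    }
}

// Arity 2: task arguments plus one extracted value.
impl<F, A> Handler<(String, A)> for F
where
    F: Fn(String, A) -> Result<String, String>,
    A: FromRequest,
{
    fn call(&self, req: Request) -> Result<String, String> {
        let extracted = A::from_request(&req)?;
        self(req.args, extracted)
    }
}

/// Accepts any handler; `Args` is inferred from the function signature.
pub fn run<Args, H: Handler<Args>>(handler: H, req: Request) -> Result<String, String> {
    handler.call(req)
}

pub fn greet(name: String) -> Result<String, String> {
    Ok(format!("hello {name}"))
}

pub fn greet_with_id(name: String, id: TaskId) -> Result<String, String> {
    Ok(format!("hello {name} (task {})", id.0))
}
```

Because each arity is a distinct `Handler<Args>` instantiation, the compiler selects the implementation statically, which is the static-dispatch property the list above refers to.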

Dependency Injection

The async function approach supports rich parameter extraction patterns:

async fn complex_handler(
    // Task arguments
    email: Email,
    // Task global state, passed via .data()
    state: Data<State>,
    // The worker context for the worker that is running
    worker: WorkerContext,
    // The task_id
    task_id: TaskId<RandomId>
) -> Result<(), BoxDynError> {
    // Your task logic here
    Ok(())
}
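
One common way to implement state injection of this kind is a type-indexed map: the builder stores a value keyed by its type, and an extractor looks it up by the type the handler asks for. The sketch below assumes that design; Extensions, the Data wrapper shown here, and State's `sender` field are hypothetical illustrations and may differ from how Apalis stores values passed via .data().

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;
use std::ops::Deref;
use std::sync::Arc;

/// Hypothetical type-indexed store filled by something like `.data(value)`.
#[derive(Default)]
pub struct Extensions {
    map: HashMap<TypeId, Arc<dyn Any + Send + Sync>>,
}

impl Extensions {
    /// Store a value, keyed by its concrete type.
    pub fn insert<T: Send + Sync + 'static>(&mut self, value: T) {
        self.map.insert(TypeId::of::<T>(), Arc::new(value));
    }

    /// Look up a value by type; returns None if nothing of that type was stored.
    pub fn get<T: Send + Sync + 'static>(&self) -> Option<Arc<T>> {
        self.map
            .get(&TypeId::of::<T>())
            .cloned()
            .and_then(|value| value.downcast::<T>().ok())
    }
}

/// Hypothetical extractor wrapper handed to handlers.
pub struct Data<T>(pub Arc<T>);

impl<T> Deref for Data<T> {
    type Target = T;
    fn deref(&self) -> &T {
        &self.0
    }
}

impl<T: Send + Sync + 'static> Data<T> {
    /// Fails with a descriptive message when the state was never registered.
    pub fn extract(ext: &Extensions) -> Result<Self, String> {
        ext.get::<T>()
            .map(Data)
            .ok_or_else(|| format!("missing data of type {}", std::any::type_name::<T>()))
    }
}

/// Illustrative shared state.
pub struct State {
    pub sender: String,
}
```

In this model, forgetting to register a state type surfaces as a runtime extraction error rather than a compile error, so it is worth registering all shared state where the worker is built.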

Service trait

For more advanced scenarios requiring fine-grained control over request processing, you can implement the tower::Service trait directly. The service receives the whole task (Task<Email, Ctx, RandomId> in the example below) rather than individually extracted arguments, which provides maximum flexibility.

When to Use Tower Services

Consider using tower::Service when you need:

  • Custom task preprocessing or validation
  • Complex error handling strategies
  • Integration with existing Tower middleware
  • Full control over the request/response lifecycle
  • Advanced routing or request transformation logic
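
As an illustration of the first point, validation can be written as a service that wraps another service and rejects bad tasks before delegating. The sketch below uses a simplified, synchronous Service trait defined locally so that it stays self-contained; it is a hypothetical stand-in for tower::Service (which is async and also has poll_ready), and the Validate and SendEmail types are invented for this example.

```rust
/// Simplified, synchronous stand-in for `tower::Service` (hypothetical).
pub trait Service<Req> {
    type Response;
    type Error;
    fn call(&mut self, req: Req) -> Result<Self::Response, Self::Error>;
}

#[derive(Debug, Clone, PartialEq)]
pub struct Email {
    pub to: String,
    pub subject: String,
}

/// Inner service: pretends to send the email and counts deliveries.
pub struct SendEmail {
    pub sent: usize,
}

impl Service<Email> for SendEmail {
    type Response = String;
    type Error = String;

    fn call(&mut self, req: Email) -> Result<String, String> {
        self.sent += 1;
        Ok(format!("sent '{}' to {}", req.subject, req.to))
    }
}

/// Wrapper that validates the task before delegating to the inner service.
pub struct Validate<S> {
    pub inner: S,
}

impl<S> Service<Email> for Validate<S>
where
    S: Service<Email, Error = String>,
{
    type Response = S::Response;
    type Error = String;

    fn call(&mut self, req: Email) -> Result<S::Response, String> {
        // Reject obviously malformed recipients without touching the inner service.
        if !req.to.contains('@') {
            return Err(format!("invalid recipient: {}", req.to));
        }
        self.inner.call(req)
    }
}
```

The wrapper knows nothing about how the inner service works, which is what makes this style composable with other middleware.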

Example

#[derive(Clone)]
pub struct EmailService {
    state: State
}

impl EmailService {
    pub fn new(state: State) -> Self {
        EmailService {
            state
        }
    }
}

impl<Ctx> Service<Task<Email, Ctx, RandomId>> for EmailService 
where
    Ctx: Send + 'static,
{
    type Response = ();
    type Error = BoxDynError;
    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: Task<Email, Ctx, RandomId>) -> Self::Future {
        let state = self.state.clone();
        
        Box::pin(async move {
            // Custom request processing logic
            // let result = send_email(req, state).await?;
            Ok(())
        })
    }
}

Debugging

Let's say you have a handler that is invalid:

#[derive(Debug)]
struct Msg {
    id: u64,
}

async fn handle_task(item: Msg, wrk: WorkerContext) -> Result<(), BoxDynError> {
    println!("Received {item:?}");
    Ok(())
}

The error produced by the compiler can be a little vague:

the trait bound `fn(..., ...) -> ... {handle_task}: IntoWorkerServiceExt<_, ..., _, ..., ...>` is not satisfied

This is usually caused by the relationship between IntoWorkerService and IntoWorkerServiceExt: the latter builds on top of the former, so some type information may be lost in the error message.

To debug this, call IntoWorkerService directly; the compiler will then point you to the actual problem.

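#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // `pool` is assumed to be an existing database connection pool
    let backend = PostgresStorage::new_with_notify(&pool, &Config::default());
    // Calling the base trait directly surfaces the underlying trait-bound error
    IntoWorkerService::into_service(handle_task, backend);
    Ok(())
}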

This leads the compiler to give you a more specific error:

the trait bound `Msg: serde::Serialize` is not satisfied
for local types consider adding `#[derive(serde::Serialize)]` to your `Msg` type
for types from other crates check whether the crate offers a `serde` feature flag
the following other types implement trait `Serialize`:
..
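
A related, general Rust technique is to assert the suspected trait bounds on the task type directly with an empty generic function, so the compiler reports the missing trait by name. The sketch below uses standard-library traits as placeholders to stay self-contained; assert_task_bounds is a hypothetical helper, and in a real project you would substitute the bounds your backend requires (for example serde::Serialize, as in the error above).

```rust
use std::fmt::Debug;

/// Compiles only if `T` satisfies every listed bound; does nothing at runtime.
/// The bounds here are placeholders for whatever the backend actually requires.
pub fn assert_task_bounds<T: Debug + Clone + Send + 'static>() {}

#[derive(Debug, Clone)]
pub struct Msg {
    pub id: u64,
}

/// If `Msg` were missing one of the bounds (say, `Clone`), this call would fail
/// to compile with an error naming that exact trait.
pub fn check() {
    assert_task_bounds::<Msg>();
}
```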

Complete example

main.rs
use apalis::prelude::*;
use std::task::{Context, Poll};
use apalis::layers::tracing::{TracingContext};
use futures::future::BoxFuture;
use examples::*;

#[derive(Clone)]
struct State {
    // .. client: Arc<MailClient>
}

async fn send_email(email: Email, state: Data<State>) -> Result<(), BoxDynError> {
    // Process the email task
    println!("Sending email to {} with subject: {}", email.to, email.subject);
    Ok(())
}

async fn complex_handler(
    // Task arguments
    email: Email,
    // Task global state, passed via .data()
    state: Data<State>,
    // The worker context for the worker that is running
    worker: WorkerContext,
    // The task_id
    task_id: TaskId<RandomId>
) -> Result<(), BoxDynError> {
    // Your task logic here
    Ok(())
}

#[derive(Clone)]
pub struct EmailService {
    state: State
}

impl EmailService {
    pub fn new(state: State) -> Self {
        EmailService {
            state
        }
    }
}

impl<Ctx> Service<Task<Email, Ctx, RandomId>> for EmailService 
where
    Ctx: Send + 'static,
{
    type Response = ();
    type Error = BoxDynError;
    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: Task<Email, Ctx, RandomId>) -> Self::Future {
        let state = self.state.clone();
        
        Box::pin(async move {
            // Custom request processing logic
            // let result = send_email(req, state).await?;
            Ok(())
        })
    }
}

#[tokio::main]
async fn main() -> Result<(), BoxDynError> {
    let mut storage = MemoryStorage::new();
    let simple_fn = WorkerBuilder::new("email_worker")
        .backend(storage)
        .data(State {})
        .build(send_email)
        .run();
    
    let mut storage = MemoryStorage::new();
    let complex_fn = WorkerBuilder::new("email_worker")
        .backend(storage)
        .data(State {})
        .build(complex_handler)
        .run();
    
    let mut storage = MemoryStorage::new();
    let svc_worker = WorkerBuilder::new("email_worker")
        .backend(storage)
        .build(EmailService::new(State {}))
        .run();
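    // `run()` only creates a future; join all three so the workers actually execute.
    let _results = futures::future::join3(simple_fn, complex_fn, svc_worker).await;
    Ok(())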
}

Choosing the Right Approach

Use an async function when

  • You need simple, straightforward task processing
  • Your handlers don't require complex request manipulation
  • You want clean, readable code with minimal boilerplate
  • You're building standard operations or data processing tasks

Use the Service trait when

  • You need custom middleware integration
  • Your application requires complex request validation or transformation
  • You want to leverage existing Tower ecosystem components
  • You need fine-grained control over error handling and response formatting

Best Practices

Regardless of which approach you choose, consider these guidelines:

  • Keep handlers focused: Each handler should have a single, well-defined responsibility
  • Handle errors gracefully: Implement proper error handling to ensure task reliability
  • Use appropriate state management: Share state efficiently using Data<T> or service-level state
  • Consider performance: Async functions offer lower overhead for simple cases, while Tower services provide more control at the cost of complexity

Both approaches integrate seamlessly with Apalis's task processing pipeline, giving you the flexibility to choose the right tool for each specific use case in your application.