Introduction
On this page
Apalis provides flexible ways to define task handlers that process your background tasks.
You can pass in any type that implements [IntoWorkerService] such as:
- Async functions with dependency injection
- A type that implements
tower::Servicetrait implementation. - Workflow: Sequential workflows
- DagFlow: DAG workflows
All workflows accept are a representation of multiple services which can accept async functions or tower services. We will discuss these with the knowledge that the same knowledge holds for workflows.
Async Function
The simplest way to define a task handler in Apalis is using async functions. This approach is perfect for straightforward task processing where you need clean, readable code with minimal boilerplate.
Basic Usage
#[derive(Clone)]
struct State {
// .. client: Arc<MailClient>
}
async fn send_email(email: Email, state: Data<State>) -> Result<(), BoxDynError> {
// Process the email task
println!("Sending email to {} with subject: {}", email.to, email.subject);
Ok(())
}Key Features
Async function services offer several powerful capabilities:
- Automatic Argument Extraction: Functions can accept up to 16 additional arguments beyond the core request, with each argument automatically extracted using the
FromRequesttrait - Flexible Parameter Types: Extract various types including request data, application state, headers, and custom extractors
- Response Conversion: Function outputs are automatically converted to proper responses using the
IntoResponsetrait - Compile-time Safety: All argument types are captured at compile time through generics, ensuring static dispatch and type safety
Dependency Injection
The async function approach supports rich parameter extraction patterns:
async fn complex_handler(
// Task arguments
email: Email,
// Task global state, passed via .data()
state: Data<State>,
// The worker context for the worker that is running
worker: WorkerContext,
// The task_id
task_id: TaskId<RandomId>
) -> Result<(), BoxDynError> {
// Your task logic here
Ok(())
}Service trait
For more advanced scenarios requiring fine-grained control over request processing, you can implement the tower::Service trait directly. This approach accepts Request<Args, Context> and provides maximum flexibility.
When to Use Tower Services
Consider using tower::Service when you need:
- Custom task preprocessing or validation
- Complex error handling strategies
- Integration with existing Tower middleware
- Full control over the request/response lifecycle
- Advanced routing or request transformation logic
Example
#[derive(Clone)]
pub struct EmailService {
state: State
}
impl EmailService {
pub fn new(state: State) -> Self {
EmailService {
state
}
}
}
impl<Ctx> Service<Task<Email, Ctx, RandomId>> for EmailService
where
Ctx: Send + 'static,
{
type Response = ();
type Error = BoxDynError;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: Task<Email, Ctx, RandomId>) -> Self::Future {
let state = self.state.clone();
Box::pin(async move {
// Custom request processing logic
// let result = send_email(req, state).await?;
Ok(())
})
}
}Debugging
Lets say you have a handler that is invalid
#[derive(Debug)]
struct Msg {
id: u64,
}
async fn handle_task(item: Msg, wrk: WorkerContext) -> Result<(), BoxDynError> {
println!("Received {item:?}");
Ok(())
}The error produced by the compiler can be a little vague:
the trait bound `fn(..., ...) -> ... {handle_task}: IntoWorkerServiceExt<_, ..., _, ..., ...>` is not satisfied
This is usually caused by the relationship between IntoWorkerService and IntoWorkerServiceExt where the latter builds on top of the former meaning some type information may be lost.
To debug this issue, it is recommended to use IntoWorkerService to direct you to the right problem.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let mut backend = PostgresStorage::new_with_notify(&pool, &Config::default());
IntoWorkerService::into_service(handle_task, backend);
}This will lead the compiler giving you a better error:
the trait bound `Msg: serde::Serialize` is not satisfied
for local types consider adding `#[derive(serde::Serialize)]` to your `MyMsg` type
for types from other crates check whether the crate offers a `serde` feature flag
the following other types implement trait `Serialize`:
..
Complete example
use apalis::prelude::*;
use std::task::{Context, Poll};
use apalis::layers::tracing::{TracingContext};
use futures::future::BoxFuture;
use examples::*;
#[derive(Clone)]
struct State {
// .. client: Arc<MailClient>
}
async fn send_email(email: Email, state: Data<State>) -> Result<(), BoxDynError> {
// Process the email task
println!("Sending email to {} with subject: {}", email.to, email.subject);
Ok(())
}
async fn complex_handler(
// Task arguments
email: Email,
// Task global state, passed via .data()
state: Data<State>,
// The worker context for the worker that is running
worker: WorkerContext,
// The task_id
task_id: TaskId<RandomId>
) -> Result<(), BoxDynError> {
// Your task logic here
Ok(())
}
#[derive(Clone)]
pub struct EmailService {
state: State
}
impl EmailService {
pub fn new(state: State) -> Self {
EmailService {
state
}
}
}
impl<Ctx> Service<Task<Email, Ctx, RandomId>> for EmailService
where
Ctx: Send + 'static,
{
type Response = ();
type Error = BoxDynError;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: Task<Email, Ctx, RandomId>) -> Self::Future {
let state = self.state.clone();
Box::pin(async move {
// Custom request processing logic
// let result = send_email(req, state).await?;
Ok(())
})
}
}
#[tokio::main]
async fn main() -> Result<(), BoxDynError> {
let mut storage = MemoryStorage::new();
let simple_fn = WorkerBuilder::new("email_worker")
.backend(storage)
.data(State {})
.build(send_email)
.run();
let mut storage = MemoryStorage::new();
let complex_fn = WorkerBuilder::new("email_worker")
.backend(storage)
.data(State {})
.build(complex_handler)
.run();
let mut storage = MemoryStorage::new();
let svc_worker = WorkerBuilder::new("email_worker")
.backend(storage)
.build(EmailService::new(State {}))
.run();
Ok(())
}Choosing the Right Approach
Use an async function when
- You need simple, straightforward task processing
- Your handlers don't require complex request manipulation
- You want clean, readable code with minimal boilerplate
- You're building standard operations or data processing tasks
Use the Service trait when
- You need custom middleware integration
- Your application requires complex request validation or transformation
- You want to leverage existing Tower ecosystem components
- You need fine-grained control over error handling and response formatting
Best Practices
Regardless of which approach you choose, consider these guidelines:
- Keep handlers focused: Each handler should have a single, well-defined responsibility
- Handle errors gracefully: Implement proper error handling to ensure task reliability
- Use appropriate state management: Share state efficiently using
Data<T>or service-level state - Consider performance: Async functions offer lower overhead for simple cases, while Tower services provide more control at the cost of complexity
Both approaches integrate seamlessly with Apalis's task processing pipeline, giving you the flexibility to choose the right tool for each specific use case in your application.