Extending Workers
Extending Workers with Middleware
Overview
In apalis, layers extend worker functionality by wrapping task execution with additional behavior. Because apalis is built on Tower's Service abstraction, you can apply Tower layers to workers. This lets you add transformations, logging, error handling, and other processing steps that run before a task is executed.
By applying middleware, you can:
- Log task execution details (e.g., when a task starts and finishes)
- Handle errors and retries in a structured manner
- Inject shared dependencies like database connections or authentication
- Apply custom transformations before task execution
This section shows how to extend workers using middleware.
Applying Middleware to a Worker
apalis workers are built on Tower services, allowing middleware to be applied using the .layer(...) method. Below is a breakdown of how to create and apply a logging middleware.
Define the Middleware
We will create a middleware that logs when a task starts processing.
The LoggingLayer struct implements the Layer trait, allowing it to wrap an existing worker service:
use apalis::prelude::*;
use tower::Layer;
use tracing::info;

struct LoggingLayer {
    // Label for this layer; LoggingService below does not use it yet.
    namespace: String,
}

impl<S> Layer<S> for LoggingLayer {
    type Service = LoggingService<S>;

    // Wrap the inner service so every task passes through LoggingService.
    fn layer(&self, inner: S) -> Self::Service {
        LoggingService { inner }
    }
}

- Layer<S>: A generic trait for wrapping another service (S).
- LoggingLayer: The struct that implements Layer and produces the wrapper around an apalis worker's service.
- layer(&self, inner: S) -> Self::Service: Wraps the worker's service in LoggingService, which adds the logging functionality.
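The wrapping step can be tried in isolation. The following is a minimal, std-only sketch and not the tower or apalis API: Service and Layer here are simplified synchronous stand-ins for the tower traits, Doubler is a hypothetical inner service, and the lines buffer replaces tracing so the output can be inspected. It also shows one possible way to forward namespace from the layer into the service it builds.

```rust
// Simplified, synchronous stand-ins for tower's `Service` and `Layer`.
// The real tower traits are async and also have `poll_ready` and an error type.
trait Service<Req> {
    type Response;
    fn call(&mut self, req: Req) -> Self::Response;
}

trait Layer<S> {
    type Service;
    fn layer(&self, inner: S) -> Self::Service;
}

// Layer that carries a namespace label.
struct LoggingLayer {
    namespace: String,
}

// Wrapper produced by the layer: records one line per task, then delegates.
struct LoggingService<S> {
    inner: S,
    namespace: String,
    lines: Vec<String>, // assumption: an in-memory buffer instead of `tracing`
}

impl<S> Layer<S> for LoggingLayer {
    type Service = LoggingService<S>;

    fn layer(&self, inner: S) -> Self::Service {
        LoggingService {
            inner,
            namespace: self.namespace.clone(),
            lines: Vec::new(),
        }
    }
}

impl<S, Req> Service<Req> for LoggingService<S>
where
    S: Service<Req>,
    Req: std::fmt::Debug,
{
    type Response = S::Response;

    fn call(&mut self, req: Req) -> Self::Response {
        self.lines
            .push(format!("[{}] Processing task: {:?}", self.namespace, req));
        self.inner.call(req)
    }
}

// Hypothetical inner service standing in for a worker's task handler.
struct Doubler;

impl Service<u32> for Doubler {
    type Response = u32;

    fn call(&mut self, req: u32) -> u32 {
        req * 2
    }
}
```

With these definitions, LoggingLayer { namespace: "email".to_string() }.layer(Doubler) yields a service whose call(21) returns 42 and records the line [email] Processing task: 21.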
Define the Logging Service
The LoggingService struct modifies the behavior of task execution by logging each request:
use tower::Service;
use std::task::{Context, Poll};

struct LoggingService<S> {
    inner: S,
}

impl<S, Req> Service<Req> for LoggingService<S>
where
    S: Service<Req>,
    Req: std::fmt::Debug,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = S::Future;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
    }

    fn call(&mut self, req: Req) -> Self::Future {
        info!("Processing task: {:?}", req);
        self.inner.call(req)
    }
}

- LoggingService<S>: A wrapper struct that holds an inner service (S), which in this case is an apalis worker.
- poll_ready: Checks if the service is ready to process a task. This defers to the worker's readiness.
- call(&mut self, req: Req) -> Self::Future:
  - Logs the task using tracing::info!.
  - Passes the task to the original worker service for processing.
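The service above logs only when a task starts, because S::Future is returned unchanged. Logging when a task finishes requires wrapping the returned future as well. The following is a minimal, std-only sketch of that idea under stated assumptions: Service is a simplified stand-in for the tower trait (no poll_ready, no error type), LoggedFuture, Upper, and block_on are hypothetical names, and a shared Vec replaces tracing so the lines can be inspected. It requires the inner future to be Unpin to avoid pin projection, which real middleware would typically handle with a helper crate.

```rust
use std::future::{ready, Future, Ready};
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

// Shared log sink, used instead of `tracing` so the lines can be inspected.
type Log = Arc<Mutex<Vec<String>>>;

// Simplified stand-in for tower's `Service`.
trait Service<Req> {
    type Future: Future;
    fn call(&mut self, req: Req) -> Self::Future;
}

// Wraps the inner future and records a line when it resolves.
struct LoggedFuture<F> {
    inner: F,
    log: Log,
}

impl<F: Future + Unpin> Future for LoggedFuture<F> {
    type Output = F::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut(); // allowed because `F: Unpin`
        let polled = Pin::new(&mut this.inner).poll(cx);
        if polled.is_ready() {
            this.log.lock().unwrap().push("finished".to_string());
        }
        polled
    }
}

struct LoggingService<S> {
    inner: S,
    log: Log,
}

impl<S, Req> Service<Req> for LoggingService<S>
where
    S: Service<Req>,
    S::Future: Unpin,
    Req: std::fmt::Debug,
{
    type Future = LoggedFuture<S::Future>;

    fn call(&mut self, req: Req) -> Self::Future {
        // Log the start, then hand back a future that logs the finish.
        self.log.lock().unwrap().push(format!("started: {:?}", req));
        LoggedFuture {
            inner: self.inner.call(req),
            log: self.log.clone(),
        }
    }
}

// Hypothetical inner service standing in for a task handler.
struct Upper;

impl Service<&'static str> for Upper {
    type Future = Ready<String>;

    fn call(&mut self, req: &'static str) -> Self::Future {
        ready(req.to_uppercase())
    }
}

// Minimal executor for the sketch: busy-polls with a waker that does nothing.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future + Unpin>(mut fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = Pin::new(&mut fut).poll(&mut cx) {
            return out;
        }
    }
}
```

Driving LoggingService { inner: Upper, log } with block_on(svc.call("hello")) returns "HELLO" and leaves two entries in the log: the start line, then the finish line.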
Apply the Middleware to a Worker
To apply the logging middleware, use the .layer(...) method when creating the worker:
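let worker = WorkerBuilder::new("email_worker")
    // Construct the layer directly; "email" is an illustrative namespace value.
    .layer(LoggingLayer { namespace: "email".to_string() })
    .build(|task: Email| async move {
        println!("Executing task: {:?}", task);
        Ok(())
    });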