Extending Workers

Extending Workers with Middleware

Overview

In apalis, layers extend worker functionality by wrapping task execution with additional behavior. Because apalis is built on Tower's Service abstraction, any Tower Layer can be applied to a worker. This lets you extend a worker's functionality with logging, error handling, transformations, and other processing steps that run around the task handler.

By applying middleware, you can:

  • Log task execution details (e.g., when a task starts and finishes)
  • Handle errors and retries in a structured manner
  • Inject shared dependencies like database connections or authentication
  • Apply custom transformations before task execution

This section shows how to extend workers using middleware.
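As a rough illustration of the wrapping idea, here is a minimal, dependency-free sketch built from plain closures. The names with_middleware and demo_wrap are hypothetical and are not part of apalis or Tower; the sketch only shows how a wrapper can transform a task and add behavior before delegating to the original handler.

```rust
// Toy model of middleware using plain closures; this is not apalis or Tower API.

/// Wraps `handler` so that every task is trimmed (a transformation) and
/// recorded in `seen` (added behavior) before the handler runs.
fn with_middleware<'a, H>(
    mut handler: H,
    seen: &'a mut Vec<String>,
) -> impl FnMut(&str) -> usize + 'a
where
    H: FnMut(&str) -> usize + 'a,
{
    move |task: &str| {
        let task = task.trim(); // transformation before execution
        seen.push(task.to_string()); // extra behavior added by the wrapper
        handler(task) // delegate to the original handler
    }
}

/// Runs two tasks through the wrapped handler and returns what was observed.
fn demo_wrap() -> (Vec<usize>, Vec<String>) {
    let mut seen = Vec::new();
    let results = {
        // The "handler" here simply returns the length of the task name.
        let mut wrapped = with_middleware(|task: &str| task.len(), &mut seen);
        vec![wrapped("  send-email "), wrapped("resize-image")]
    };
    (results, seen)
}
```

The handler never sees the untrimmed input and knows nothing about the recording step, which is the property Tower layers provide in a more structured form.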

Applying Middleware to a Worker

Because apalis workers are built on Tower services, middleware is applied with the .layer(...) method. The steps below create and apply a logging middleware.

Define the Middleware

We will create a middleware that logs when a task starts processing.

The LoggingLayer struct implements the Layer trait, allowing it to wrap an existing worker service:

use apalis::prelude::*;
use tower::Layer;
use tracing::info;

struct LoggingLayer {
    // Configuration held by the layer; not yet passed on to LoggingService.
    namespace: String,
}

impl<S> Layer<S> for LoggingLayer {
    type Service = LoggingService<S>;
    
    fn layer(&self, inner: S) -> Self::Service {
        LoggingService { inner }
    }
}

  • Layer<S>: A Tower trait, generic over the service type S, that describes how to wrap a service.
  • LoggingLayer: The struct that produces the wrapper. It holds the middleware's configuration and does not process tasks itself.
  • layer(&self, inner: S) -> Self::Service: This method wraps the worker's service in a LoggingService, adding logging functionality.
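The split between a layer (configuration) and the service it builds (behavior) can be sketched without any dependencies. The example below is an assumption-heavy toy: MiniService and MiniLayer are simplified, synchronous stand-ins for Tower's traits, and NamespacedLayer, NamespacedService, Doubler, and demo_layer are hypothetical names. It shows one way a field such as namespace could be carried from the layer into the service that uses it.

```rust
// Simplified, synchronous stand-ins for Tower's `Service` and `Layer` traits.
// They are hypothetical and exist only to keep this sketch dependency-free.
trait MiniService<Req> {
    type Response;
    fn call(&mut self, req: Req) -> Self::Response;
}

trait MiniLayer<S> {
    type Service;
    fn layer(&self, inner: S) -> Self::Service;
}

/// Layer that carries configuration (the namespace) but does no work itself.
struct NamespacedLayer {
    namespace: String,
}

/// Service produced by the layer: owns the inner service and a copy of the config.
struct NamespacedService<S> {
    inner: S,
    namespace: String,
    log: Vec<String>, // stands in for real log output
}

impl<S> MiniLayer<S> for NamespacedLayer {
    type Service = NamespacedService<S>;

    fn layer(&self, inner: S) -> Self::Service {
        // `layer` takes `&self`, so the configuration is cloned into each service.
        NamespacedService {
            inner,
            namespace: self.namespace.clone(),
            log: Vec::new(),
        }
    }
}

impl<S, Req> MiniService<Req> for NamespacedService<S>
where
    S: MiniService<Req>,
    Req: std::fmt::Debug,
{
    type Response = S::Response;

    fn call(&mut self, req: Req) -> Self::Response {
        // Record the request, then hand it to the wrapped service unchanged.
        self.log.push(format!("[{}] processing {:?}", self.namespace, req));
        self.inner.call(req)
    }
}

/// Innermost service: stands in for the task handler.
struct Doubler;

impl MiniService<u32> for Doubler {
    type Response = u32;
    fn call(&mut self, req: u32) -> u32 {
        req * 2
    }
}

/// Builds the layered service, sends one request, and returns result plus log.
fn demo_layer() -> (u32, Vec<String>) {
    let layer = NamespacedLayer { namespace: "email".to_string() };
    let mut service = layer.layer(Doubler);
    let out = service.call(21u32);
    (out, service.log)
}
```

Because layer takes &self, the same layer value can wrap several services, each receiving its own copy of the configuration.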

Define the Logging Service

The LoggingService struct modifies the behavior of task execution by logging each request:

use tower::Service;
use std::task::{Context, Poll};

struct LoggingService<S> {
    inner: S,
}

impl<S, Req> Service<Req> for LoggingService<S>
where
    S: Service<Req>,
    Req: std::fmt::Debug,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = S::Future;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
    }

    fn call(&mut self, req: Req) -> Self::Future {
        info!("Processing task: {:?}", req);
        self.inner.call(req)
    }
}

  • LoggingService<S>: A wrapper struct that holds an inner service (S), which in this case is the worker's task-handling service.
  • poll_ready: Checks whether the service is ready to process a task. This defers to the inner service's readiness.
  • call(&mut self, req: Req) -> Self::Future:
    • Logs the task using tracing::info!.
    • Passes the task to the inner service for processing and returns its future unchanged.
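Since call returns the inner future unchanged, the service above can only log when a task starts. Logging when a task finishes requires wrapping the returned future. The following is a minimal std-only sketch of that technique, not apalis or Tower code: LogOnReady, Log, NoopWake, block_on, and demo_completion are hypothetical names, and the inner future is boxed purely to avoid a pin-projection helper. In a real Tower service, one option would be to set type Future to a wrapper of this kind.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Shared list of log lines (stands in for `tracing` output).
type Log = Arc<Mutex<Vec<String>>>;

/// Future wrapper that records a line when the inner future resolves.
struct LogOnReady<F> {
    inner: Pin<Box<F>>, // boxed so no pin-projection helper is needed
    name: &'static str,
    log: Log,
}

impl<F: Future> Future for LogOnReady<F> {
    type Output = F::Output;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = &mut *self; // allowed because every field is `Unpin`
        match this.inner.as_mut().poll(cx) {
            Poll::Ready(out) => {
                this.log.lock().unwrap().push(format!("finished {}", this.name));
                Poll::Ready(out)
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

/// Waker that does nothing; sufficient for this demo's busy-polling executor.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Minimal demo executor: polls in a loop until the future is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Mimics what a `call` implementation could do: log the start, wrap the future.
fn demo_completion() -> (u32, Vec<String>) {
    let log: Log = Arc::new(Mutex::new(Vec::new()));
    log.lock().unwrap().push("started send_email".to_string());
    let wrapped = LogOnReady {
        inner: Box::pin(std::future::ready(42u32)), // stands in for the task future
        name: "send_email",
        log: log.clone(),
    };
    let out = block_on(wrapped);
    let lines = log.lock().unwrap().clone();
    (out, lines)
}
```

Boxing costs one allocation per task; crates such as pin-project are commonly used to avoid it, at the price of an extra dependency.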

Apply the Middleware to a Worker

To apply the logging middleware, use the .layer(...) method when creating the worker:

let worker = WorkerBuilder::new("email_worker")
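    // LoggingLayer defines no constructor above, so it is built directly;
    // the namespace value is only an example.
    .layer(LoggingLayer { namespace: "email".to_string() })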
    .build(|task: Email| async move {
        println!("Executing task: {:?}", task);
        Ok(())
    });