Middleware

Apalis builds on top of the Tower ecosystem, which means workers are powered by composable middleware layers. This allows you to extend, control, and observe job execution without modifying your core business logic.

Middleware can be used to introduce cross-cutting concerns such as retries, concurrency control, observability, and fault tolerance.

Why Middleware?

Middleware in Apalis allows you to:

  • Control how jobs are executed
  • Add resilience (retries, circuit breaking)
  • Improve observability and debugging
  • Enforce execution constraints (timeouts, parallelism)
  • Extend behavior without modifying handlers

Middleware is applied as layers around your job handlers, similar to HTTP middleware in frameworks like Axum or Actix.
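The layering idea can be sketched in plain Rust, without any Apalis or Tower types: a "layer" is just a function that takes a handler and returns a new handler with the same shape. The `Handler` alias and the `with_logging`/`with_retry` helpers below are illustrative names, not part of the Apalis API.

```rust
// Conceptual sketch: a layer wraps a handler and returns another handler.
type Handler = Box<dyn Fn(u32) -> Result<u32, String>>;

// A logging layer: runs before and after the inner handler.
fn with_logging(inner: Handler) -> Handler {
    Box::new(move |job| {
        println!("starting job {job}");
        let res = inner(job);
        println!("finished job {job}: {res:?}");
        res
    })
}

// A retry layer: re-invokes the inner handler up to `attempts` times.
fn with_retry(inner: Handler, attempts: u32) -> Handler {
    Box::new(move |job| {
        let mut last = Err("never ran".to_string());
        for _ in 0..attempts {
            last = inner(job);
            if last.is_ok() {
                break;
            }
        }
        last
    })
}

fn main() {
    let handler: Handler =
        Box::new(|job| if job % 2 == 0 { Ok(job * 10) } else { Err("odd".to_string()) });
    // Layers compose outside-in, like Tower middleware around a service.
    let stacked = with_logging(with_retry(handler, 3));
    assert_eq!(stacked(4), Ok(40));
    assert!(stacked(3).is_err());
}
```

Real Tower layers wrap async services rather than plain closures, but the composition principle is the same.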


Built-in Middleware

Apalis provides several built-in middleware to handle common production concerns.

Parallelize

The parallelize middleware allows a single job to be split into multiple smaller tasks that can be processed concurrently.

// Assumes the apalis prelude exports the worker and storage types used below.
use apalis::prelude::*;

#[tokio::main]
async fn main() {
    // Each job is handled by this async function.
    async fn task(task: u32) {
        println!("Processing task: {task}");
    }
    let in_memory = MemoryStorage::new();
    let worker = WorkerBuilder::new("rango-tango")
        .backend(in_memory)
        // Spawn sub-tasks onto the Tokio runtime so they run concurrently.
        .parallelize(tokio::spawn)
        .on_event(|ctx, ev| {
            println!("CTX {:?}, On Event = {:?}", ctx.name(), ev);
        })
        .build(task);
    worker.run().await.unwrap();
}

This is useful when:

  • Processing large datasets in chunks
  • Performing batch operations
  • Distributing work across multiple workers

Instead of handling a job sequentially, you can break it into independent units and execute them in parallel, improving throughput and reducing latency.
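The split-and-join pattern behind this can be sketched with nothing but std threads (the `process_chunk` and `process_in_parallel` helpers are hypothetical stand-ins, not Apalis functions):

```rust
use std::thread;

// Stand-in for real per-chunk work, e.g. writing a batch to a database.
fn process_chunk(chunk: &[u32]) -> u32 {
    chunk.iter().sum()
}

// Split one large job into chunks, process them concurrently, combine results.
fn process_in_parallel(data: Vec<u32>, chunk_size: usize) -> u32 {
    let handles: Vec<_> = data
        .chunks(chunk_size)
        .map(|c| {
            let owned = c.to_vec();
            thread::spawn(move || process_chunk(&owned))
        })
        .collect();
    // Join all chunk workers and combine their partial results.
    handles.into_iter().map(|h| h.join().unwrap()).sum()
}

fn main() {
    let total = process_in_parallel((1..=100).collect(), 25);
    assert_eq!(total, 5050);
    println!("total = {total}");
}
```

The parallelize middleware applies the same idea to async tasks spawned via the executor you pass in (such as `tokio::spawn`).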


Circuit Breaker

The circuit_breaker middleware helps protect your system from repeated failures when interacting with unreliable dependencies.

use std::time::Duration;

// Assumes the apalis prelude exports the worker, storage, and breaker types used below.
use apalis::prelude::*;

#[tokio::main]
async fn main() {
    let mut in_memory = MemoryStorage::new();

    in_memory.push(42u32).await.unwrap();

    // Fails for any task not divisible by 3, tripping the breaker.
    async fn task(task: u32, _ctx: WorkerContext) -> Result<(), BoxDynError> {
        if task % 3 == 0 {
            Ok(())
        } else {
            Err("Expected Error stopped!".into())
        }
    }

    let config = CircuitBreakerConfig::default()
        .with_failure_threshold(1)
        .with_recovery_timeout(Duration::from_secs(1))
        .with_success_threshold(0.5)
        .with_half_open_max_calls(1);

    let worker = WorkerBuilder::new("rango-tango")
        .backend(in_memory)
        .break_circuit_with(config)
        .build(task);

    worker.run().await.unwrap();
}

It works by:

  • Monitoring failures in job execution
  • Temporarily stopping execution when failures exceed a threshold
  • Allowing recovery after a cooldown period

This is especially useful for:

  • External API calls
  • Database outages
  • Third-party integrations

By failing fast, it prevents cascading failures and reduces system load.
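The underlying state machine can be sketched in plain Rust. This is a minimal illustration of the open/half-open/closed pattern, not the Apalis implementation; the `CircuitBreaker` type and its fields here are invented for the sketch.

```rust
use std::time::{Duration, Instant};

// After `threshold` consecutive failures the breaker opens and rejects calls
// until `recovery` has elapsed, then allows a trial call (half-open).
struct CircuitBreaker {
    failures: u32,
    threshold: u32,
    recovery: Duration,
    opened_at: Option<Instant>,
}

impl CircuitBreaker {
    fn new(threshold: u32, recovery: Duration) -> Self {
        Self { failures: 0, threshold, recovery, opened_at: None }
    }

    fn call<T>(&mut self, f: impl FnOnce() -> Result<T, String>) -> Result<T, String> {
        if let Some(opened) = self.opened_at {
            if opened.elapsed() < self.recovery {
                // Fail fast while the circuit is open: `f` never runs.
                return Err("circuit open".to_string());
            }
            // Half-open: let one trial call through.
            self.opened_at = None;
        }
        match f() {
            Ok(v) => {
                self.failures = 0; // success closes the circuit again
                Ok(v)
            }
            Err(e) => {
                self.failures += 1;
                if self.failures >= self.threshold {
                    self.opened_at = Some(Instant::now());
                }
                Err(e)
            }
        }
    }
}

fn main() {
    let mut cb = CircuitBreaker::new(2, Duration::from_millis(50));
    assert!(cb.call(|| Err::<(), _>("boom".to_string())).is_err()); // failure 1
    assert!(cb.call(|| Err::<(), _>("boom".to_string())).is_err()); // failure 2: opens
    // While open, calls are rejected without touching the failing dependency.
    assert_eq!(cb.call(|| Ok(1)), Err("circuit open".to_string()));
    std::thread::sleep(Duration::from_millis(60));
    // After the recovery timeout a trial call succeeds and closes the circuit.
    assert_eq!(cb.call(|| Ok(1)), Ok(1));
}
```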


Long Running Jobs

The long_running middleware is designed for tasks that take significant time to complete.

use std::time::Duration;

// Assumes the apalis prelude exports the worker, storage, and runner types used below.
use apalis::prelude::*;
// `try_collect` on the receiver comes from the futures stream extension trait.
use futures::TryStreamExt;

#[tokio::main]
async fn main() {
    let mut in_memory = MemoryStorage::new();
    in_memory.push(42).await.unwrap();

    async fn task(
        task: u32,
        runner: Runner,
    ) -> Result<u32, BoxDynError> {
        let (ctx, receiver) = runner.channel();
        // Spawn and track the long-running task.
        // Futures should be spawned on an executor rather than awaited directly;
        // this also ensures graceful shutdown.
        tokio::spawn(ctx.execute(async move {
            // Perform a long-running task
            tokio::time::sleep(Duration::from_secs(1)).await;
            task
        }));
        // Close the context and wait for the results
        ctx.wait().await;
        let res = receiver.try_collect::<Vec<_>>().await?.iter().sum::<u32>();
        Ok(res)
    }

    let worker = WorkerBuilder::new("rango-tango")
        .backend(in_memory)
        .long_running()
        .build(task);
    worker.run().await.unwrap();
}

It provides:

  • Better handling of long-lived tasks
  • Visibility into execution state
  • Protection against premature termination

Use this middleware when:

  • Processing large files
  • Running heavy computations
  • Executing workflows that span minutes or hours

It ensures that such jobs are managed safely and predictably.


Acknowledgements (Ack)

The ack middleware controls when a job is acknowledged as completed.

This is critical for reliability, as it determines when a job is considered successfully processed and removed from the queue.

With ack, you can:

  • Acknowledge jobs only after successful execution
  • Delay acknowledgement until side effects are confirmed
  • Avoid losing jobs in case of failures

This is particularly important when working with persistent backends like Redis or databases.
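The acknowledge-after-success idea can be illustrated without any Apalis types. In this plain-Rust sketch (the `run_once` helper is hypothetical), a job is removed from the queue only once the handler reports success, so a crash or failure leaves it in place for a retry:

```rust
use std::collections::VecDeque;

// Process the job at the front of the queue; pop it only on success (ack).
fn run_once(queue: &mut VecDeque<u32>, handler: impl Fn(u32) -> Result<(), String>) {
    if let Some(&job) = queue.front() {
        match handler(job) {
            Ok(()) => {
                // Ack: success confirmed, now it is safe to drop the job.
                queue.pop_front();
            }
            Err(_) => {
                // No ack: leave the job in place for a later retry.
            }
        }
    }
}

fn main() {
    let mut queue: VecDeque<u32> = VecDeque::from([3, 4]);
    run_once(&mut queue, |job| if job % 2 == 0 { Ok(()) } else { Err("odd".to_string()) });
    assert_eq!(queue.len(), 2); // job 3 failed, so it was not acknowledged
    run_once(&mut queue, |_| Ok(()));
    assert_eq!(queue, VecDeque::from([4])); // job 3 acked and removed
}
```

The full Apalis example below plugs a custom acknowledger into this lifecycle.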

use std::fmt::Debug;

// Assumes the apalis prelude exports the worker, storage, and ack types used below.
use apalis::prelude::*;
use futures::future::{ready, BoxFuture, FutureExt};
// SendError is assumed here; use whichever error type your acknowledger produces.
use tokio::sync::mpsc::error::SendError;

#[tokio::main]
async fn main() {
    let mut in_memory = MemoryStorage::new();
    in_memory.push(42).await.unwrap();

    async fn task(
        _task: u32,
        _ctx: WorkerContext,
    ) -> Result<(), BoxDynError> {
        Ok(())
    }

    #[derive(Debug, Clone)]
    struct MyAcknowledger;

    impl<Ctx: Debug, IdType: Debug> Acknowledge<(), Ctx, IdType> for MyAcknowledger {
        type Error = SendError<()>;
        type Future = BoxFuture<'static, Result<(), Self::Error>>;
        fn ack(
            &mut self,
            res: &Result<(), BoxDynError>,
            parts: &Parts<Ctx, IdType>,
        ) -> Self::Future {
            // Log the job result and its metadata, then report the ack as done.
            println!("{res:?}, {parts:?}");
            ready(Ok(())).boxed()
        }
    }

    let worker = WorkerBuilder::new("rango-tango")
        .backend(in_memory)
        .ack_with(MyAcknowledger)
        .on_event(|_ctx, ev| {
            println!("On Event = {:?}", ev);
        })
        .build(task);
    worker.run().await.unwrap();
}

Composing Middleware

Middleware in Apalis is composable. You can combine multiple layers to build powerful execution pipelines.

For example:

  • Use parallelize + ack for safe concurrent processing
  • Use circuit_breaker + retries for resilient external calls
  • Use long_running + monitoring for heavy workloads

Because Apalis integrates with Tower, middleware can be stacked and reused across workers, enabling consistent behavior across your system.
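The effect of stacking can be seen in the order layers run: the outermost layer sees the call first and finishes last. This plain-Rust trace sketch (the `layered_trace` helper and layer names are invented for illustration) makes the nesting visible:

```rust
use std::cell::RefCell;
use std::rc::Rc;

// Run a handler wrapped in two layers and record the order of execution.
fn layered_trace() -> Vec<&'static str> {
    let trace = Rc::new(RefCell::new(Vec::new()));
    let handler = {
        let t = trace.clone();
        move || t.borrow_mut().push("handler")
    };
    // Inner layer: wraps the handler directly.
    let retry_layer = {
        let t = trace.clone();
        move || {
            t.borrow_mut().push("retry: before");
            handler();
            t.borrow_mut().push("retry: after");
        }
    };
    // Outer layer: wraps the inner layer, so it runs first and finishes last.
    let log_layer = {
        let t = trace.clone();
        move || {
            t.borrow_mut().push("log: before");
            retry_layer();
            t.borrow_mut().push("log: after");
        }
    };
    log_layer();
    let result = trace.borrow().clone();
    result
}

fn main() {
    let trace = layered_trace();
    assert_eq!(
        trace,
        ["log: before", "retry: before", "handler", "retry: after", "log: after"]
    );
    println!("{trace:?}");
}
```

The same nesting applies to a `WorkerBuilder` chain: layers added earlier wrap the layers and handler added after them.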


Summary

Middleware is a core part of Apalis’ design, enabling you to build robust and production-ready background processing systems.

By leveraging built-in middleware like:

  • parallelize
  • circuit_breaker
  • long_running
  • ack

you can handle concurrency, failures, and execution guarantees with minimal effort while keeping your job handlers simple and focused.