Monitoring

Introduction

The Monitor component in apalis orchestrates one or more workers: it tracks their lifecycle and provides hooks for observability, restart logic, and graceful shutdown. Rather than polling, it acts as an event-driven observer attached directly to worker execution.

Basic Setup

A minimal Monitor registers a single worker and runs it:

#[tokio::main]
async fn main() -> Result<(), BoxDynError> {
    let mut backend = JsonStorage::new_temp()?;
    produce_task(&mut backend).await?;

    Monitor::new()
        .register(move |_run_id| {
            WorkerBuilder::new("tasty-avocado")
                .backend(backend.clone())
                .enable_tracing()
                .build(email_service)
        })
        .run()
        .await?;

    Ok(())
}

Registering Workers

Use .register() to add a worker to the monitor. Each call takes a closure that builds the worker, and that closure receives a run_id you can use to distinguish restarts:

Monitor::new()
    .register(move |_run_id| {
        WorkerBuilder::new("tasty-avocado")
            .backend(avocado_backend.clone())
            .enable_tracing()
            .build(email_service)
    })
    .register(move |_run_id| {
        WorkerBuilder::new("tasty-pear")
            .backend(pear_backend.clone())
            .layer(TraceLayer::new().make_span_with(ContextualTaskSpan::new()))
            .build(email_service)
    })

Multiple workers can be registered and run concurrently. Each worker can have its own backend, middleware stack, and tracing configuration.
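
To make the closure-based registration more concrete, here is a minimal standard-library sketch of the pattern. It is not apalis code: SketchWorker, SketchMonitor, build_all, and two_worker_monitor are hypothetical names invented for illustration, and the idea that a factory is called again for each run is an assumption based on the run_id description above.

```rust
// Hypothetical stand-ins for illustration only; these are NOT apalis types.
#[derive(Debug, PartialEq)]
struct SketchWorker {
    name: String,
    run_id: usize,
}

/// A toy monitor that stores one factory closure per registered worker.
struct SketchMonitor {
    factories: Vec<Box<dyn Fn(usize) -> SketchWorker>>,
}

impl SketchMonitor {
    fn new() -> Self {
        SketchMonitor { factories: Vec::new() }
    }

    /// Store the factory; nothing is built until `build_all` is called.
    fn register(mut self, factory: impl Fn(usize) -> SketchWorker + 'static) -> Self {
        self.factories.push(Box::new(factory));
        self
    }

    /// Build a fresh instance of every registered worker for the given run.
    fn build_all(&self, run_id: usize) -> Vec<SketchWorker> {
        self.factories.iter().map(|factory| factory(run_id)).collect()
    }
}

/// Register two workers, mirroring the shape of the snippet above.
fn two_worker_monitor() -> SketchMonitor {
    SketchMonitor::new()
        .register(|run_id| SketchWorker { name: "tasty-avocado".to_string(), run_id })
        .register(|run_id| SketchWorker { name: "tasty-pear".to_string(), run_id })
}
```

Because the sketch stores closures rather than finished workers, it can build a new instance for every run, and each instance knows which run it belongs to.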

Listening to Events

Use .on_event() to receive lifecycle events from all registered workers. This is useful for centralized logging or metrics:

.on_event(|ctx, ev| {
    info!("Received {} event from {} Worker", ev, ctx.name());
})

ctx.name() returns the worker's name, and ev describes the event (e.g. task started, completed, or failed).
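
For the metrics use case, one option is to count events per worker in a shared structure. The sketch below uses only the standard library; EventCounter, record, and count are hypothetical names and not part of apalis. It takes plain string labels, on the assumption that the worker name and the event can both be rendered as text, as the info! call above suggests.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Hypothetical metrics sink for illustration; not part of apalis.
#[derive(Default)]
struct EventCounter {
    // Key: (worker name, event label). Value: number of times seen.
    counts: Mutex<HashMap<(String, String), u64>>,
}

impl EventCounter {
    /// Increment the counter for this worker/event pair.
    fn record(&self, worker: &str, event: &str) {
        let mut counts = self.counts.lock().unwrap();
        *counts
            .entry((worker.to_string(), event.to_string()))
            .or_insert(0) += 1;
    }

    /// Read the current count; pairs never recorded report zero.
    fn count(&self, worker: &str, event: &str) -> u64 {
        let counts = self.counts.lock().unwrap();
        counts
            .get(&(worker.to_string(), event.to_string()))
            .copied()
            .unwrap_or(0)
    }
}
```

A counter like this could be shared with the event callback (for example behind an Arc) and updated with the worker name and the formatted event. Whether that fits depends on the callback's actual bounds, which this sketch does not model.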

Restarting Workers

Use .should_restart() to define when a worker should be restarted after a failure. The closure receives the worker context, the error, and the number of times the worker has already been restarted:

.should_restart(|ctx, err, runs| {
    if ctx.name() == "tasty-pear"
        && err.to_string().contains("Recoverable Error")
        && runs < 5
    {
        return true;
    }
    false
})

This example restarts the tasty-pear worker up to 5 times, but only for recoverable errors.
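
The same decision can be pulled out into a plain function so that it can be unit tested without starting a monitor. This is only a sketch: restart_policy and MAX_RESTARTS are hypothetical names, and the function takes a worker name and an error message as strings instead of the apalis context and error types. The usize type for runs is also an assumption.

```rust
/// Maximum number of restarts allowed, matching the `runs < 5` check above.
const MAX_RESTARTS: usize = 5;

/// Decide whether to restart, given plain values instead of apalis types.
fn restart_policy(worker_name: &str, error_message: &str, runs: usize) -> bool {
    worker_name == "tasty-pear"
        && error_message.contains("Recoverable Error")
        && runs < MAX_RESTARTS
}
```

With the policy isolated like this, the closure passed to .should_restart() would only need to extract the name and the error text and delegate to the function.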

Graceful Shutdown

.shutdown_timeout() sets how long the monitor waits for workers to finish before forcing an exit:

.shutdown_timeout(Duration::from_secs(5))

.run_with_signal() accepts any future that resolves when shutdown should begin, typically a Ctrl+C signal:

.run_with_signal(tokio::signal::ctrl_c())
.await?;
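
The idea behind a shutdown timeout can be illustrated with a small standard-library sketch: wait for workers to report completion, but stop waiting once a deadline passes. This is not how apalis implements it. wait_for_workers is a hypothetical helper, and the channel-based completion signal is an assumption made for the example.

```rust
use std::sync::mpsc::Receiver;
use std::time::{Duration, Instant};

/// Wait until `expected` workers report completion or `timeout` elapses.
/// Returns how many workers finished in time.
fn wait_for_workers(done: &Receiver<()>, expected: usize, timeout: Duration) -> usize {
    let deadline = Instant::now() + timeout;
    let mut finished = 0;
    while finished < expected {
        // Time left before the deadline; zero once it has passed.
        let remaining = deadline.saturating_duration_since(Instant::now());
        match done.recv_timeout(remaining) {
            Ok(()) => finished += 1,
            // Timed out, or every sender was dropped: stop waiting.
            Err(_) => break,
        }
    }
    finished
}
```

If the returned count is lower than expected, the caller knows some workers were still busy when the deadline passed and can decide to force an exit.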

Full Example

main.rs
use apalis::prelude::*;
use apalis_file_storage::JsonStorage;
use examples::*;
use tracing::info;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), BoxDynError> {
    use tracing_subscriber::EnvFilter;

    let fmt_layer = tracing_subscriber::fmt::layer().with_target(false);
    let filter_layer =
        EnvFilter::try_from_default_env().or_else(|_| EnvFilter::try_new("debug"))?;
    tracing_subscriber::registry()
        .with(filter_layer)
        .with(fmt_layer)
        .init();

    let avocado_backend = JsonStorage::new_temp()?;
    let pear_backend = JsonStorage::new_temp()?;

    Monitor::new()
        .register(move |_runs| {
            WorkerBuilder::new("tasty-avocado")
                .backend(avocado_backend.clone())
                .build(send_email)
        })
        .register(move |_runs| {
            WorkerBuilder::new("tasty-pear")
                .backend(pear_backend.clone())
                .build(send_email)
        })
        .on_event(|ctx, ev| {
            info!("Received {} event from {} Worker", ev, ctx.name());
        })
        .should_restart(|ctx, err, runs| {
            if ctx.name() == "tasty-pear"
                && err.to_string().contains("Recoverable Error")
                && runs < 5
            {
                return true;
            }
            false
        })
        .shutdown_timeout(Duration::from_secs(5))
        .run_with_signal(tokio::signal::ctrl_c())
        .await?;

    Ok(())
}