Prometheus


Apalis provides PrometheusLayer, a Tower middleware layer that records task throughput and processing latency using the metrics crate facade. Because metrics is backend-agnostic, you can pair it with any compatible exporter — but the most common setup is metrics-exporter-prometheus, which exposes a /metrics scrape endpoint directly from your application.
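As a sketch of the dependencies involved (the feature name and version pins are assumptions — check the apalis version you are using, which may gate the layer behind a different feature):

```toml
[dependencies]
# "prometheus" feature flag is an assumption; pin versions to your project.
apalis = { version = "*", features = ["prometheus"] }
metrics-exporter-prometheus = "*"
```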


What Gets Recorded

PrometheusLayer records two instruments on every task execution:

| Metric    | Name                  | Description                                          |
|-----------|-----------------------|------------------------------------------------------|
| Counter   | tasks_total           | Incremented once per task, regardless of outcome     |
| Histogram | task_duration_seconds | Time from task receipt to handler return, in seconds |

Both are tagged with three labels:

| Label  | Value          | Description                             |
|--------|----------------|-----------------------------------------|
| worker | Worker name    | The name passed to WorkerBuilder::new   |
| queue  | Task type name | The Rust type name of the job arguments |
| status | "Ok" or "Err"  | Whether the handler succeeded or failed |

This lets you answer questions like:

  • What is the error rate for Email jobs on the tasty-banana worker?
  • What is the p99 processing time for ReportJob tasks?
  • How many tasks has each worker processed in the last 5 minutes?
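As a sketch, each of those questions maps to a PromQL query over the two instruments (the queue label values below are assumptions — substitute the Rust type names your jobs actually report):

```promql
# Error rate for Email jobs on the tasty-banana worker
sum(rate(tasks_total{worker="tasty-banana", queue=~".*Email", status="Err"}[5m]))
  /
sum(rate(tasks_total{worker="tasty-banana", queue=~".*Email"}[5m]))

# p99 processing time for ReportJob tasks
histogram_quantile(
  0.99,
  sum by (le) (rate(task_duration_seconds_bucket{queue=~".*ReportJob"}[5m]))
)

# Tasks processed per worker over the last 5 minutes
sum by (worker) (increase(tasks_total[5m]))
```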

Adding the Layer

use apalis::prelude::*;
use apalis::layers::prometheus::PrometheusLayer;

let worker = WorkerBuilder::new("tasty-banana")
    .backend(backend)
    .layer(PrometheusLayer::default())
    .build(my_handler);

PrometheusLayer has no configuration — it delegates all recorder concerns to the global metrics registry. Install your exporter before starting the worker and the layer will find it automatically.


Full Example: Worker + Axum /metrics Endpoint

A common pattern is to run the Apalis worker and an Axum HTTP server concurrently — the HTTP server handles job submission and exposes the /metrics scrape endpoint, while the worker processes jobs in the background.

use anyhow::Result;
use apalis::layers::prometheus::PrometheusLayer;
use apalis::prelude::*;
use apalis_file_storage::JsonStorage;
use axum::{
    Router,
    extract::Extension,
    routing::get,
    response::IntoResponse,
    http::StatusCode,
};
use futures::future::ready;
use metrics_exporter_prometheus::{Matcher, PrometheusBuilder, PrometheusHandle};
use std::net::SocketAddr;

// `Email` is the job argument type; it and the `show_form` / `add_new_job`
// HTTP handlers referenced below are application-specific and omitted here.
async fn send_email(email: Email) -> Result<(), BoxDynError> {
    // ... handler logic ...
    Ok(())
}

#[tokio::main]
async fn main() -> Result<()> {
    // 1. Install the Prometheus recorder globally before any metrics are recorded.
    let recorder_handle = setup_metrics_recorder();

    let backend = JsonStorage::new_temp().unwrap();

    // 2. HTTP server: job submission form + /metrics scrape endpoint.
    let app = Router::new()
        .route("/", get(show_form).post(add_new_job))
        .layer(Extension(backend.clone()))
        .route("/metrics", get(move || ready(recorder_handle.render())));

    let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
    let listener = tokio::net::TcpListener::bind(addr).await?;

    let http = async {
        axum::serve(listener, app).await
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::BrokenPipe, e))
    };

    // 3. Worker: process jobs and record metrics via PrometheusLayer.
    let worker = async {
        WorkerBuilder::new("tasty-banana")
            .backend(backend)
            .layer(PrometheusLayer::default())
            .build(send_email)
            .run()
            .await
            .expect("Worker failed");
        Ok(())
    };

    // 4. Run both concurrently — if either fails, both shut down.
    futures::future::try_join(worker, http).await?;
    Ok(())
}

fn setup_metrics_recorder() -> PrometheusHandle {
    // Exponential buckets suited to background job latencies (5ms – 10s).
    const EXPONENTIAL_SECONDS: &[f64] = &[
        0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
    ];

    PrometheusBuilder::new()
        .set_buckets_for_metric(
            Matcher::Prefix("task".to_string()),
            EXPONENTIAL_SECONDS,
        )
        .expect("Could not configure Prometheus buckets")
        .install_recorder()
        .expect("Could not install Prometheus recorder")
}

Once running, Prometheus can scrape http://localhost:3000/metrics and you will see output like:

# HELP tasks_total Total number of tasks processed
# TYPE tasks_total counter
tasks_total{worker="tasty-banana",queue="email_service::Email",status="Ok"} 42
tasks_total{worker="tasty-banana",queue="email_service::Email",status="Err"} 3

# HELP task_duration_seconds Task processing duration in seconds
# TYPE task_duration_seconds histogram
task_duration_seconds_bucket{worker="tasty-banana",queue="email_service::Email",status="Ok",le="0.1"} 38
task_duration_seconds_bucket{worker="tasty-banana",queue="email_service::Email",status="Ok",le="0.5"} 41
...
task_duration_seconds_sum{...} 4.327
task_duration_seconds_count{...} 42
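The bucket series above are cumulative: each le series counts every observation at or below its bound, so the counts never decrease as le grows. A minimal plain-Rust sketch of that accounting (an illustration of the exposition format, not apalis code):

```rust
// Compute cumulative Prometheus histogram buckets: each `le` bound counts
// all observations less than or equal to it.
fn bucket_counts(bounds: &[f64], samples: &[f64]) -> Vec<u64> {
    bounds
        .iter()
        .map(|le| samples.iter().filter(|s| **s <= *le).count() as u64)
        .collect()
}

fn main() {
    let bounds = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0];
    let samples = [0.004, 0.03, 0.07, 0.09, 0.3, 2.0];
    let counts = bucket_counts(&bounds, &samples);
    for (le, c) in bounds.iter().zip(&counts) {
        println!("task_duration_seconds_bucket{{le=\"{le}\"}} {c}");
    }
    println!("task_duration_seconds_sum {}", samples.iter().sum::<f64>());
    println!("task_duration_seconds_count {}", samples.len());
}
```

This is also why histogram_quantile in PromQL works on rate(…_bucket[5m]): the cumulative counts let it interpolate a quantile without storing raw observations.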

Setting Up the Recorder

PrometheusLayer uses the metrics crate facade, which requires a recorder to be installed globally before any metrics are emitted. Call setup_metrics_recorder() — or your equivalent — before building any workers:

fn setup_metrics_recorder() -> PrometheusHandle {
    PrometheusBuilder::new()
        .install_recorder()
        .expect("Could not install Prometheus recorder")
}

The returned PrometheusHandle is the render handle — call .render() on it inside your metrics route handler to produce the current scrape payload. It is cheaply cloneable and safe to share across threads.

Order matters: install the recorder before calling WorkerBuilder::build. Metrics emitted before a recorder is installed are silently dropped by the metrics facade.


Histogram Bucket Configuration

The default PrometheusBuilder buckets are designed for HTTP request latencies and may not suit background job workloads. Use set_buckets_for_metric with a Matcher to override them for Apalis metrics:

PrometheusBuilder::new()
    .set_buckets_for_metric(
        Matcher::Prefix("task".to_string()),  // bucket overrides only affect histograms, so this targets task_duration_seconds
        &[0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0, 300.0],
    )
    .expect("Could not configure buckets")
    .install_recorder()
    .expect("Could not install recorder")

Adjust bucket boundaries to match your job latency distribution. If most jobs complete in under a second but some run for minutes, include both fine-grained sub-second buckets and coarse multi-minute ones.
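If you prefer to scope the override to the histogram by exact name rather than by prefix, metrics-exporter-prometheus also provides Matcher::Full (a sketch; the bucket bounds here are illustrative):

```rust
use metrics_exporter_prometheus::{Matcher, PrometheusBuilder};

// Exact-name match: only task_duration_seconds receives these buckets.
// Equivalent in effect to the prefix match above, but more explicit.
let handle = PrometheusBuilder::new()
    .set_buckets_for_metric(
        Matcher::Full("task_duration_seconds".to_string()),
        &[0.01, 0.1, 1.0, 10.0, 60.0],
    )
    .expect("Could not configure buckets")
    .install_recorder()
    .expect("Could not install recorder");
```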


Combining with Tracing and OpenTelemetry

PrometheusLayer and TraceLayer are independent Tower layers and compose freely. Add both to the same worker to get traces and Prometheus metrics simultaneously:

use apalis::layers::tracing::TraceLayer;
use apalis::layers::prometheus::PrometheusLayer;

let worker = WorkerBuilder::new("tasty-banana")
    .backend(backend)
    .layer(PrometheusLayer::default())  // outermost — measures total wall time
    .layer(TraceLayer::new())            // inner — spans cover handler execution
    .build(my_handler);

If you are already using OpenTelemetry and prefer to export metrics via OTLP rather than a Prometheus scrape endpoint, use OpenTelemetryMetricsLayer instead — it records equivalent instruments using OTel semantic conventions and the global OTel meter provider.

|                      | PrometheusLayer                    | OpenTelemetryMetricsLayer                                      |
|----------------------|------------------------------------|----------------------------------------------------------------|
| Metric names         | tasks_total, task_duration_seconds | messaging.client.consumed.messages, messaging.process.duration |
| Backend              | metrics crate facade               | OTel global meter                                              |
| Exporter             | Prometheus scrape endpoint         | OTLP, Jaeger, Datadog, etc.                                    |
| Semantic conventions | Custom                             | OTel Messaging                                                 |
| Setup                | Install recorder once              | Set global meter provider                                      |

Choose PrometheusLayer when Prometheus is your primary metrics backend. Choose OpenTelemetryMetricsLayer when you want vendor-neutral OTel export or are already using an OTel collector.


Summary

PrometheusLayer is a single layer call on your WorkerBuilder that gives you full task throughput and latency visibility in Prometheus:

WorkerBuilder::new("worker-name")
  .layer(PrometheusLayer::default())   ← add this
  .build(handler)

Pair it with metrics-exporter-prometheus to expose a scrape endpoint, configure histogram buckets for your latency profile, and compose it with TraceLayer for correlated traces alongside your metrics.