OpenTelemetry

Apalis provides first-class support for OpenTelemetry through two complementary mechanisms:

  • Metrics via OpenTelemetryMetricsLayer — counters and histograms recorded against every task execution
  • Traces via TraceLayer with TracingContext — span propagation from upstream services into background jobs

Both are independent Tower middleware layers and can be used together or separately. This page covers both and shows how they compose.


OpenTelemetry Metrics

What gets recorded

OpenTelemetryMetricsLayer automatically updates two instruments on every task execution, following the OpenTelemetry Messaging semantic conventions:

Instrument  Name                                Unit       Description
Counter     messaging.client.consumed.messages  {message}  Incremented once per task, regardless of outcome
Histogram   messaging.process.duration          s          Time from task receipt to handler completion
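
As a rough mental model of what the two instruments capture, the sketch below wraps a handler call, counts it regardless of outcome, and records its duration together with the outcome. It uses only the standard library; TaskMetrics and observe are hypothetical names for illustration and are not how the layer itself is implemented.

```rust
use std::time::{Duration, Instant};

/// Hypothetical in-memory stand-in for the counter and the histogram.
#[derive(Default)]
struct TaskMetrics {
    /// Models messaging.client.consumed.messages.
    consumed: u64,
    /// Models messaging.process.duration samples, each tagged with its status.
    durations: Vec<(Duration, &'static str)>,
}

impl TaskMetrics {
    /// Runs the handler, then records one count and one duration sample.
    fn observe<T, E>(&mut self, handler: impl FnOnce() -> Result<T, E>) -> Result<T, E> {
        let started = Instant::now();
        let result = handler();
        let status = if result.is_ok() { "Ok" } else { "Err" };
        // The counter moves once per task, whether it succeeded or failed.
        self.consumed += 1;
        self.durations.push((started.elapsed(), status));
        result
    }
}
```

Calling observe twice, once with a closure returning Ok and once with one returning Err, leaves consumed at 2 and two duration samples tagged "Ok" and "Err".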

Every observation is tagged with a consistent set of attributes:

Attribute                           Value           Description
messaging.system                    "apalis"        Identifies the messaging system
messaging.operation.name            "process"       The operation being performed
messaging.destination.name          Task type name  The Rust type name of the job arguments
messaging.destination.partition.id  Worker name     The named worker processing the task
apalis.status                       "Ok" or "Err"   Whether the handler returned Ok or Err

These attributes let you slice metrics by worker, job type, and outcome — for example, plotting error rates per job type or p99 latency per worker.
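
To make the slicing concrete, here is a small sketch of the grouping a metrics backend would perform when computing error rates per job type. It assumes the counter observations have already been reduced to (messaging.destination.name, apalis.status) pairs; error_rate_by_job_type is a hypothetical helper, not part of apalis.

```rust
use std::collections::BTreeMap;

/// Groups observations by job type and returns the fraction that ended in "Err".
fn error_rate_by_job_type(observations: &[(&str, &str)]) -> BTreeMap<String, f64> {
    // job type -> (total tasks, failed tasks)
    let mut totals: BTreeMap<String, (u64, u64)> = BTreeMap::new();
    for (job_type, status) in observations {
        let entry = totals.entry(job_type.to_string()).or_insert((0, 0));
        entry.0 += 1;
        if *status == "Err" {
            entry.1 += 1;
        }
    }
    totals
        .into_iter()
        .map(|(job_type, (total, errors))| (job_type, errors as f64 / total as f64))
        .collect()
}
```

In practice you would express the same grouping in your metrics backend's query language rather than in application code.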

The histogram bucket boundaries are pre-configured for background job workloads, ranging from 5 ms for fast tasks to 10 minutes for long-running jobs:

0.005s, 0.01s, 0.025s, 0.05s, 0.1s, 0.25s, 0.5s,
1s, 2.5s, 5s, 10s, 15s, 20s, 30s, 60s, 120s, 300s, 600s
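
The sketch below shows how a measured duration maps onto these boundaries. It assumes the usual OpenTelemetry explicit-bucket convention, where each bucket includes its upper bound, and anything above the last boundary lands in an overflow bucket. bucket_upper_bound is a hypothetical helper for illustration only.

```rust
/// The bucket boundaries listed above, in seconds.
const BOUNDARIES: [f64; 18] = [
    0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 15.0, 20.0, 30.0, 60.0,
    120.0, 300.0, 600.0,
];

/// Returns the upper bound of the bucket a duration falls into,
/// or None when it exceeds the last boundary (the overflow bucket).
fn bucket_upper_bound(seconds: f64) -> Option<f64> {
    BOUNDARIES.iter().copied().find(|&bound| seconds <= bound)
}
```

For example, a 45-second task is counted in the bucket bounded by 60s, while an 11-minute task falls into the overflow bucket.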

Adding the layer

use apalis::prelude::*;
use apalis::layers::OpenTelemetryMetricsLayer;

let worker = WorkerBuilder::new("tasty-avocado")
    .backend(backend)
    .layer(OpenTelemetryMetricsLayer::default())
    .build(my_handler);

OpenTelemetryMetricsLayer obtains its meter from the global OTel meter provider, so it picks up whichever exporter you have registered (Prometheus, OTLP, etc.) without any further configuration.

Initialising the global meter provider

Before starting your workers, register your OTel exporter. Here is an example using the OTLP exporter to send metrics to a collector:

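use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::metrics::SdkMeterProvider;

// new_exporter() and new_pipeline() come from older opentelemetry-otlp
// releases. If your version does not have them, consult its docs for the
// builder-style equivalent.

// Export over gRPC (tonic) to a collector listening on the default OTLP port.
let exporter = opentelemetry_otlp::new_exporter()
    .tonic()
    .with_endpoint("http://localhost:4317");

// Build a meter provider that exports on the Tokio runtime.
let provider: SdkMeterProvider = opentelemetry_otlp::new_pipeline()
    .metrics(opentelemetry_sdk::runtime::Tokio)
    .with_exporter(exporter)
    .build()?;

// Register it globally so OpenTelemetryMetricsLayer can find it.
opentelemetry::global::set_meter_provider(provider);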

For Prometheus, use opentelemetry-prometheus and register the exporter similarly. Any provider that implements the OTel metrics API will work.


OpenTelemetry Traces

Distributed traces are handled by the TraceLayer from apalis::layers::tracing, which integrates with the tracing crate. The tracing-opentelemetry crate bridges tracing spans into OTel spans, so the two systems compose naturally.

Basic setup

use apalis::prelude::*;
use apalis::layers::WorkerBuilderExt;

// enable_tracing() adds a TraceLayer with default settings.
let worker = WorkerBuilder::new("tasty-avocado")
    .backend(backend)
    .enable_tracing()
    .build(my_handler);

To export those spans to an OTel collector, register a tracing-opentelemetry layer alongside your subscriber:

use tracing_subscriber::prelude::*;
use tracing_opentelemetry::OpenTelemetryLayer;

let tracer = /* configure your OTel tracer */;

tracing_subscriber::registry()
    .with(tracing_subscriber::fmt::layer())
    .with(OpenTelemetryLayer::new(tracer))
    .init();

Every span emitted by TraceLayer — including task ID, attempt, and any TracingContext fields — will now flow through to your tracing backend.

Propagating upstream trace context

When a job is enqueued from an instrumented HTTP handler or gRPC service, you can carry the upstream trace_id and span_id into the task so the background job appears as a child span in your trace visualiser:

use apalis::layers::tracing::TracingContext;
use apalis::prelude::*;

// current_trace_id and current_span_id are assumed to be in scope here,
// extracted from the active OTel context in your request handler.
let context = TracingContext::new()
    .with_trace_id(&current_trace_id)
    .with_span_id(&current_span_id)
    .with_trace_flags(1)
    .with_trace_state("vendor=value");

let task = Task::builder(my_job).meta(context).build();
storage.send(task).await?;
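
If the upstream request carries a W3C traceparent header, one possible source for these values is the header itself. The sketch below parses a version-00 header into its trace ID, span ID, and flags using only the standard library. TraceParent and parse_traceparent are hypothetical names and not part of apalis; how the resulting values should be passed to TracingContext depends on the argument types its builder methods accept.

```rust
/// Fields of a version-00 W3C `traceparent` header.
#[derive(Debug, PartialEq)]
struct TraceParent {
    trace_id: String,
    span_id: String,
    trace_flags: u8,
}

/// True when `s` is exactly `len` lowercase hexadecimal characters.
fn is_lower_hex(s: &str, len: usize) -> bool {
    s.len() == len && s.bytes().all(|b| matches!(b, b'0'..=b'9' | b'a'..=b'f'))
}

/// True when every character is '0' (an invalid ID in the W3C format).
fn is_all_zero(s: &str) -> bool {
    s.bytes().all(|b| b == b'0')
}

/// Parses `00-<trace-id>-<span-id>-<flags>`, returning None for anything else.
fn parse_traceparent(header: &str) -> Option<TraceParent> {
    let mut parts = header.trim().split('-');
    let (version, trace_id, span_id, flags) =
        (parts.next()?, parts.next()?, parts.next()?, parts.next()?);
    // Version 00 has exactly four fields.
    if version != "00" || parts.next().is_some() {
        return None;
    }
    if !is_lower_hex(trace_id, 32) || !is_lower_hex(span_id, 16) || !is_lower_hex(flags, 2) {
        return None;
    }
    if is_all_zero(trace_id) || is_all_zero(span_id) {
        return None;
    }
    Some(TraceParent {
        trace_id: trace_id.to_string(),
        span_id: span_id.to_string(),
        trace_flags: u8::from_str_radix(flags, 16).ok()?,
    })
}
```

A flags value of 1 means the upstream span was sampled, which matches the with_trace_flags(1) call shown above.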

On the worker side, use ContextualTaskSpan to read that stored context back into the emitted span:

use apalis::layers::tracing::{ContextualTaskSpan, TraceLayer};

let worker = WorkerBuilder::new("tasty-pear")
    .backend(backend)
    .layer(TraceLayer::new().make_span_with(ContextualTaskSpan::new()))
    .build(my_handler);

See Tracing Integration for a full explanation of TracingContext, ContextualTaskSpan, and custom MakeSpan implementations.


Combining Metrics and Traces

Both are independent Tower layers and compose freely. Layer order matters because middleware wraps inward, so the outermost layer sees the task first and its result last:

use apalis::prelude::*;
use apalis::layers::tracing::{ContextualTaskSpan, TraceLayer};
use apalis::layers::OpenTelemetryMetricsLayer;

let worker = WorkerBuilder::new("tasty-mango")
    .backend(backend)
    // Metrics layer is outermost — it measures total wall time
    // including any time spent in the trace layer.
    .layer(OpenTelemetryMetricsLayer::default())
    // Trace layer is inner — its span covers the handler execution.
    .layer(TraceLayer::new().make_span_with(ContextualTaskSpan::new()))
    .build(my_handler);
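
The wrapping order can be modelled without Tower at all. The sketch below is a simplified, std-only stand-in in which each wrapper logs before and after calling its inner service. Service, Wrap, Handler, and run_stack are hypothetical names, not Tower or apalis types.

```rust
/// Minimal stand-in for a middleware-capable service.
trait Service {
    fn call(&mut self, log: &mut Vec<String>);
}

/// The innermost service: the task handler itself.
struct Handler;

impl Service for Handler {
    fn call(&mut self, log: &mut Vec<String>) {
        log.push("handler".to_string());
    }
}

/// A wrapper that records when it is entered and when it is left.
struct Wrap<S> {
    name: &'static str,
    inner: S,
}

impl<S: Service> Service for Wrap<S> {
    fn call(&mut self, log: &mut Vec<String>) {
        log.push(format!("{}: before", self.name));
        self.inner.call(log);
        log.push(format!("{}: after", self.name));
    }
}

/// Builds metrics(trace(handler)) and returns the order of events.
fn run_stack() -> Vec<String> {
    let mut stack = Wrap {
        name: "metrics",
        inner: Wrap { name: "trace", inner: Handler },
    };
    let mut log = Vec::new();
    stack.call(&mut log);
    log
}
```

The metrics wrapper is entered first and left last, which is why a duration measured there includes whatever time the inner trace wrapper adds.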

With this setup, every task produces:

  • A tracing span (exported as an OTel trace span via tracing-opentelemetry)
  • An increment on messaging.client.consumed.messages
  • A duration sample on messaging.process.duration

Spans carry the task ID, but the metric attributes listed under "What gets recorded" do not include one, so metrics are aggregated per job type, worker, and outcome rather than per task.


Full Example

use apalis::prelude::*;
use apalis::layers::tracing::{ContextualTaskSpan, TraceLayer};
use apalis::layers::OpenTelemetryMetricsLayer;
use tracing_subscriber::prelude::*;

#[derive(Clone)]
struct SendReport { report_id: u64 }

async fn send_report(job: SendReport) -> Result<(), BoxDynError> {
    tracing::info!(report_id = job.report_id, "Sending report");
    // ... do work ...
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), BoxDynError> {
    // 1. Initialise tracing subscriber with OTel export.
    let tracer = init_otel_tracer()?; // your setup here
    tracing_subscriber::registry()
        .with(tracing_subscriber::fmt::layer())
        .with(tracing_opentelemetry::layer().with_tracer(tracer))
        .init();

    // 2. Initialise OTel meter provider.
    init_otel_metrics()?; // your setup here

    // 3. Build and run the worker.
    // `backend` stands for whichever apalis backend you have configured.
    let worker = WorkerBuilder::new("report-worker")
        .backend(backend)
        .layer(OpenTelemetryMetricsLayer::default())
        .layer(TraceLayer::new().make_span_with(ContextualTaskSpan::new()))
        .build(send_report);

    worker.run().await?;
    Ok(())
}

Summary

Layer                             What it provides
OpenTelemetryMetricsLayer         Task counter and duration histogram, following OTel semantic conventions
TraceLayer (or enable_tracing())  Per-task tracing spans, exported via tracing-opentelemetry
ContextualTaskSpan                Reads the enqueued TracingContext to link jobs back to upstream traces
TracingContext                    Metadata type for carrying W3C trace fields through the queue