OpenTelemetry
Apalis provides first-class support for OpenTelemetry through two complementary mechanisms:
- Metrics via `OpenTelemetryMetricsLayer` — counters and histograms recorded against every task execution
- Traces via `TraceLayer` with `TracingContext` — span propagation from upstream services into background jobs
Both are independent Tower middleware layers and can be used together or separately. This page covers both and shows how they compose.
OpenTelemetry Metrics
What gets recorded
`OpenTelemetryMetricsLayer` automatically records two instruments on every task execution, following the OpenTelemetry Messaging semantic conventions:
| Instrument | Name | Unit | Description |
|---|---|---|---|
| Counter | `messaging.client.consumed.messages` | `{message}` | Incremented once per task, regardless of outcome |
| Histogram | `messaging.process.duration` | `s` | Time from task receipt to handler completion |
Every observation is tagged with a consistent set of attributes:
| Attribute | Value | Description |
|---|---|---|
| `messaging.system` | `"apalis"` | Identifies the messaging system |
| `messaging.operation.name` | `"process"` | The operation being performed |
| `messaging.destination.name` | Task type name | The Rust type name of the job arguments |
| `messaging.destination.partition.id` | Worker name | The named worker processing the task |
| `apalis.status` | `"Ok"` or `"Err"` | Whether the handler returned Ok or Err |
These attributes let you slice metrics by worker, job type, and outcome — for example, plotting error rates per job type or p99 latency per worker.
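As a rough illustration, the counter above might surface in a Prometheus scrape roughly like this (metric and label names after typical OTel-to-Prometheus normalisation; the job type `SendReport` and the sample value are made up, and the exact rendering depends on your exporter version):

```text
messaging_client_consumed_messages_total{messaging_system="apalis",messaging_operation_name="process",messaging_destination_name="SendReport",messaging_destination_partition_id="tasty-avocado",apalis_status="Ok"} 42
```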
The histogram buckets are pre-configured for background job workloads, covering everything from sub-10ms fast tasks to 10-minute long-running jobs:
```text
0.005s, 0.01s, 0.025s, 0.05s, 0.1s, 0.25s, 0.5s,
1s, 2.5s, 5s, 10s, 15s, 20s, 30s, 60s, 120s, 300s, 600s
```
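To make the bucket edges concrete, here is a small self-contained sketch (plain Rust, std only; not apalis code) that maps a task duration to the bucket it would land in:

```rust
// Illustration only: given the boundaries listed above, find the histogram
// bucket an observed task duration falls into (the OTel SDK does this
// internally when recording messaging.process.duration).
const BOUNDARIES: [f64; 18] = [
    0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5,
    1.0, 2.5, 5.0, 10.0, 15.0, 20.0, 30.0, 60.0, 120.0, 300.0, 600.0,
];

/// Index of the first boundary that is >= the observed duration;
/// durations above 600 s land in the overflow bucket.
fn bucket_index(duration_secs: f64) -> usize {
    BOUNDARIES
        .iter()
        .position(|&b| duration_secs <= b)
        .unwrap_or(BOUNDARIES.len())
}

fn main() {
    assert_eq!(bucket_index(0.003), 0);    // sub-10ms fast task
    assert_eq!(bucket_index(0.3), 6);      // lands in the 0.5 s bucket
    assert_eq!(bucket_index(90.0), 15);    // lands in the 120 s bucket
    assert_eq!(bucket_index(1000.0), 18);  // overflow bucket
    println!("ok");
}
```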
Adding the layer
```rust
use apalis::prelude::*;
use apalis::layers::OpenTelemetryMetricsLayer;

let worker = WorkerBuilder::new("tasty-avocado")
    .backend(backend)
    .layer(OpenTelemetryMetricsLayer::default())
    .build(my_handler);
```

`OpenTelemetryMetricsLayer` uses the global OTel meter — so it picks up whichever exporter you have registered (Prometheus, OTLP, etc.) without any further configuration.
Initialising the global meter provider
Before starting your workers, register your OTel exporter. Here is an example using the OTLP exporter to send metrics to a collector:
```rust
use opentelemetry_sdk::metrics::SdkMeterProvider;
use opentelemetry_otlp::WithExportConfig;

let exporter = opentelemetry_otlp::new_exporter()
    .tonic()
    .with_endpoint("http://localhost:4317");

let provider = opentelemetry_otlp::new_pipeline()
    .metrics(opentelemetry_sdk::runtime::Tokio)
    .with_exporter(exporter)
    .build()?;

opentelemetry::global::set_meter_provider(provider);
```

For Prometheus, use `opentelemetry-prometheus` and register the exporter similarly. Any provider that implements the OTel metrics API will work.
OpenTelemetry Traces
Distributed traces are handled by the `TraceLayer` from `apalis::layers::tracing`, which integrates with the `tracing` crate. The `tracing-opentelemetry` crate bridges `tracing` spans into OTel spans, so the two systems compose naturally.
Basic setup
```rust
use apalis::prelude::*;
use apalis::layers::WorkerBuilderExt;

// enable_tracing() adds a TraceLayer with default settings.
let worker = WorkerBuilder::new("tasty-avocado")
    .backend(backend)
    .enable_tracing()
    .build(my_handler);
```

To export those spans to an OTel collector, register a `tracing-opentelemetry` layer alongside your subscriber:
```rust
use tracing_subscriber::prelude::*;
use tracing_opentelemetry::OpenTelemetryLayer;

let tracer = /* configure your OTel tracer */;

tracing_subscriber::registry()
    .with(tracing_subscriber::fmt::layer())
    .with(OpenTelemetryLayer::new(tracer))
    .init();
```

Every span emitted by `TraceLayer` — including task ID, attempt, and any `TracingContext` fields — will now flow through to your tracing backend.
Propagating upstream trace context
When a job is enqueued from an instrumented HTTP handler or gRPC service, you can carry the upstream trace_id and span_id into the task so the background job appears as a child span in your trace visualiser:
```rust
use apalis::layers::tracing::TracingContext;
use apalis::prelude::*;

// Extract from the active OTel context in your request handler.
let context = TracingContext::new()
    .with_trace_id(&current_trace_id)
    .with_span_id(&current_span_id)
    .with_trace_flags(1)
    .with_trace_state("vendor=value");

let task = Task::builder(my_job).meta(context).build();
storage.send(task).await?;
```

On the worker side, use `ContextualTaskSpan` to read that stored context back into the emitted span:
```rust
use apalis::layers::tracing::{ContextualTaskSpan, TraceLayer};

let worker = WorkerBuilder::new("tasty-pear")
    .backend(backend)
    .layer(TraceLayer::new().make_span_with(ContextualTaskSpan::new()))
    .build(my_handler);
```

See Tracing Integration for a full explanation of `TracingContext`, `ContextualTaskSpan`, and custom `MakeSpan` implementations.
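The fields carried by TracingContext follow the W3C Trace Context format. As a rough illustration (std-only Rust, not an apalis API), a `traceparent` header received from an upstream service can be split into the trace ID, span ID, and flags shown in the enqueue example above; the header value below is the example from the W3C spec:

```rust
// Sketch: split a W3C `traceparent` header ("version-traceid-spanid-flags")
// into the fields TracingContext carries. Hypothetical helper, not apalis API.
fn parse_traceparent(header: &str) -> Option<(String, String, u8)> {
    let mut parts = header.split('-');
    let _version = parts.next()?;                      // e.g. "00"
    let trace_id = parts.next()?;                      // 32 hex chars
    let span_id = parts.next()?;                       // 16 hex chars
    let flags = u8::from_str_radix(parts.next()?, 16).ok()?;
    if trace_id.len() != 32 || span_id.len() != 16 {
        return None;
    }
    Some((trace_id.to_string(), span_id.to_string(), flags))
}

fn main() {
    let (trace_id, span_id, flags) =
        parse_traceparent("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01")
            .unwrap();
    assert_eq!(trace_id, "0af7651916cd43dd8448eb211c80319c");
    assert_eq!(span_id, "b7ad6b7169203331");
    assert_eq!(flags, 1); // sampled, matching with_trace_flags(1) above
    println!("ok");
}
```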
Combining Metrics and Traces
Both layers are independent Tower services and compose freely. Layer order matters — middleware wraps inward, so the outermost layer sees the task first and last:
```rust
use apalis::prelude::*;
use apalis::layers::tracing::{ContextualTaskSpan, TraceLayer};
use apalis::layers::OpenTelemetryMetricsLayer;

let worker = WorkerBuilder::new("tasty-mango")
    .backend(backend)
    // Metrics layer is outermost — it measures total wall time
    // including any time spent in the trace layer.
    .layer(OpenTelemetryMetricsLayer::default())
    // Trace layer is inner — its span covers the handler execution.
    .layer(TraceLayer::new().make_span_with(ContextualTaskSpan::new()))
    .build(my_handler);
```

With this setup, every task produces:
- A `tracing` span (exported as an OTel trace span via `tracing-opentelemetry`)
- An increment on `messaging.client.consumed.messages`
- A duration sample on `messaging.process.duration`
All correlated by the same `task_id` field — present in both the span attributes and the metric attributes.
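The wrap-inward ordering can be modelled with ordinary functions standing in for the two layers (a simplified, std-only sketch of the idea, not the real tower API):

```rust
// Simplified model of Tower layering: each "layer" wraps the next service.
// The outermost wrapper runs first on the way in and last on the way out.
fn handler(log: &mut Vec<&'static str>) {
    log.push("handler");
}

// Inner layer (trace): its span covers only the handler execution.
fn traced(log: &mut Vec<&'static str>) {
    log.push("trace: enter");
    handler(log);
    log.push("trace: exit");
}

// Outer layer (metrics): its timer covers the trace layer and the handler,
// which is why it measures total wall time.
fn metered(log: &mut Vec<&'static str>) {
    log.push("metrics: start timer");
    traced(log);
    log.push("metrics: record duration");
}

fn main() {
    let mut log = Vec::new();
    metered(&mut log);
    assert_eq!(log, [
        "metrics: start timer",
        "trace: enter",
        "handler",
        "trace: exit",
        "metrics: record duration",
    ]);
    println!("ok");
}
```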
Full Example
```rust
use apalis::prelude::*;
use apalis::layers::tracing::{ContextualTaskSpan, TraceLayer};
use apalis::layers::WorkerBuilderExt;
use apalis::layers::OpenTelemetryMetricsLayer;
use tracing_subscriber::prelude::*;

#[derive(Clone)]
struct SendReport {
    report_id: u64,
}

async fn send_report(job: SendReport) -> Result<(), BoxDynError> {
    tracing::info!(report_id = job.report_id, "Sending report");
    // ... do work ...
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), BoxDynError> {
    // 1. Initialise tracing subscriber with OTel export.
    let tracer = init_otel_tracer()?; // your setup here
    tracing_subscriber::registry()
        .with(tracing_subscriber::fmt::layer())
        .with(tracing_opentelemetry::layer().with_tracer(tracer))
        .init();

    // 2. Initialise OTel meter provider.
    init_otel_metrics()?; // your setup here

    // 3. Build and run the worker.
    let worker = WorkerBuilder::new("report-worker")
        .backend(backend)
        .layer(OpenTelemetryMetricsLayer::default())
        .layer(TraceLayer::new().make_span_with(ContextualTaskSpan::new()))
        .build(send_report);

    worker.run().await?;
    Ok(())
}
```

Summary
| Layer | What it provides |
|---|---|
| `OpenTelemetryMetricsLayer` | Task counter + duration histogram, OTel semantic conventions |
| `TraceLayer` + `enable_tracing()` | Per-task tracing spans, exported via `tracing-opentelemetry` |
| `ContextualTaskSpan` | Reads enqueued `TracingContext` to link jobs back to upstream traces |
| `TracingContext` | Metadata type for carrying W3C trace fields through the queue |