Piping Streams
Not all task sources are persistent queues. Cron schedules, webhook event streams, change-data-capture feeds, and other time-driven sources produce tasks ephemerally — if your worker is down when a tick fires, that work is simply lost.
PipeExt solves this by letting you route any Stream into a backend before a worker processes it. The stream's items are written to storage first, then consumed by the worker — giving you persistence, retries, and observability over sources that would otherwise be fire-and-forget.
How It Works
Without piping, a cron stream delivers ticks directly to the worker. If the worker is unavailable, the tick is gone:
CronStream ──▶ Worker
(tick lost if worker is down)
With pipe_to, ticks are written to a backend first. The worker reads from the backend, not the stream:
CronStream ──▶ pipe_to(SqliteStorage) ──▶ Worker
(tick persisted) (retryable, observable)
The resulting Pipe type itself implements Backend — so it plugs into WorkerBuilder exactly like any other backend.
The PipeExt Trait
pub trait PipeExt<B, Args, Ctx>: Sized {
    /// Pipe this stream into the provided backend.
    fn pipe_to(self, backend: B) -> Pipe<Self, B, Args, Ctx>;
}
pipe_to is available on any stream whose items are Result<Args, Err> and whose target backend implements both Backend and TaskSink<Args>. The returned Pipe wraps both the source stream and the destination backend, forwarding items from one to the other once the worker starts polling.
Example: Persisting a Cron Schedule to SQLite
The most common use case is giving a cron schedule durability. Without persistence, a missed tick is unrecoverable. With pipe_to, each tick is written to SQLite before the worker processes it — so missed ticks can be retried and the full job history is queryable.
use apalis::{layers::retry::RetryPolicy, prelude::*};
use apalis_cron::{CronStream, Tick};
use apalis_sqlite::{SqlitePool, SqliteStorage};
use cron::Schedule;
use std::str::FromStr;
async fn handle_tick(tick: Tick, data: Data<usize>) {
// Process the current scheduled tick.
}
#[tokio::main]
async fn main() {
// 1. Define the cron schedule.
let schedule = Schedule::from_str("@daily").unwrap();
// 2. Set up the SQLite backend.
let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
SqliteStorage::setup(&pool)
.await
.expect("unable to run migrations for sqlite");
let sqlite = SqliteStorage::new(&pool);
// 3. Connect the stream to the backend.
// Ticks are written to SQLite before the worker sees them.
let backend = CronStream::new(schedule).pipe_to(sqlite);
// 4. Build the worker as normal.
let worker = WorkerBuilder::new("morning-cereal")
.backend(backend)
.retry(RetryPolicy::retries(5))
.data(42usize)
.build(handle_tick);
worker.run().await.unwrap();
}
What this buys you
- Durability — ticks are persisted before processing; a worker restart does not lose them
- Retries — a failed tick handler is retried up to 5 times via RetryPolicy
- Observability — tick history is stored in SQLite and queryable via ListTasks
- Backpressure — the in-memory dequeue backend naturally throttles the stream if the worker falls behind
Example: Piping an In-Memory Stream
For testing or short-lived workloads, you can pipe any finite stream directly into an in-memory backend:
use futures_util::{stream, StreamExt};
use apalis_core::backend::{pipe::PipeExt, dequeue};
use apalis_core::builder::WorkerBuilder;
use apalis_core::worker::context::WorkerContext;
use apalis_core::error::BoxDynError;
use std::time::Duration;
#[tokio::main]
async fn main() {
let stream = stream::iter(0..10).map(|s| Ok::<_, std::io::Error>(s));
let in_memory = dequeue::backend::<u32>(Duration::from_secs(1));
let backend = stream.pipe_to(in_memory);
async fn task(item: u32, ctx: WorkerContext) -> Result<(), BoxDynError> {
println!("Processing: {item}");
Ok(())
}
WorkerBuilder::new("rango-tango")
.backend(backend)
.on_event(|_ctx, ev| println!("Event: {ev:?}"))
.build(task)
.run()
.await
.unwrap();
}
This pattern is useful in tests or batch jobs where the full source is known upfront and persistence is not required.
The Pipe Type
Pipe<S, Into, Args, Ctx> holds both the source stream and the destination backend. It implements Backend by:
- On poll — forwarding all items from the source stream into the backend via TaskSink, encoding each item with the backend's configured Codec
- Simultaneously polling the backend's own task stream and forwarding tasks to the worker
- Delegating heartbeat and middleware directly to the inner backend
Because Pipe derefs to the inner backend, you can call any backend method — including list_tasks, list_workers, and other Expose methods — directly on the Pipe if the inner backend supports them.
Error handling
Pipe wraps all errors in PipeError:
| Variant | Cause |
|---|---|
| PipeError::Inner(e) | Any error from the source stream, codec, or inner backend |
| PipeError::EmptyStream | The source stream yielded None unexpectedly |
Choosing a Target Backend
pipe_to accepts any backend that implements TaskSink<Args>. The choice of backend determines what properties the pipe gains:
| Backend | Persistence | Retries | Observability |
|---|---|---|---|
| dequeue::backend (in-memory) | ✗ | ✗ | ✗ |
| SqliteStorage | ✓ | ✓ | ✓ |
| PostgresStorage | ✓ | ✓ | ✓ |
| RedisStorage | ✓ | ✓ | ✓ |
For production cron jobs or any stream where missed ticks matter, always pipe into a persistent backend.
Summary
PipeExt bridges the gap between ephemeral stream sources and durable job queues:
any Stream<Item = Result<Args, E>>
│
└── .pipe_to(backend)
│
├── writes items to backend storage (TaskSink)
├── worker reads from backend (Backend::poll)
└── inherits all backend capabilities: retries, metrics, introspection
A single .pipe_to() call turns a fire-and-forget stream into a fully observable, retriable, persistent task queue — with no changes to your handler function.