Piping Streams


Not all task sources are persistent queues. Cron schedules, webhook event streams, change-data-capture feeds, and other time-driven sources produce tasks ephemerally — if your worker is down when a tick fires, that work is simply lost.

PipeExt solves this by letting you route any Stream into a backend before a worker processes it. The stream's items are written to storage first, then consumed by the worker — giving you persistence, retries, and observability over sources that would otherwise be fire-and-forget.


How It Works

Without piping, a cron stream delivers ticks directly to the worker. If the worker is unavailable, the tick is gone:

CronStream ──▶ Worker
               (tick lost if worker is down)

With pipe_to, ticks are written to a backend first. The worker reads from the backend, not the stream:

CronStream ──▶ pipe_to(SqliteStorage) ──▶ Worker
               (tick persisted)            (retryable, observable)

The resulting Pipe type itself implements Backend — so it plugs into WorkerBuilder exactly like any other backend.


The PipeExt Trait

pub trait PipeExt<B, Args, Ctx>: Sized {
    /// Pipe this stream into the provided backend.
    fn pipe_to(self, backend: B) -> Pipe<Self, B, Args, Ctx>;
}

pipe_to is available on any stream whose items are Result&lt;Args, Err&gt;, provided the target backend implements both Backend and TaskSink&lt;Args&gt;. The returned Pipe wraps both the source stream and the destination backend, forwarding items from one to the other when the worker starts polling.


Example: Persisting a Cron Schedule to SQLite

The most common use case is giving a cron schedule durability. Without persistence, a missed tick is unrecoverable. With pipe_to, each tick is written to SQLite before the worker processes it — so missed ticks can be retried and the full job history is queryable.

use apalis::{layers::retry::RetryPolicy, prelude::*};
use apalis_cron::{CronStream, Tick};
use apalis_sqlite::{SqlitePool, SqliteStorage};
use cron::Schedule;
use std::str::FromStr;

async fn handle_tick(tick: Tick, data: Data<usize>) {
    // Process the current scheduled tick.
}

#[tokio::main]
async fn main() {
    // 1. Define the cron schedule.
    let schedule = Schedule::from_str("@daily").unwrap();

    // 2. Set up the SQLite backend.
    let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
    SqliteStorage::setup(&pool)
        .await
        .expect("unable to run migrations for sqlite");
    let sqlite = SqliteStorage::new(&pool);

    // 3. Connect the stream to the backend.
    //    Ticks are written to SQLite before the worker sees them.
    let backend = CronStream::new(schedule).pipe_to(sqlite);

    // 4. Build the worker as normal.
    let worker = WorkerBuilder::new("morning-cereal")
        .backend(backend)
        .retry(RetryPolicy::retries(5))
        .data(42usize)
        .build(handle_tick);

    worker.run().await.unwrap();
}

What this buys you

  • Durability — ticks are persisted before processing; a worker restart does not lose them
  • Retries — a failed tick handler is retried up to 5 times via RetryPolicy
  • Observability — tick history is stored in SQLite and queryable via ListTasks
  • Backpressure — the storage layer naturally throttles the stream if the worker falls behind

Example: Piping an In-Memory Stream

For testing or short-lived workloads, you can pipe any finite stream directly into an in-memory backend:

use apalis_core::backend::{pipe::PipeExt, dequeue};
use apalis_core::builder::WorkerBuilder;
use apalis_core::error::BoxDynError;
use apalis_core::worker::context::WorkerContext;
use futures_util::stream::{self, StreamExt};
use std::time::Duration;

#[tokio::main]
async fn main() {
    let stream = stream::iter(0..10).map(|s| Ok::<_, std::io::Error>(s));

    let in_memory = dequeue::backend::<u32>(Duration::from_secs(1));
    let backend = stream.pipe_to(in_memory);

    async fn task(item: u32, ctx: WorkerContext) -> Result<(), BoxDynError> {
        println!("Processing: {item}");
        Ok(())
    }

    WorkerBuilder::new("rango-tango")
        .backend(backend)
        .on_event(|_ctx, ev| println!("Event: {ev:?}"))
        .build(task)
        .run()
        .await
        .unwrap();
}

This pattern is useful in tests or batch jobs where the full source is known upfront and persistence is not required.


The Pipe Type

Pipe<S, Into, Args, Ctx> holds both the source stream and the destination backend. It implements Backend by:

  1. On poll — forwarding all items from the source stream into the backend via TaskSink, encoding each item with the backend's configured Codec
  2. Simultaneously polling the backend's own task stream and forwarding tasks to the worker
  3. Delegating heartbeat and middleware directly to the inner backend

Because Pipe derefs to the inner backend, you can call any backend method — including list_tasks, list_workers, and other Expose methods — directly on the Pipe if the inner backend supports them.

Error handling

Pipe wraps all errors in PipeError:

Variant                   Cause
PipeError::Inner(e)       Any error from the source stream, codec, or inner backend
PipeError::EmptyStream    The source stream yielded None unexpectedly

Choosing a Target Backend

pipe_to accepts any backend that implements TaskSink<Args>. The choice of backend determines what properties the pipe gains:

Backend                         Persistence   Retries   Observability
dequeue::backend (in-memory)    ✗             ✓         ✗
SqliteStorage                   ✓             ✓         ✓
PostgresStorage                 ✓             ✓         ✓
RedisStorage                    ✓             ✓         ✓

For production cron jobs or any stream where missed ticks matter, always pipe into a persistent backend.


Summary

PipeExt bridges the gap between ephemeral stream sources and durable job queues:

any Stream<Item = Result<Args, E>>
  │
  └── .pipe_to(backend)
        │
        ├── writes items to backend storage (TaskSink)
        ├── worker reads from backend (Backend::poll)
        └── inherits all backend capabilities: retries, metrics, introspection

A single .pipe_to() call turns a fire-and-forget stream into a fully observable, retryable, persistent task queue — with no changes to your handler function.