Pushing Tasks

Apalis enqueues background jobs through a composable interface built on futures::Sink. Any compatible backend (Redis, SQLite, PostgreSQL, and others) can act as a destination for tasks, which keeps job submission consistent, async-friendly, and stream-aware.

At the core of this system is the TaskSink trait.


The TaskSink Trait

TaskSink<Args> extends any [Backend] with the ability to accept tasks in multiple forms:

| Method      | Description                                     |
| ----------- | ----------------------------------------------- |
| push        | Encode and enqueue a single task value          |
| push_bulk   | Enqueue a Vec of tasks efficiently              |
| push_stream | Drain an async stream of tasks into the backend |
| push_task   | Enqueue a fully constructed Task<Args, _, _>    |
| push_all    | Drain a stream of pre-built Task objects        |

This gives you the flexibility to match your submission strategy to your workload — from one-off jobs to high-throughput pipelines.


Understanding Sink and SinkExt

Apalis builds on the futures ecosystem's Sink abstraction — the dual of a Stream:

  • A Stream produces values asynchronously
  • A Sink consumes values asynchronously

A Sink represents something you can push data into; in this case, a job queue backend. SinkExt layers ergonomic helpers on top, including send, send_all, and sink_map_err.

TaskSink uses these internally so every backend gets consistent, efficient push behaviour for free.
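
To make the Stream/Sink duality concrete, here is a minimal synchronous analogy that uses only the standard library. It is a sketch, not the futures API: the real Sink and SinkExt traits are async and poll-based. ToySink, VecQueue, QueueFull, and demo_sink are hypothetical names invented for this illustration, with an Iterator standing in for a Stream.

```rust
/// Something that consumes values, mirroring how an Iterator produces them.
/// Hypothetical trait for illustration; not part of futures or Apalis.
trait ToySink<Item> {
    type Error;

    /// Accept one item (loosely analogous to SinkExt::send).
    fn send(&mut self, item: Item) -> Result<(), Self::Error>;

    /// Drain a whole producer into the sink (loosely analogous to
    /// SinkExt::send_all). Stops at the first error.
    fn send_all<I>(&mut self, items: I) -> Result<(), Self::Error>
    where
        I: IntoIterator<Item = Item>,
    {
        for item in items {
            self.send(item)?;
        }
        Ok(())
    }
}

/// An in-memory queue with a fixed capacity, standing in for a backend.
struct VecQueue {
    items: Vec<String>,
    capacity: usize,
}

/// Returned when the queue cannot accept more items.
#[derive(Debug, PartialEq)]
struct QueueFull;

impl ToySink<String> for VecQueue {
    type Error = QueueFull;

    fn send(&mut self, item: String) -> Result<(), QueueFull> {
        if self.items.len() >= self.capacity {
            return Err(QueueFull);
        }
        self.items.push(item);
        Ok(())
    }
}

/// Pushes one job with `send`, two more with `send_all`, and returns how
/// many jobs the queue holds afterwards.
fn demo_sink() -> usize {
    let mut queue = VecQueue { items: Vec::new(), capacity: 8 };
    queue.send("job1".to_string()).unwrap();
    queue
        .send_all(vec!["job2".to_string(), "job3".to_string()])
        .unwrap();
    queue.items.len()
}
```

The default send_all method is the point of the sketch: once a type can accept a single item, helpers that drain a whole producer come along without extra work, which is the role SinkExt plays for real sinks.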


Pushing a Single Task

For one-off jobs, use push. The task value is encoded via the backend's configured [Codec] and wrapped into a [Task] before being sent.

use apalis::prelude::*;

async fn enqueue_task<S>(mut backend: S)
where
    S: TaskSink<String>,
{
    backend
        .push("send_email:user@example.com".to_string())
        .await
        .unwrap();
}

Internally, push calls SinkExt::send after encoding the value — so backpressure is respected automatically.


Bulk Task Submission

When you have a collection of tasks ready up front, push_bulk is more efficient than calling push in a loop — it encodes and submits all items in a single batched send_all call, minimising round-trips to the backend.

use apalis::prelude::*;

async fn enqueue_bulk<S>(mut backend: S)
where
    S: TaskSink<String>,
{
    let jobs = vec![
        "job1".to_string(),
        "job2".to_string(),
        "job3".to_string(),
    ];

    backend.push_bulk(jobs).await.unwrap();
}

When to use this: prefer push_bulk over repeated push calls for small-to-medium batches where all items are already in memory.
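
The round-trip argument can be illustrated with a toy model. The sketch below assumes a backend that can accept a whole batch in one request, as the text describes; CountingBackend and both helper functions are hypothetical and only count how often the "network" is touched. Whether a particular Apalis backend batches this way depends on its implementation.

```rust
/// Hypothetical in-memory backend that records how many requests it served.
struct CountingBackend {
    stored: Vec<String>,
    round_trips: usize,
}

impl CountingBackend {
    fn new() -> Self {
        CountingBackend { stored: Vec::new(), round_trips: 0 }
    }

    /// One item per request.
    fn push(&mut self, job: String) {
        self.stored.push(job);
        self.round_trips += 1;
    }

    /// The whole batch in a single request.
    fn push_bulk(&mut self, jobs: Vec<String>) {
        self.stored.extend(jobs);
        self.round_trips += 1;
    }
}

/// Submits every job with its own `push` call and reports the request count.
fn round_trips_looped(jobs: Vec<String>) -> usize {
    let mut backend = CountingBackend::new();
    for job in jobs {
        backend.push(job);
    }
    backend.round_trips
}

/// Submits all jobs with one `push_bulk` call and reports the request count.
fn round_trips_bulk(jobs: Vec<String>) -> usize {
    let mut backend = CountingBackend::new();
    backend.push_bulk(jobs);
    backend.round_trips
}
```

In this model the looped version costs one request per job, while the bulk version costs one request regardless of batch size.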


Streaming Tasks

For large or unbounded datasets, push_stream lets you feed tasks directly from an async Stream. This is the most memory-efficient option because tasks are encoded and forwarded lazily — the backend's backpressure naturally throttles the producer.

use apalis::prelude::*;
use futures_util::stream;

async fn enqueue_stream<S>(mut backend: S)
where
    S: TaskSink<u32>,
{
    let stream = stream::iter(0..1_000_000);

    backend.push_stream(stream).await.unwrap();
}

This is ideal when:

  • Tasks are generated dynamically or computed on the fly
  • You are processing large datasets that shouldn't be held in memory at once
  • You want automatic backpressure between the producer and the backend

Internally, push_stream uses SinkExt::send_all with sink_map_err to unify error types.
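
Backpressure can be seen in isolation with a synchronous analogy from the standard library. In the sketch below, a bounded std::sync::mpsc::sync_channel plays the role of the backend and a lazy range plays the role of the stream. The function name run_bounded_pipeline is hypothetical, and this is not how push_stream is implemented; it only shows a producer being throttled by a slow or full consumer.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Sends `total` task ids through a channel that buffers at most `capacity`
/// items and returns everything the consumer received, in order.
fn run_bounded_pipeline(total: u32, capacity: usize) -> Vec<u32> {
    let (tx, rx) = sync_channel::<u32>(capacity);

    // Producer: `send` blocks whenever the buffer is full, so the lazy range
    // is only advanced as fast as the consumer drains the channel.
    let producer = thread::spawn(move || {
        for id in 0..total {
            tx.send(id).expect("consumer hung up");
        }
        // `tx` is dropped when this closure returns, which ends the
        // consumer's iteration below.
    });

    // Consumer: stands in for the backend persisting tasks.
    let received: Vec<u32> = rx.iter().collect();
    producer.join().expect("producer panicked");
    received
}
```

At no point does the channel hold more than `capacity` ids, so the memory in flight stays bounded however large `total` is. The returned Vec exists only so the result can be inspected; a real consumer would persist each task and drop it.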


Working with Pre-built Tasks

Sometimes you need control over the [Task] envelope itself — for example, to set a custom ID, attach metadata, or carry context. The push_task and push_all methods accept fully constructed Task<Args, Context, IdType> values.

Pushing a Pre-built Task

use apalis::prelude::*;
use apalis_core::task::Task;

async fn push_custom_task<S>(mut backend: S)
where
    S: TaskSink<String>,
{
    let task = Task::new("important_job".to_string());

    backend.push_task(task).await.unwrap();
}

Pushing a Stream of Pre-built Tasks

use apalis::prelude::*;
use futures_util::stream;
use apalis_core::task::Task;

async fn push_many_tasks<S>(mut backend: S)
where
    S: TaskSink<String>,
{
    let tasks = stream::iter(vec![
        Task::new("task_a".to_string()),
        Task::new("task_b".to_string()),
    ]);

    backend.push_all(tasks).await.unwrap();
}

Choosing the Right Method

| Method      | Input                    | Best for                                |
| ----------- | ------------------------ | --------------------------------------- |
| push        | Single value             | One-off task submission                 |
| push_bulk   | Vec<Args>                | Small–medium in-memory batches          |
| push_stream | Stream<Item = Args>      | Large or dynamically generated datasets |
| push_task   | Task<Args, _, _>         | Full control over task envelope         |
| push_all    | Stream<Item = Task<...>> | Streaming pre-built task objects        |

How Encoding Works

Before any task reaches the backend, it is encoded using the backend's associated [Codec]:

let encoded = C::encode(&task)?;

This separates your domain types from their storage representation, enabling:

  • Compact wire formats (e.g. MessagePack, bincode)
  • Backend-specific serialization (e.g. JSON for PostgreSQL, binary for Redis)
  • Type-safe decoding on the worker side

If encoding fails, a [TaskSinkError::CodecError] is returned. See Error Handling below.
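
The separation between domain types and their stored form can be sketched with a toy codec. Everything below is an assumption made for illustration: ToyCodec, BigEndianU32, WrongLength, and round_trip are hypothetical names, and the actual Codec trait in Apalis may have a different shape. The sketch only mirrors the `C::encode(&task)?` pattern shown above.

```rust
use std::convert::TryFrom;
use std::fmt;

/// Hypothetical stand-in for a codec: turns a value into bytes and back.
trait ToyCodec<T> {
    type Error: fmt::Debug;

    fn encode(value: &T) -> Result<Vec<u8>, Self::Error>;
    fn decode(bytes: &[u8]) -> Result<T, Self::Error>;
}

/// Stores a u32 as exactly four big-endian bytes.
struct BigEndianU32;

/// Decoding error carrying the length that was actually received.
#[derive(Debug, PartialEq)]
struct WrongLength(usize);

impl ToyCodec<u32> for BigEndianU32 {
    type Error = WrongLength;

    fn encode(value: &u32) -> Result<Vec<u8>, WrongLength> {
        Ok(value.to_be_bytes().to_vec())
    }

    fn decode(bytes: &[u8]) -> Result<u32, WrongLength> {
        // Fails unless the slice is exactly four bytes long.
        let array = <[u8; 4]>::try_from(bytes).map_err(|_| WrongLength(bytes.len()))?;
        Ok(u32::from_be_bytes(array))
    }
}

/// Generic over the codec: the caller never sees the byte representation.
fn round_trip<C: ToyCodec<u32>>(task: u32) -> Result<u32, C::Error> {
    let encoded = C::encode(&task)?;
    C::decode(&encoded)
}
```

Because round_trip is generic over the codec, swapping the wire format means changing one type parameter while the producer code stays the same.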


Error Handling

All TaskSink methods return:

Result<(), TaskSinkError<S::Error>>

[TaskSinkError] has two variants:

  • PushError(E): the backend sink rejected the item (e.g. connection failure)
  • CodecError(BoxDynError): serialization of the task value failed

match backend.push("job".to_string()).await {
    Ok(()) => println!("Task enqueued"),
    Err(TaskSinkError::PushError(e)) => eprintln!("Backend error: {:?}", e),
    Err(TaskSinkError::CodecError(e)) => eprintln!("Encoding failed: {:?}", e),
}
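
One way to act on the two variants is to retry backend failures and give up immediately on encoding failures, since re-encoding the same value is unlikely to succeed. The sketch below is a hypothetical policy, not something Apalis provides: ToySinkError is a local stand-in that only copies the two variants described above, BoxDynError is assumed to be a boxed std error, and should_retry and push_with_retry are invented names. It is synchronous so that it runs with the standard library alone.

```rust
use std::error::Error;

/// Local stand-in with the two variants described above. The real
/// TaskSinkError is defined by Apalis and may differ in detail.
#[derive(Debug)]
enum ToySinkError<E> {
    PushError(E),
    CodecError(Box<dyn Error + Send + Sync>),
}

/// Hypothetical policy: backend failures may be transient, codec failures
/// are treated as permanent.
fn should_retry<E>(err: &ToySinkError<E>) -> bool {
    match err {
        ToySinkError::PushError(_) => true,
        ToySinkError::CodecError(_) => false,
    }
}

/// Calls `attempt` until it succeeds, returns a non-retryable error, or
/// `max_attempts` is reached. On success, returns the number of attempts.
fn push_with_retry<E, F>(max_attempts: u32, mut attempt: F) -> Result<u32, ToySinkError<E>>
where
    F: FnMut() -> Result<(), ToySinkError<E>>,
{
    let mut tries = 0;
    loop {
        tries += 1;
        match attempt() {
            Ok(()) => return Ok(tries),
            Err(e) if should_retry(&e) && tries < max_attempts => continue,
            Err(e) => return Err(e),
        }
    }
}
```

A production version would usually add a delay or backoff between attempts; that is left out here to keep the control flow visible.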

Summary

TaskSink gives you a single, consistent interface for enqueuing jobs regardless of which backend you use. By composing:

  • futures::Sink for async, backpressure-aware ingestion
  • SinkExt for ergonomic send utilities
  • TaskSink for domain-specific, type-safe task submission

you can build scalable task producers that slot naturally into the async Rust ecosystem — whether you're pushing a single email job or streaming millions of records into a queue.