Pushing Tasks
Apalis provides a powerful and composable way to enqueue background jobs by building on top of futures::Sink. This abstraction allows any compatible backend — Redis, SQLite, PostgreSQL, and more — to act as a destination for tasks, making job submission consistent, async-friendly, and stream-aware.
At the core of this system is the TaskSink trait.
The TaskSink Trait
TaskSink<Args> extends any [Backend] with the ability to accept tasks in multiple forms:
| Method | Description |
|---|---|
| `push` | Encode and enqueue a single task value |
| `push_bulk` | Enqueue a `Vec` of tasks efficiently |
| `push_stream` | Drain an async stream of tasks into the backend |
| `push_task` | Enqueue a fully constructed `Task<Args, _, _>` |
| `push_all` | Drain a stream of pre-built `Task` objects |
This gives you the flexibility to match your submission strategy to your workload — from one-off jobs to high-throughput pipelines.
Understanding Sink and SinkExt
Apalis builds on the futures ecosystem's Sink abstraction, the dual of a Stream. A Sink represents something you can push data into — in this case, a job queue backend. SinkExt layers ergonomic helpers on top:
- `send(item)` — submit a single item
- `send_all(stream)` — drain a stream into the sink
- `sink_map_err(...)` — transform sink errors
TaskSink uses these internally so every backend gets consistent, efficient push behaviour for free.
Pushing a Single Task
For one-off jobs, use push. The task value is encoded via the backend's configured [Codec] and wrapped into a [Task] before being sent.
```rust
use apalis::prelude::*;

async fn enqueue_task<S>(mut backend: S)
where
    S: TaskSink<String>,
{
    backend
        .push("send_email:user@example.com".to_string())
        .await
        .unwrap();
}
```

Internally, push calls SinkExt::send after encoding the value — so backpressure is respected automatically.
Bulk Task Submission
When you have a collection of tasks ready up front, push_bulk is more efficient than calling push in a loop — it encodes and submits all items in a single batched send_all call, minimising round-trips to the backend.
```rust
async fn enqueue_bulk<S>(mut backend: S)
where
    S: TaskSink<String>,
{
    let jobs = vec![
        "job1".to_string(),
        "job2".to_string(),
        "job3".to_string(),
    ];
    backend.push_bulk(jobs).await.unwrap();
}
```

When to use this: prefer `push_bulk` over repeated `push` calls for small-to-medium batches where all items are already in memory.
Streaming Tasks
For large or unbounded datasets, push_stream lets you feed tasks directly from an async Stream. This is the most memory-efficient option because tasks are encoded and forwarded lazily — the backend's backpressure naturally throttles the producer.
```rust
use futures_util::stream;

async fn enqueue_stream<S>(mut backend: S)
where
    S: TaskSink<u32>,
{
    let stream = stream::iter(0..1_000_000);
    backend.push_stream(stream).await.unwrap();
}
```

This is ideal when:
- Tasks are generated dynamically or computed on the fly
- You are processing large datasets that shouldn't be held in memory at once
- You want automatic backpressure between the producer and the backend
Internally, push_stream uses SinkExt::send_all with sink_map_err to unify error types.
Working with Pre-built Tasks
Sometimes you need control over the [Task] envelope itself — for example, to set a custom ID, attach metadata, or carry context. The push_task and push_all methods accept fully constructed Task<Args, Context, IdType> values.
Pushing a Pre-built Task
```rust
use apalis_core::task::Task;

async fn push_custom_task<S>(mut backend: S)
where
    S: TaskSink<String>,
{
    let task = Task::new("important_job".to_string());
    backend.push_task(task).await.unwrap();
}
```

Pushing a Stream of Pre-built Tasks
```rust
use futures_util::stream;
use apalis_core::task::Task;

async fn push_many_tasks<S>(mut backend: S)
where
    S: TaskSink<String>,
{
    let tasks = stream::iter(vec![
        Task::new("task_a".to_string()),
        Task::new("task_b".to_string()),
    ]);
    backend.push_all(tasks).await.unwrap();
}
```

Choosing the Right Method
| Method | Input | Best for |
|---|---|---|
| `push` | Single value | One-off task submission |
| `push_bulk` | `Vec<Args>` | Small–medium in-memory batches |
| `push_stream` | `Stream<Item = Args>` | Large or dynamically generated datasets |
| `push_task` | `Task<Args, _, _>` | Full control over the task envelope |
| `push_all` | `Stream<Item = Task<...>>` | Streaming pre-built task objects |
How Encoding Works
Before any task reaches the backend, it is encoded using the backend's associated [Codec]:
```rust
let encoded = C::encode(&task)?;
```

This separates your domain types from their storage representation, enabling:
- Compact wire formats (e.g. MessagePack, bincode)
- Backend-specific serialization (e.g. JSON for PostgreSQL, binary for Redis)
- Type-safe decoding on the worker side
If encoding fails, a [TaskSinkError::CodecError] is returned. See Error Handling below.
Error Handling
All TaskSink methods return:
```rust
Result<(), TaskSinkError<S::Error>>
```

[TaskSinkError] has two variants:

- `PushError(E)` — the backend sink rejected the item (e.g. a connection failure)
- `CodecError(BoxDynError)` — serialization of the task value failed
```rust
match backend.push("job".to_string()).await {
    Ok(()) => println!("Task enqueued"),
    Err(TaskSinkError::PushError(e)) => eprintln!("Backend error: {:?}", e),
    Err(TaskSinkError::CodecError(e)) => eprintln!("Encoding failed: {:?}", e),
}
```

Summary
TaskSink gives you a single, consistent interface for enqueuing jobs regardless of which backend you use. By composing:
- `futures::Sink` for async, backpressure-aware ingestion
- `SinkExt` for ergonomic send utilities
- `TaskSink` for domain-specific, type-safe task submission
you can build scalable task producers that slot naturally into the async Rust ecosystem — whether you're pushing a single email job or streaming millions of records into a queue.