Sequential Workflows
A stepped workflow in Apalis is a sequential pipeline of async steps executed as a single unit of work. Each step receives the output of the previous one, transforming data as it flows through the chain. This is the right tool when a job involves multiple discrete stages — extraction, validation, enrichment, side effects — that must run in order.
Workflows are built with the `Workflow` type from `apalis_workflow` and run inside a standard `WorkerBuilder` — so all the usual machinery (heartbeating, middleware, event hooks) applies without any special handling.
A Basic Workflow
```rust
use apalis::prelude::*;
use apalis_workflow::*;
use apalis_file_storage::JsonStorage;

#[tokio::main]
async fn main() {
    let workflow = Workflow::new("odd-numbers-workflow")
        .and_then(|a: usize| async move {
            Ok::<_, BoxDynError>((0..a).collect::<Vec<_>>())
        })
        .filter_map(|x| async move {
            if x % 2 != 0 { Some(x) } else { None }
        })
        .and_then(|a: Vec<usize>| async move {
            println!("Sum: {}", a.iter().sum::<usize>());
            Ok::<_, BoxDynError>(())
        });

    let mut storage = JsonStorage::new_temp().unwrap();
    // Push the initial input value that starts the workflow.
    storage.push_start(10).await.unwrap();

    let worker = WorkerBuilder::new("rango-tango")
        .backend(storage)
        .on_event(|ctx, ev| println!("Event: {:?}", ev))
        .build(workflow);
    worker.run().await.unwrap();
}
```

This workflow:
- Receives a `usize` as its starting input
- Expands it into a `Vec<usize>` of all numbers in `0..a`
- Keeps only the odd numbers with `filter_map`
- Sums and prints them
The type of data flowing through the chain is tracked at compile time — passing the wrong output type from one step to the next is a compile error, not a runtime surprise.
How Workflows Execute
When a worker picks up a workflow task, it drives each step in sequence:
```text
push_start(input)
      │
      ▼
[ and_then ]   → transforms input into next value
      │
      ▼
[ filter_map ] → keeps, drops, or maps each element
      │
      ▼
[ and_then ]   → final side effect, produces ()
```
Each step is a function registered at build time. The worker simply polls the backend for tasks and hands the current stage's output to the next registered function.
Combinators
and_then
The core building block. Chains an async function onto the workflow; the function receives the current output and returns the next value wrapped in Result.
```rust
workflow
    .and_then(extract)
    .and_then(transform)
    .and_then(load);
```

This maps directly to the classic ETL pattern: extract data from a source, transform it, then load it into a destination. Any step that returns Err halts the workflow and reports a failure — subsequent steps are not run.
Use `and_then` for any step that can fail and whose result is needed downstream.
filter_map
Applies an async function to each element of a collection, keeping elements for which the function returns Some and discarding those that return None. The input must be iterable — typically a Vec.
```rust
workflow
    .and_then(|n: usize| async move {
        Ok::<_, BoxDynError>((0..n).collect::<Vec<_>>())
    })
    .filter_map(|x| async move {
        if x % 2 != 0 { Some(x) } else { None }
    });
```

filter_map is infallible by design — the function returns Option, not Result. It does not short-circuit on failure; it simply excludes elements.
Use `filter_map` to narrow a collection before handing it to the next step, without needing to handle errors per element.
fold
Reduces a collection to a single accumulated value using an async function. Requires an initial value and calls the accumulator function once per element.
```rust
pub fn fold<F, Output, FnArgs, Init>(self, fold: F)
```

```rust
workflow
    .and_then(fetch_records)
    .fold(|acc: u64, record: Record| async move {
        Ok::<_, BoxDynError>(acc + record.amount)
    });
```

fold is the right combinator when you need to aggregate a collection into a summary value — a total, a merged document, a running count — before passing it downstream.
Use `fold` to collapse a `Vec` into a single value for the next step.
repeat_until
Calls an async function repeatedly on the current input until it returns a value that signals completion. This is useful for polling loops, retry-with-backoff patterns, or paginated data fetching where the number of iterations is not known upfront.
```rust
workflow
    .repeat_until(|order: PgTask<Order>| async move {
        // Check the order's current state and return a control value that
        // signals whether to continue iterating or break with a final output.
    });
```

The function returns a control value indicating whether to continue iterating or break with a final output. This keeps looping logic explicit and contained within a single step.
Use `repeat_until` for polling, pagination, or any step where the exit condition depends on runtime state.
delay_for
Pauses the workflow for a fixed Duration before proceeding to the next step.
```rust
pub fn delay_for(self, delay: Duration)
```

```rust
use std::time::Duration;

workflow
    .and_then(send_notification)
    .delay_for(Duration::from_secs(60 * 60)) // wait 1 hour
    .and_then(send_followup);
```

delay_for is appropriate when the wait time is constant and known at build time — for example, always waiting 24 hours between a signup email and a follow-up.
Use `delay_for` when the pause duration is the same for every task.
delay_with
Pauses the workflow for a duration determined by a function, allowing the delay to vary per task based on its input or context.
```rust
workflow
    .and_then(process_order)
    .delay_with(|order: PgTask<Order>| {
        if order.is_priority {
            Duration::from_secs(5)
        } else {
            Duration::from_secs(300)
        }
    })
    .and_then(confirm_dispatch);
```

The delay function receives the current task value and returns a `Duration`. This is useful when priority, tier, or payload content should influence scheduling.
Use `delay_with` when the pause duration depends on runtime data — prefer `delay_for` when it is always the same.
Combinator Summary
| Combinator | Input | Purpose |
|---|---|---|
| `and_then` | Any `T` | Transform and propagate; short-circuits on `Err` |
| `filter_map` | `Vec<T>` | Keep elements matching a predicate, discard the rest |
| `fold` | `Vec<T>` | Reduce a collection to a single accumulated value |
| `repeat_until` | Any `T` | Loop on a value until a completion condition is met |
| `delay_for` | Any `T` | Insert a fixed pause before the next step |
| `delay_with` | Any `T` | Insert a variable pause based on the current value |