Sequential Workflows

A stepped workflow in Apalis is a sequential pipeline of async steps executed as a single unit of work. Each step receives the output of the previous one, transforming data as it flows through the chain. This is the right tool when a job involves multiple discrete stages — extraction, validation, enrichment, side effects — that must run in order.

Workflows are built with the [Workflow] type from apalis_workflow and run inside a standard [WorkerBuilder] — so all the usual machinery (heartbeating, middleware, event hooks) applies without any special handling.


A Basic Workflow

use apalis::prelude::*;
use apalis_workflow::*;
use apalis_file_storage::JsonStorage;

#[tokio::main]
async fn main() {
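    // Build the pipeline: each step receives the previous step's output.
    let workflow = Workflow::new("odd-numbers-workflow")
        // Step 1: expand the starting number into the values 0..a.
        .and_then(|a: usize| async move {
            Ok::<_, BoxDynError>((0..a).collect::<Vec<_>>())
        })
        // Step 2: keep odd elements, drop even ones.
        .filter_map(|x| async move {
            if x % 2 != 0 { Some(x) } else { None }
        })
        // Step 3: sum the remaining elements and print the result.
        .and_then(|a: Vec<usize>| async move {
            println!("Sum: {}", a.iter().sum::<usize>());
            Ok::<_, BoxDynError>(())
        });

    // Temporary JSON-backed storage that holds the workflow's tasks.
    let mut storage = JsonStorage::new_temp().unwrap();

    // Push the initial input value that starts the workflow.
    storage.push_start(10).await.unwrap();

    let worker = WorkerBuilder::new("rango-tango")
        .backend(storage)
        // Log every worker event; the context argument is unused here.
        .on_event(|_ctx, ev| println!("Event: {:?}", ev))
        .build(workflow);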

    worker.run().await.unwrap();
}

This workflow:

  1. Receives a usize as its starting input
  2. Expands it into a Vec<usize> of all numbers in 0..a
  3. Keeps only the odd numbers with filter_map
  4. Sums and prints them

The type of data flowing through the chain is tracked at compile time — passing the wrong output type from one step to the next is a compile error, not a runtime surprise.
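As a sanity check on what the worker should print, here is the same data flow with the worker, storage, and async machinery stripped away. This is plain, synchronous Rust using only the standard library, not the apalis API, and `odd_sum` is a hypothetical helper. For an input of 10 the odd numbers are 1, 3, 5, 7 and 9, so the workflow above is expected to print `Sum: 25`.

```rust
/// The three stages of "odd-numbers-workflow" as plain, synchronous Rust.
/// Hypothetical helper for illustration; not part of apalis.
fn odd_sum(a: usize) -> usize {
    // Stage 1 (and_then): expand the input into 0..a.
    let numbers: Vec<usize> = (0..a).collect();
    // Stage 2 (filter_map): keep odd numbers, drop even ones.
    let odds: Vec<usize> = numbers
        .into_iter()
        .filter_map(|x| if x % 2 != 0 { Some(x) } else { None })
        .collect();
    // Stage 3 (and_then): sum whatever survived the filter.
    odds.iter().sum()
}

fn main() {
    println!("Sum: {}", odd_sum(10)); // Sum: 25
}
```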


How Workflows Execute

When a worker picks up a workflow task, it drives each step in sequence:

push_start(input)
      │
      ▼
 [ and_then ]  →  transforms input into next value
      │
      ▼
 [ filter_map ] →  keeps, drops, or maps each element
      │
      ▼
 [ and_then ]  →  final side effect, produces ()

Each step is a function registered at build time. The worker polls the backend for tasks and hands the current stage's output to the next registered function.
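The "register at build time, drive in sequence" idea can be modelled with a toy runner. This is a deliberately simplified sketch, not how apalis is implemented: it erases every value to `i64` and runs in memory, whereas apalis checks step types at compile time and drives steps through a worker and backend. `Pipeline` and its methods are hypothetical names chosen to echo the workflow API.

```rust
/// A toy pipeline: steps are registered up front, then driven in order,
/// each receiving the previous step's output. Values are i64 for simplicity.
struct Pipeline {
    steps: Vec<Box<dyn Fn(i64) -> Result<i64, String>>>,
}

impl Pipeline {
    fn new() -> Self {
        Pipeline { steps: Vec::new() }
    }

    /// Register a step (the "build time" part).
    fn and_then(mut self, step: impl Fn(i64) -> Result<i64, String> + 'static) -> Self {
        self.steps.push(Box::new(step));
        self
    }

    /// Drive every registered step in sequence, stopping at the first error.
    fn run(&self, input: i64) -> Result<i64, String> {
        let mut value = input;
        for step in &self.steps {
            value = step(value)?;
        }
        Ok(value)
    }
}

fn main() {
    let pipeline = Pipeline::new()
        .and_then(|x| Ok(x + 1))
        .and_then(|x| if x > 0 { Ok(x * 10) } else { Err("non-positive".into()) });
    println!("{:?}", pipeline.run(4)); // Ok(50)
    println!("{:?}", pipeline.run(-3)); // Err("non-positive")
}
```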


Combinators

and_then

The core building block. Chains an async function onto the workflow; the function receives the current output and returns the next value wrapped in Result.

workflow
    .and_then(extract)
    .and_then(transform)
    .and_then(load);

This maps directly to the classic ETL pattern: extract data from a source, transform it, then load it into a destination. Any step that returns Err halts the workflow and reports a failure — subsequent steps are not run.

Use and_then for any step that can fail and whose result is needed downstream.
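A minimal sketch of the short-circuit behaviour, using the standard library's `Result::and_then` in place of the workflow combinator. The `extract`, `transform` and `load` functions here are hypothetical synchronous stand-ins for the steps named above; a `Cell` counts how many rows reach `load`, which makes it visible that a failing `transform` stops `load` from running.

```rust
use std::cell::Cell;

// Hypothetical ETL steps; each returns Result, like an and_then step.
fn extract(source: &str) -> Result<Vec<String>, String> {
    Ok(source.split(',').map(|s| s.trim().to_string()).collect())
}

fn transform(rows: Vec<String>) -> Result<Vec<u32>, String> {
    // Fail the whole step if any row is not a number.
    rows.iter()
        .map(|r| r.parse::<u32>().map_err(|e| format!("bad row {r:?}: {e}")))
        .collect()
}

fn load(values: Vec<u32>, loaded: &Cell<usize>) -> Result<(), String> {
    // Record how many rows reached this step.
    loaded.set(loaded.get() + values.len());
    Ok(())
}

fn run(source: &str, loaded: &Cell<usize>) -> Result<(), String> {
    // Result::and_then stops at the first Err, so later steps never run.
    extract(source)
        .and_then(transform)
        .and_then(|values| load(values, loaded))
}

fn main() {
    let loaded = Cell::new(0);
    println!("{:?}", run("1, 2, 3", &loaded)); // Ok(())
    println!("rows loaded: {}", loaded.get()); // rows loaded: 3
    // An Err describing the bad row; load is skipped, so the count stays at 3.
    println!("{:?}", run("1, x, 3", &loaded));
}
```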


filter_map

Applies an async function to each element of a collection. When the function returns Some(value), that value is kept (it may differ from the original element); when it returns None, the element is discarded. The input must be iterable, typically a Vec.

workflow
    .and_then(|n: usize| async move {
        Ok::<_, BoxDynError>((0..n).collect::<Vec<_>>())
    })
    .filter_map(|x| async move {
        if x % 2 != 0 { Some(x) } else { None }
    });

filter_map is infallible by design — the function returns Option, not Result. It does not short-circuit on failure; it simply excludes elements.

Use filter_map to narrow a collection before handing it to the next step, without needing to handle errors per element.
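Per-element failures fit naturally into this shape: convert the error into None and the element is dropped instead of failing the step. The sketch below shows the idea with the standard library's `Iterator::filter_map` (synchronous, not the apalis combinator); `parse_valid` is a hypothetical helper. The kept values are the parsed numbers rather than the original strings, which is the "map" half of filter_map.

```rust
/// Parse each raw value, keeping successful parses and dropping the rest.
/// Errors become `None`, so one bad element never fails the whole step.
fn parse_valid(raw: Vec<&str>) -> Vec<u32> {
    raw.into_iter()
        .filter_map(|s| s.trim().parse::<u32>().ok())
        .collect()
}

fn main() {
    let kept = parse_valid(vec!["1", "two", " 3 ", "-4"]);
    println!("{:?}", kept); // [1, 3]
}
```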


fold

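Reduces a collection to a single accumulated value using an async accumulator function. It requires an initial value and calls the accumulator once per element, passing the running value and the current element; as in the example below, the accumulator returns a Result, like an and_then step.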

pub fn fold<F, Output, FnArgs, Init>(self, fold: F)

workflow
    .and_then(fetch_records)
    .fold(|acc: u64, record: Record| async move {
        Ok::<_, BoxDynError>(acc + record.amount)
    });

fold is the right combinator when you need to aggregate a collection into a summary value — a total, a merged document, a running count — before passing it downstream.

Use fold to collapse a Vec into a single value for the next step.
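The sketch below models the fold example with the standard library's `Iterator::try_fold`, which threads an accumulator through each element and stops at the first Err. `Record` and `total` are hypothetical, the initial value is `0`, and treating an accumulator error as failing the step is an assumption based on the Result return type in the example above (mirroring and_then). Overflow is used as the failure case only because it needs no external data.

```rust
// Hypothetical record type standing in for `Record` in the example above.
struct Record {
    amount: u64,
}

/// Sum record amounts starting from 0, failing the whole fold on the first
/// error (here: arithmetic overflow), as a Result-returning accumulator would.
fn total(records: &[Record]) -> Result<u64, String> {
    records.iter().try_fold(0u64, |acc, record| {
        acc.checked_add(record.amount)
            .ok_or_else(|| "total overflowed u64".to_string())
    })
}

fn main() {
    let records = vec![Record { amount: 10 }, Record { amount: 20 }, Record { amount: 30 }];
    println!("{:?}", total(&records)); // Ok(60)
}
```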


repeat_until

Calls an async function repeatedly on the current input until it returns a value that signals completion. This is useful for polling loops, retry-with-backoff patterns, or paginated data fetching where the number of iterations is not known upfront.

workflow
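    .repeat_until(|order: PgTask<Order>| async move {
        // Inspect the current state (poll a service, fetch the next page, ...),
        // then return the control value that tells the workflow whether to run
        // this step again or to finish with a final output.
    });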

The function returns a control value indicating whether to continue iterating or break with a final output. This keeps looping logic explicit and contained within a single step.

Use repeat_until for polling, pagination, or any step where the exit condition depends on runtime state.
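The loop-until-done shape can be sketched in plain Rust. Here `std::ops::ControlFlow` stands in for the control value described above: `Continue` carries the state into the next iteration and `Break` carries the final output. This is an assumption for illustration; the control type that repeat_until actually expects is not shown on this page and may differ. `fetch_page`, `step` and the local `repeat_until` driver are hypothetical, and the "remote" data is three hard-coded pages.

```rust
use std::ops::ControlFlow;

/// Hypothetical page source: returns the items on `page` and whether more pages follow.
fn fetch_page(page: u32) -> (Vec<u32>, bool) {
    let items: Vec<u32> = (page * 3..page * 3 + 3).collect();
    (items, page < 2) // pages 0, 1 and 2 exist
}

/// One iteration: fetch a page, then continue with updated state or break with the result.
fn step(state: (u32, Vec<u32>)) -> ControlFlow<Vec<u32>, (u32, Vec<u32>)> {
    let (page, mut acc) = state;
    let (items, has_more) = fetch_page(page);
    acc.extend(items);
    if has_more {
        ControlFlow::Continue((page + 1, acc))
    } else {
        ControlFlow::Break(acc)
    }
}

/// Driver loop: call `f` repeatedly until it signals completion.
fn repeat_until<S, O>(mut state: S, mut f: impl FnMut(S) -> ControlFlow<O, S>) -> O {
    loop {
        match f(state) {
            ControlFlow::Continue(next) => state = next,
            ControlFlow::Break(out) => return out,
        }
    }
}

fn main() {
    let all = repeat_until((0, Vec::new()), step);
    println!("{:?}", all); // [0, 1, 2, 3, 4, 5, 6, 7, 8]
}
```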


delay_for

Pauses the workflow for a fixed Duration before proceeding to the next step.

pub fn delay_for(self, delay: Duration)

use std::time::Duration;

workflow
    .and_then(send_notification)
    .delay_for(Duration::from_secs(60 * 60)) // wait 1 hour
    .and_then(send_followup);

delay_for is appropriate when the wait time is constant and known at build time — for example, always waiting 24 hours between a signup email and a follow-up.

Use delay_for when the pause duration is the same for every task.
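One possible model of a pause in a polled pipeline is to record the earliest time the next step may run and have the worker skip the task until then. The sketch below assumes that model purely for illustration; it is not a description of how apalis or any particular backend implements delay_for. `Pending`, `schedule_after` and `is_ready` are hypothetical, and times are plain seconds so the example is deterministic.

```rust
use std::time::Duration;

/// Hypothetical pending step: the earliest time (in seconds) it may run.
#[derive(Debug, Clone, Copy, PartialEq)]
struct Pending {
    not_before: u64,
}

/// Schedule the next step `delay` after `now`.
fn schedule_after(now: u64, delay: Duration) -> Pending {
    Pending { not_before: now + delay.as_secs() }
}

/// A polling worker would only pick the step up once `now` reaches `not_before`.
fn is_ready(p: Pending, now: u64) -> bool {
    now >= p.not_before
}

fn main() {
    // Same constant as the example above: one hour, identical for every task.
    const FOLLOWUP_DELAY: Duration = Duration::from_secs(60 * 60);
    let pending = schedule_after(1_000, FOLLOWUP_DELAY);
    println!("ready at t=2000? {}", is_ready(pending, 2_000)); // false
    println!("ready at t=4600? {}", is_ready(pending, 4_600)); // true
}
```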


delay_with

Pauses the workflow for a duration determined by a function, allowing the delay to vary per task based on its input or context.

workflow
    .and_then(process_order)
    .delay_with(|order: PgTask<Order>| {
        if order.is_priority {
            Duration::from_secs(5)
        } else {
            Duration::from_secs(300)
        }
    })
    .and_then(confirm_dispatch);

The delay function receives the current task value and returns a Duration. This is useful when priority, tier, or payload content should influence scheduling.

Use delay_with when the pause duration depends on runtime data — prefer delay_for when it is always the same.
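Keeping the delay rule in an ordinary function makes it easy to unit-test without a worker. The sketch below extracts the decision from the delay_with example; `Order` and `dispatch_delay` are hypothetical, and how you call the function from the closure depends on the task type your backend passes in.

```rust
use std::time::Duration;

/// Hypothetical order payload; only the field the delay depends on.
struct Order {
    is_priority: bool,
}

/// Same rule as the delay_with closure: priority orders wait 5 s, others 5 min.
fn dispatch_delay(order: &Order) -> Duration {
    if order.is_priority {
        Duration::from_secs(5)
    } else {
        Duration::from_secs(300)
    }
}

fn main() {
    let rush = Order { is_priority: true };
    let standard = Order { is_priority: false };
    println!("{:?}", dispatch_delay(&rush)); // 5s
    println!("{:?}", dispatch_delay(&standard)); // 300s
}
```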


Combinator Summary

| Combinator | Input | Purpose |
|---|---|---|
| and_then | Any T | Transform and propagate the value; short-circuits on Err |
| filter_map | Vec<T> | Keep (and optionally map) elements that return Some; drop those that return None |
| fold | Vec<T> | Reduce a collection to a single accumulated value |
| repeat_until | Any T | Loop on a value until a completion condition is met |
| delay_for | Any T | Insert a fixed pause before the next step |
| delay_with | Any T | Insert a pause computed from the current value |