DAG Workflows
A DAG workflow (Directed Acyclic Graph) is a step beyond sequential pipelines. Rather than running every step one after another, a DAG lets you declare which steps depend on which — independent steps run in parallel, and dependent steps wait only for their specific inputs to be ready. This makes DAGs the right model for workflows where some work can be parallelised.
DagFlow is provided by the apalis_workflow crate. Like Workflow, it runs inside a standard [WorkerBuilder] — so heartbeating, tracing, middleware, and event hooks all work unchanged.
Sequential vs. DAG Workflows
A sequential workflow is a chain — each step waits for the entire previous step to finish:
input → extract → transform → load → output
A DAG workflow is a graph — independent branches execute in parallel and merge at a collector node:
              ┌─▶ get_name ────┐
input ────────┼─▶ get_age ─────┼──▶ collector ──▶ output
              └─▶ get_address ─┘
Use a DAG when branches of your workflow have no data dependency on each other and can safely overlap.
A Basic DAG Workflow
use apalis::prelude::*;
use apalis_file_storage::JsonStorage;
use apalis_workflow::{DagFlow, WorkflowSink};
use tracing::info;

async fn get_name(user_id: u32) -> Result<String, BoxDynError> {
    Ok(user_id.to_string())
}

async fn get_age(user_id: u32) -> Result<usize, BoxDynError> {
    Ok(user_id as usize + 20)
}

async fn get_address(user_id: u32) -> Result<usize, BoxDynError> {
    Ok(user_id as usize + 100)
}

async fn collector(
    (name, age, address): (String, usize, usize),
    wrk: WorkerContext,
) -> Result<usize, BoxDynError> {
    let result = name.parse::<usize>()? + age + address;
    wrk.stop().unwrap();
    Ok(result)
}

#[tokio::main]
async fn main() -> Result<(), BoxDynError> {
    let mut backend = JsonStorage::new_temp().unwrap();

    // Push multiple starting inputs — each will run through the full DAG.
    backend.push_start(vec![42, 43, 44]).await.unwrap();

    // Declare the graph.
    let dag_flow = DagFlow::new("user-etl-workflow");
    let get_name = dag_flow.node(get_name);
    let get_age = dag_flow.node(get_age);
    let get_address = dag_flow.node(get_address);
    dag_flow
        .node(collector)
        .depends_on((&get_name, &get_age, &get_address));

    // Validate that the graph is acyclic and all dependencies are satisfiable.
    dag_flow.validate()?;

    // Print the graph in DOT format for debugging or visualisation.
    info!("Executing workflow:\n{}", dag_flow);

    WorkerBuilder::new("tasty-banana")
        .backend(backend)
        .enable_tracing()
        .on_event(|_c, e| info!("{e}"))
        .build(dag_flow)
        .run()
        .await?;
    Ok(())
}

Concepts
Nodes
A node is a single async function registered with the DAG. Every node receives the same starting input that was pushed via push_start — unless it declares dependencies, in which case it receives the outputs of those dependencies instead.
let dag_flow = DagFlow::new("user-etl-workflow");

let get_name = dag_flow.node(get_name);       // receives: u32
let get_age = dag_flow.node(get_age);         // receives: u32
let get_address = dag_flow.node(get_address); // receives: u32

Nodes without dependencies are root nodes — they run first, in parallel, each receiving the original input.
Dependencies
A node declares its inputs by calling .depends_on(...) with a tuple of node references. The node will not execute until all listed dependencies have completed successfully.
dag_flow
    .node(collector)
    .depends_on((&get_name, &get_age, &get_address));

The order and types of the tuple matter — they must match the function signature of the dependent node exactly. Here collector receives (String, usize, usize), so the dependencies must be declared in the same order: get_name (returns String), get_age (returns usize), get_address (returns usize).
This is enforced at compile time — a mismatch in types or arity is a compiler error.
Execution Order
DagFlow determines execution order from the dependency graph, not from the order nodes are declared. Given the example above:
- get_name, get_age, and get_address all start concurrently — they share the same u32 input and have no dependencies on each other.
- Once all three complete, collector runs with their combined outputs as a tuple.
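The scheduling idea can be illustrated with a small, self-contained sketch. This is not DagFlow's actual implementation — just a conceptual model using Kahn's algorithm, where each "wave" contains nodes whose dependencies have all completed and which can therefore run in parallel:

```rust
use std::collections::HashMap;

/// Conceptual sketch (not DagFlow's internals): group nodes into
/// execution "waves" from a dependency list. Every node in a wave
/// has all of its dependencies satisfied and can run concurrently.
fn execution_waves(deps: &[(&'static str, Vec<&'static str>)]) -> Vec<Vec<&'static str>> {
    let mut remaining: HashMap<&'static str, Vec<&'static str>> =
        deps.iter().map(|(n, d)| (*n, d.clone())).collect();
    let mut done: Vec<&'static str> = Vec::new();
    let mut waves = Vec::new();
    while !remaining.is_empty() {
        // A node is ready once every dependency has completed.
        let mut ready: Vec<&'static str> = remaining
            .iter()
            .filter(|(_, d)| d.iter().all(|dep| done.contains(dep)))
            .map(|(n, _)| *n)
            .collect();
        assert!(!ready.is_empty(), "cycle detected — validate() would reject this");
        ready.sort(); // deterministic ordering for display only
        for n in &ready {
            remaining.remove(n);
            done.push(*n);
        }
        waves.push(ready);
    }
    waves
}
```

Feeding the example graph into this sketch yields two waves: the three root nodes first, then collector.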
If any node returns an Err, dependent nodes are not run and the workflow task fails.
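This failure semantics mirrors ordinary `?` propagation in plain Rust. The sketch below is illustrative only (the function names are hypothetical, and real DagFlow nodes are async):

```rust
/// Sketch of fail-fast semantics: if an upstream node errors,
/// the early return from `?` means the dependent node never runs.
fn get_name(user_id: u32) -> Result<String, String> {
    if user_id == 0 {
        return Err("unknown user".to_string());
    }
    Ok(user_id.to_string())
}

fn collector(name: String) -> usize {
    name.len()
}

fn run_workflow(user_id: u32) -> Result<usize, String> {
    let name = get_name(user_id)?; // an Err here skips `collector` entirely
    Ok(collector(name))
}
```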
Validation
Before running a DagFlow, call validate() to catch structural problems early:
dag_flow.validate()?;

Validation checks that:
- The graph is acyclic — no node transitively depends on itself
- Every declared dependency refers to a registered node
- There is at least one node with no unsatisfied dependencies (a valid entry point)
validate() returns a Result — call it before building the worker, ideally at startup, so configuration errors surface immediately rather than at runtime.
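The acyclicity check can be pictured as a depth-first search with three-state marking — a standard technique, sketched here conceptually rather than as DagFlow's actual code:

```rust
use std::collections::HashMap;

/// Conceptual sketch of an acyclicity check (not DagFlow's real
/// implementation): DFS with three states. Revisiting a node that is
/// still "in progress" means a node transitively depends on itself.
fn has_cycle<'a>(edges: &HashMap<&'a str, Vec<&'a str>>) -> bool {
    #[derive(Clone, Copy)]
    enum State { InProgress, Done }

    fn visit<'a>(
        node: &'a str,
        edges: &HashMap<&'a str, Vec<&'a str>>,
        state: &mut HashMap<&'a str, State>,
    ) -> bool {
        match state.get(node).copied() {
            Some(State::InProgress) => return true, // back edge: cycle
            Some(State::Done) => return false,
            None => {}
        }
        state.insert(node, State::InProgress);
        for &dep in edges.get(node).into_iter().flatten() {
            if visit(dep, edges, state) {
                return true;
            }
        }
        state.insert(node, State::Done);
        false
    }

    let mut state = HashMap::new();
    edges.keys().any(|n| visit(n, edges, &mut state))
}
```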
Inspecting the Graph
DagFlow implements Display and outputs the graph in DOT format, which can be rendered with Graphviz or pasted into an online viewer:
info!("Executing workflow:\n{}", dag_flow);

Example output:
digraph user-etl-workflow {
    get_name -> collector;
    get_age -> collector;
    get_address -> collector;
}

This is especially useful during development to verify that complex dependency chains were declared as intended.
Pushing Starting Inputs
Use push_start from WorkflowSink to enqueue one or more initial values. Each value triggers a full independent run of the DAG:
// Push a single input — runs the DAG once.
backend.push_start(42).await.unwrap();

// Push multiple inputs — each runs the DAG independently.
backend.push_start(vec![42, 43, 44]).await.unwrap();

All root nodes receive the same starting value for each run. Runs are independent — they do not share state.
When to Use a DAG vs. a Sequential Workflow
| Situation | Recommended approach |
|---|---|
| Steps must run in strict order, each depending on the last | Workflow with and_then |
| Some steps are independent and can run in parallel | DagFlow |
| You need fan-out followed by a merge/collect step | DagFlow with a collector node |
| The graph structure changes at runtime | DagFlow — build the graph dynamically before validate() |
| Simple linear ETL pipeline | Workflow — less overhead |
Summary
DagFlow brings structured parallelism to Apalis workflows. You declare nodes, wire up dependencies, and let the runtime figure out what can run concurrently:
DagFlow::new(name)
 │
 ├── .node(fn)              → register an async function as a node
 ├── .node(fn)
 │      .depends_on((a, b)) → declare typed inputs from other nodes
 │
 ├── .validate()            → assert the graph is well-formed
 └── .to_string()           → inspect the graph in DOT format
The result plugs directly into a [WorkerBuilder] — no special runner, no separate scheduler, no additional infrastructure.