Building an AI Travel Planner with DAG Workflows

Mar 25th, 2026

Background jobs are often thought of as simple fire-and-forget tasks — send an email, resize an image, sync a record. But some workloads are fundamentally parallel pipelines: several independent pieces of work that must all complete before a final step can run. This is exactly what DAG workflows were built for.

In this tutorial we will build an AI-powered travel planner that takes an origin and destination as input, fans out across four independent AI agents running in parallel — itinerary planning, visa and vaccine requirements, hotel recommendations, and flight search — and then merges all four results into a single markdown travel report. Every agent runs concurrently. None of them wait for the others. The merge step runs only when all four are done.

The result is a real-world demonstration of DagFlow that goes well beyond toy examples.


What We Are Building

                  ┌─▶ build_itinerary   ─┐
                  │                       │
push_start(trip) ─┼─▶ find_requirements  ─┼──▶ generate_report
                  │                       │
                  ├─▶ find_hotels        ─┤
                  │                       │
                  └─▶ find_flights       ─┘

The four parallel nodes each receive the same TripRequest — origin, destination, and travel dates. The generate_report node receives all four results as a tuple and produces a final markdown document.


Project Setup

Add the following to your Cargo.toml:

[dependencies]
apalis = { version = "1.0.0" }
apalis-workflow = { version = "1.0.0" }
apalis-file-storage = { version = "1.0.0" }
rig-core = "0.1"
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
tracing = "0.1"
tracing-subscriber = "0.3"

Set your OpenAI key in the environment:

export OPENAI_API_KEY=sk-...

The Input Type

Every node in the DAG receives the same starting value — a TripRequest:

use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TripRequest {
    pub origin: String,
    pub destination: String,
    pub departure_date: String,
    pub return_date: String,
}

The Four Parallel Nodes

Each node is a plain async function. They all share the same signature shape — receive a TripRequest, call an AI agent, return a String. Because they share no state and depend on no other node, DagFlow will run all four simultaneously.

Node 1 — Build an Itinerary

use apalis::prelude::BoxDynError;
use rig::providers::openai;
use rig::completion::Prompt;

pub async fn build_itinerary(trip: TripRequest) -> Result<String, BoxDynError> {
    let client = openai::Client::from_env();
    let agent = client
        .agent("gpt-4")
        .preamble("You are a travel expert. Given an origin, destination, and dates, \
                   suggest a day-by-day itinerary with activities, landmarks, \
                   and local experiences. Be specific and practical.")
        .build();

    let prompt = format!(
        "Create a 7-day itinerary for a trip from {} to {}, \
         departing {} and returning {}.",
        trip.origin, trip.destination, trip.departure_date, trip.return_date
    );

    Ok(agent.prompt(&prompt).await?)
}

Node 2 — Find Entry Requirements

pub async fn find_requirements(trip: TripRequest) -> Result<String, BoxDynError> {
    let client = openai::Client::from_env();
    let agent = client
        .agent("gpt-4")
        .preamble("You are a travel documentation specialist. Provide accurate, \
                   current information about visa requirements, recommended and \
                   mandatory vaccinations, travel advisories, and any health \
                   documentation needed for entry.")
        .build();

    let prompt = format!(
        "What are the visa requirements, vaccination recommendations, \
         and entry documentation needed for a traveller from {} visiting {}?",
        trip.origin, trip.destination
    );

    Ok(agent.prompt(&prompt).await?)
}

Node 3 — Find Hotel Options

pub async fn find_hotels(trip: TripRequest) -> Result<String, BoxDynError> {
    let client = openai::Client::from_env();
    let agent = client
        .agent("gpt-4")
        .preamble("You are a hotel and accommodation expert. Recommend a range of \
                   accommodation options from budget to luxury, including neighbourhoods \
                   to stay in, what to look for, and approximate nightly rates.")
        .build();

    let prompt = format!(
        "Recommend accommodation options in {} for a visitor from {}. \
         Include budget, mid-range, and luxury options with approximate prices \
         and the best neighbourhoods to stay in.",
        trip.destination, trip.origin
    );

    Ok(agent.prompt(&prompt).await?)
}

Node 4 — Find Flight Options

pub async fn find_flights(trip: TripRequest) -> Result<String, BoxDynError> {
    let client = openai::Client::from_env();
    let agent = client
        .agent("gpt-4")
        .preamble("You are a flight search assistant. Provide practical guidance \
                   on finding the best flights, including typical airlines on the route, \
                   approximate prices, layover options, and booking advice.")
        .build();

    let prompt = format!(
        "What are the best flight options from {} to {} departing around {}? \
         Include typical airlines, approximate costs, direct vs connecting options, \
         and the best time to book.",
        trip.origin, trip.destination, trip.departure_date
    );

    Ok(agent.prompt(&prompt).await?)
}

The Collector — Generating the Final Report

The generate_report node depends on all four parallel nodes. It receives their outputs as a tuple in the same order they were declared in depends_on. It then passes everything to a final AI agent that weaves it into a cohesive markdown travel document.

pub async fn generate_report(
    (itinerary, requirements, hotels, flights): (String, String, String, String),
    wrk: WorkerContext,
) -> Result<String, BoxDynError> {
    let client = openai::Client::from_env();
    let agent = client
        .agent("gpt-4")
        .preamble("You are a professional travel writer. Given structured research \
                   on a destination, produce a beautifully formatted markdown travel \
                   report that a traveller can save and reference throughout their trip. \
                   Include all sections, use clear headings, and write in an engaging tone.")
        .build();

    let prompt = format!(
        "Produce a complete markdown travel report using the following research:\n\n\
         ## Itinerary\n{itinerary}\n\n\
         ## Entry Requirements\n{requirements}\n\n\
         ## Accommodation\n{hotels}\n\n\
         ## Flights\n{flights}"
    );

    let report = agent.prompt(&prompt).await?;

    // Write the report to disk so the caller can retrieve it.
    tokio::fs::write("travel_report.md", &report).await?;
    tracing::info!("Travel report written to travel_report.md");

    // All work for this run is done — ask the worker to shut down.
    wrk.stop().unwrap();
    Ok(report)
}

Wiring the DAG

Now we connect everything. This is the part where the value of DagFlow becomes clear — declaring what depends on what, validating the graph, and handing it to a standard WorkerBuilder:

use apalis::prelude::*;
use apalis_file_storage::JsonStorage;
use apalis_workflow::{DagFlow, WorkflowSink};
use tracing::info;

#[tokio::main]
async fn main() -> Result<(), BoxDynError> {
    tracing_subscriber::fmt::init();

    // 1. Set up the backend.
    let mut backend = JsonStorage::new_temp().unwrap();

    // 2. Push one or more trip requests as starting inputs.
    //    Each request will run through the full DAG independently.
    backend
        .push_start(vec![
            TripRequest {
                origin: "Nairobi".into(),
                destination: "Tokyo".into(),
                departure_date: "2026-07-01".into(),
                return_date: "2026-07-14".into(),
            }
        ])
        .await?;

    // 3. Declare the DAG.
    let dag = DagFlow::new("travel-planner");

    let itinerary    = dag.node(build_itinerary);
    let requirements = dag.node(find_requirements);
    let hotels       = dag.node(find_hotels);
    let flights      = dag.node(find_flights);

    // The collector depends on all four — order here must match
    // the tuple in generate_report's signature.
    dag.node(generate_report)
        .depends_on((&itinerary, &requirements, &hotels, &flights));

    // 4. Validate before running — catches cycles and missing dependencies.
    dag.validate()?;

    // 5. Print the graph structure for debugging.
    //    Paste the output at https://dreampuf.github.io/GraphvizOnline/
    info!("DAG structure:\n{dag}");

    // 6. Run — the worker drives the DAG like any other backend.
    WorkerBuilder::new("travel-planner-worker")
        .backend(backend)
        .enable_tracing()
        .on_event(|_ctx, event| info!("{event}"))
        .build(dag)
        .run()
        .await?;

    Ok(())
}

The DOT output from info!("{dag}") will look like this — paste it into GraphvizOnline to visualise the graph:

digraph "travel-planner" {
    0 [ label="build_itinerary"]
    1 [ label="find_requirements"]
    2 [ label="find_hotels"]
    3 [ label="find_flights"]
    4 [ label="generate_report"]
    0 -> 4 [ ]
    1 -> 4 [ ]
    2 -> 4 [ ]
    3 -> 4 [ ]
}

What Happens at Runtime

When the worker picks up a TripRequest, execution proceeds in two phases:

Phase 1 — parallel fan-out. All four root nodes start concurrently. Each receives the same TripRequest. There is no coordination between them — they are fully independent AI calls happening at the same time. If you push two trip requests via push_start instead of one, that is eight concurrent AI calls across the two independent DAG runs.

Phase 2 — merge. Once all four nodes for a given input have returned Ok, generate_report runs with their outputs collected into a tuple. If any node returns Err, the entire DAG run for that input fails — generate_report does not run and the task is marked as errored, ready for retry.
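
The all-or-nothing semantics of the merge can be sketched in plain Rust. This is a hypothetical helper illustrating the behaviour described above, not DagFlow's internals: the collector's tuple exists only if every branch returned Ok, and a single Err short-circuits it.

```rust
// Hypothetical gate illustrating the merge semantics: the collector's
// input tuple is produced only when every branch succeeded.
fn gate<E>(
    a: Result<String, E>,
    b: Result<String, E>,
    c: Result<String, E>,
    d: Result<String, E>,
) -> Result<(String, String, String, String), E> {
    // `?` propagates the first Err, so the tuple is never partially built.
    Ok((a?, b?, c?, d?))
}

fn main() {
    // All four branches succeed: the merge step receives the full tuple.
    let ok = gate::<&str>(
        Ok("itinerary".into()),
        Ok("requirements".into()),
        Ok("hotels".into()),
        Ok("flights".into()),
    );
    assert!(ok.is_ok());

    // One failing branch: the collector never sees a tuple at all,
    // mirroring a DAG run marked as errored and ready for retry.
    let failed = gate(
        Ok("itinerary".into()),
        Err("rate limited"),
        Ok("hotels".into()),
        Ok("flights".into()),
    );
    assert_eq!(failed, Err("rate limited"));
}
```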

The final report lands in travel_report.md in the working directory.


Why DAG Workflows Shine Here

It would be possible to write this as a sequential workflow with four and_then calls. But that would mean each AI call waiting for the previous one to finish — if each takes 5 seconds, the total wall time for one trip is 20 seconds. With the DAG, all four run simultaneously and the total wall time collapses to roughly the slowest individual call — around 5 seconds.

More importantly, the parallelism is declared, not hand-rolled. You do not manage tasks, channels, or join handles. You declare that generate_report depends on four nodes, and the runtime figures out that those four can run concurrently. The tuple's arity and element types are checked at compile time, so a missing dependency or a wrongly typed node fails the build — though note that because all four outputs here are String, accidentally swapping hotels and flights in the tuple would still type-check.
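
If you want the ordering itself to be compiler-enforced, one option (a sketch, not something DagFlow requires) is to give each node a distinct newtype output, so that swapping two entries in the collector's tuple becomes a compile error rather than silently mixed-up text:

```rust
// Distinct output types per node. Swapping Hotels and Flights in the
// collector's tuple now fails to compile instead of mixing up sections.
struct Itinerary(String);
struct Requirements(String);
struct Hotels(String);
struct Flights(String);

fn generate_report(
    (i, r, h, f): (Itinerary, Requirements, Hotels, Flights),
) -> String {
    format!(
        "## Itinerary\n{}\n## Requirements\n{}\n## Hotels\n{}\n## Flights\n{}",
        i.0, r.0, h.0, f.0
    )
}

fn main() {
    let report = generate_report((
        Itinerary("day-by-day plan".into()),
        Requirements("visa on arrival".into()),
        Hotels("mid-range picks".into()),
        Flights("one stop via Doha".into()),
    ));
    assert!(report.contains("## Hotels\nmid-range picks"));
    println!("{report}");
}
```

The nodes would then return their newtype instead of a bare String; the cost is a few wrapper structs, the gain is that section order mistakes are caught before the first AI call is ever made.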


Taking It Further

A few natural extensions from here:

  • Persist results — swap JsonStorage::new_temp() for SqliteStorage or PostgresStorage and every trip plan is stored, queryable, and visible in the Apalis dashboard
  • Retry failed AI calls — add .retry(RetryPolicy::retries(3)) to the WorkerBuilder and transient OpenAI errors are automatically retried
  • Add observability — layer on PrometheusLayer or OpenTelemetryMetricsLayer to track how long each AI call takes per destination
  • Fan out further — add a find_weather node or a find_local_events node; because they have no dependencies, they join the parallel phase for free
  • Process a list of destinations — push_start accepts a Vec, so you can queue up an entire travel wishlist and process every destination concurrently across multiple worker instances

The complete source code for this example is available in the apalis examples repository.


To learn more about Rig, check out the tutorial on their website: Build a Flight Search AI Agent with Rig.


Have a question or want to share what you built? Open a discussion on GitHub or join the community.

