Custom Backends
Overview
CustomBackend is a flexible backend implementation that allows you to use any persistence layer with apalis. Unlike pre-built backends that work with specific databases, custom backends give you complete control over task storage and retrieval.
You define two core components:
- Fetcher: A function that retrieves pending tasks from your storage system and returns them as a stream
- Sink: A function that persists new tasks to your storage system
This architecture works with PostgreSQL (via Diesel), MongoDB, Redis, SQLite, or any custom storage solution you're already using.
Basic Setup
The BackendBuilder is what you use to construct a custom backend. It requires three essential components:
- Database: Your database connection or pool (the storage engine)
- Fetcher: A function that defines how to retrieve tasks
- Sink: A function that defines how to save tasks
```rust
use apalis_core::backend::custom::BackendBuilder;
use diesel::r2d2::{self, ConnectionManager};
use diesel::SqliteConnection;

type DbPool = r2d2::Pool<ConnectionManager<SqliteConnection>>;

let backend = BackendBuilder::new()
    .database(db_pool)         // The database connection pool
    .fetcher(create_fetcher()) // How to fetch tasks
    .sink(create_sink())       // How to save tasks
    .build()?;
```
Fetcher Implementation
The fetcher is a function that returns a stream of tasks. It's called by the worker to continuously poll for new tasks to process. The fetcher receives three parameters:
- pool: Your database connection pool
- config: Custom configuration (if any)
- worker_ctx: Context about the worker requesting tasks
The fetcher must return a Stream that yields Result<Option<Task>>:
- Ok(Some(task)): A task was found and should be processed
- Ok(None): No tasks are available right now
- Err(e): An error occurred while fetching
```rust
use futures_util::{stream, StreamExt};

fn create_fetcher() -> impl Fn(&mut DbPool, &(), &WorkerContext) -> BoxStream<...> {
    |pool, _, _| {
        stream::unfold(pool.clone(), |p| async move {
            // Add a small delay to avoid hammering the database
            tokio::time::sleep(Duration::from_millis(100)).await;
            match fetch_pending_task(&p) {
                Ok(Some(task)) => Some((Ok(Some(task)), p)),
                Ok(None) => Some((Ok(None), p)),
                Err(e) => Some((Err(e.into()), p)),
            }
        })
        .boxed()
    }
}
```
Database Query
This is where you implement the actual database logic to fetch a pending task. The function queries your database for tasks that are ready to be processed, typically filtering by status and ordering by creation time to ensure FIFO processing.
```rust
fn fetch_pending_task(pool: &DbPool) -> Result<Option<Task<MyJob>>> {
    use schema::tasks::dsl::*;

    let mut conn = pool.get()?;

    // Query for the oldest pending task
    let row = tasks
        .filter(status.eq("pending"))
        .order(created_at.asc())
        .first::<TaskRow>(&mut conn)
        .optional()?;

    Ok(row.map(|r| r.into_task()))
}
```
Sink Implementation
The sink is a function that returns a Sink for persisting tasks. When you push a new task to the backend (using backend.send(task)), the sink handles saving it to your storage system.
The sink receives two parameters:
- pool: Your database connection pool
- config: Custom configuration (if any)
The sink processes each task and must return a Result indicating success or failure.
```rust
use futures_util::{sink, SinkExt};

fn create_sink() -> impl Fn(&mut DbPool, &()) -> impl Sink<Task<MyJob>> {
    |pool, _| {
        sink::unfold(pool.clone(), |p, task| async move {
            save_task(&p, task)?;
            Ok::<_, BoxDynError>(p)
        })
    }
}
```
Database Insert
This function handles the actual persistence of a task to your database. It converts the task into a database row and inserts it into your tasks table.
```rust
fn save_task(pool: &DbPool, task: Task<MyJob>) -> Result<()> {
    use schema::tasks;

    let mut conn = pool.get()?;

    // Convert the task to a database row and insert it
    diesel::insert_into(tasks::table)
        .values(&TaskRow::from_task(task))
        .execute(&mut conn)?;

    Ok(())
}
```
Using with Worker
Once you've built your custom backend, you use it with a worker just like any other backend. The worker will automatically call your fetcher to get tasks and process them with your handler function.
```rust
#[derive(Serialize, Deserialize)]
struct EmailJob {
    to: String,
    subject: String,
}

// The handler function that processes each task
async fn send_email(job: EmailJob, _ctx: WorkerContext) -> Result<()> {
    println!("Sending to: {}", job.to);
    // Your email sending logic here
    Ok(())
}

// Connect the backend to a worker
let worker = WorkerBuilder::new("email-worker")
    .backend(backend)
    .build(send_email);

worker.run().await?;
```
Custom Configuration
You can pass custom configuration to control how your backend behaves. This is useful for settings like batch sizes, polling intervals, or connection parameters. The configuration is passed to both your fetcher and sink functions.
```rust
#[derive(Clone)]
struct Config {
    batch_size: usize,       // How many tasks to fetch at once
    poll_interval: Duration, // How often to check for new tasks
}

let backend = BackendBuilder::new_with_cfg(Config {
    batch_size: 10,
    poll_interval: Duration::from_secs(1),
})
.database(db_pool)
.fetcher(|pool, config, _| {
    // Use config.batch_size and config.poll_interval
    // to customize fetching behavior
})
.sink(|pool, config| {
    // Access config here if needed
})
.build()?;
```
Features
| Feature | Status |
|---|---|
| Task Sink | ✅ |
| Serialization | ✅ |
| Fetch by ID | ❌ |
| Workflows | ❌ |
Best Practices
Connection Pooling
Always use a connection pool rather than creating new connections for each task. This improves performance and prevents exhausting database connections.
Polling Strategy
Add small delays in your fetcher (e.g., 100-500ms) to avoid constantly hammering your database. This reduces load while still maintaining responsiveness.
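A common refinement is exponential backoff: poll quickly while tasks are flowing, and back off when the queue is empty. A minimal sketch of the delay calculation, where next_delay is a hypothetical helper (not part of apalis) that you would call between polls in your fetcher:

```rust
use std::time::Duration;

/// Compute the next polling delay: double the delay after an empty poll,
/// cap it at `max`, and reset to `base` as soon as a task is found.
fn next_delay(found_task: bool, current: Duration, base: Duration, max: Duration) -> Duration {
    if found_task {
        base
    } else {
        (current * 2).min(max)
    }
}

fn main() {
    let base = Duration::from_millis(100);
    let max = Duration::from_secs(5);

    let mut d = base;
    // Three empty polls in a row double the delay each time: 200, 400, 800 ms.
    for _ in 0..3 {
        d = next_delay(false, d, base, max);
    }
    assert_eq!(d, Duration::from_millis(800));

    // The cap prevents the delay from growing without bound.
    assert_eq!(next_delay(false, Duration::from_secs(4), base, max), max);

    // A successful fetch resets the delay to the base interval.
    assert_eq!(next_delay(true, d, base, max), base);
}
```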
Error Handling
Properly handle and propagate errors in both your fetcher and sink. Return errors rather than panicking so the worker can handle them gracefully.
Idempotency
Design your task processing to be idempotent - if a task is processed twice, it should produce the same result. This is important for reliability when tasks might be retried.
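The core idea can be illustrated with an in-memory sketch: track which task IDs have already been handled and make a redelivered task a no-op, so the side effect happens exactly once. The Processor type below is hypothetical; in a real backend you would typically dedupe against a unique key in the database instead.

```rust
use std::collections::HashSet;

// Minimal idempotency sketch: skip tasks whose id was already processed.
struct Processor {
    seen: HashSet<u64>,
    sent: u32,
}

impl Processor {
    /// Returns true if the task was processed, false if it was a duplicate.
    fn handle(&mut self, task_id: u64) -> bool {
        // `insert` returns false when the id was already present.
        if !self.seen.insert(task_id) {
            return false; // duplicate delivery: do nothing
        }
        self.sent += 1; // the side effect happens exactly once per task
        true
    }
}

fn main() {
    let mut p = Processor { seen: HashSet::new(), sent: 0 };
    assert!(p.handle(1));   // first delivery is processed
    assert!(!p.handle(1));  // a retry of task 1 is a no-op
    assert!(p.handle(2));
    assert_eq!(p.sent, 2);  // each task produced exactly one side effect
}
```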
Task Status Updates
Consider updating task status in your database (e.g., from "pending" to "processing" to "completed") to track task lifecycle and prevent duplicate processing.
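The lifecycle above can be sketched as a state machine where claiming a task only succeeds while it is still pending. The Status enum and try_claim helper are illustrative, not part of apalis; in a real backend the pending-to-processing transition must happen atomically in the database (for example, an UPDATE guarded by a WHERE status = 'pending' clause) so two workers cannot claim the same row.

```rust
// Sketch of the task lifecycle described in the text.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Status {
    Pending,
    Processing,
    Completed,
}

/// Attempt to claim a task: succeeds only if it is still Pending.
fn try_claim(status: &mut Status) -> bool {
    if *status == Status::Pending {
        *status = Status::Processing;
        true
    } else {
        false
    }
}

fn main() {
    let mut s = Status::Pending;
    assert!(try_claim(&mut s));          // the first worker claims the task
    assert_eq!(s, Status::Processing);
    assert!(!try_claim(&mut s));         // a second worker sees it is taken
    s = Status::Completed;               // mark done after the handler succeeds
    assert_eq!(s, Status::Completed);
}
```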