Error Handling


Overview

In apalis, task handlers are responsible for processing individual tasks, and each handler function can implement its own error handling. By using Rust’s Result type, task handlers can indicate whether a task was successful or if an error occurred. If the handler returns an error, apalis can automatically manage retries or mark the task as failed, depending on the configuration.

Here’s an example of implementing error handling in a task handler:

use apalis::prelude::*;
use anyhow::{Context, Error};
use serde::{Deserialize, Serialize};

#[derive(Task, Debug, Serialize, Deserialize)]
struct SendEmail {
    recipient: String,
    subject: String,
    body: String,
}

async fn handle_send_email(task: SendEmail) -> Result<(), Error> {
    println!("Attempting to send email to: {}", task.recipient);
    
    // Simulate an email-sending operation that could fail
    send_email_to_service(&task)
        .await
        .context("Failed to send email")?;

    println!("Email successfully sent to: {}", task.recipient);
    Ok(())
}

// This function simulates the email-sending logic
async fn send_email_to_service(_task: &SendEmail) -> Result<(), std::io::Error> {
    // Simulated email sending logic (Replace with actual email-sending code)
    Ok(())
}

In this example, handle_send_email propagates any error from send_email_to_service with the ? operator, attaching a human-readable message via anyhow's .context. Because the handler returns a Result, apalis can determine whether the task succeeded or failed and handle it accordingly.
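The error-wrapping pattern itself can be reproduced with only the standard library. The sketch below uses a hypothetical ContextError type as a simplified stand-in for what anyhow's .context does: it wraps the lower-level error with a descriptive message while preserving the original as the error's source.

```rust
use std::fmt;

// Hypothetical simplified stand-in for anyhow's context-wrapping:
// keeps the original error reachable via Error::source.
#[derive(Debug)]
struct ContextError {
    context: String,
    source: std::io::Error,
}

impl fmt::Display for ContextError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}: {}", self.context, self.source)
    }
}

impl std::error::Error for ContextError {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        Some(&self.source)
    }
}

// Simulated send operation; `fail` stands in for a real network failure.
fn send_email_to_service(fail: bool) -> Result<(), std::io::Error> {
    if fail {
        Err(std::io::Error::new(
            std::io::ErrorKind::TimedOut,
            "connection timed out",
        ))
    } else {
        Ok(())
    }
}

fn handle_send_email(fail: bool) -> Result<(), ContextError> {
    // Equivalent of `.context("Failed to send email")?` in the handler above.
    send_email_to_service(fail).map_err(|e| ContextError {
        context: "Failed to send email".to_string(),
        source: e,
    })
}

fn main() {
    assert!(handle_send_email(false).is_ok());
    let err = handle_send_email(true).unwrap_err();
    // The context message and the underlying cause are both preserved.
    assert_eq!(err.to_string(), "Failed to send email: connection timed out");
}
```

A caller (or a layer such as RetryLayer) only needs the Ok/Err distinction; the context string exists purely for logs and debugging.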

Retries

apalis provides the RetryLayer to automatically retry failed tasks based on predefined strategies. This is useful when dealing with temporary failures, such as network issues.

Applying RetryLayer to a Worker

use apalis::prelude::*;
use apalis::layers::retry::{RetryLayer, RetryPolicy};
use std::time::Duration;

#[tokio::main]
async fn main() {
    let backoff = ExponentialBackoffMaker::default().make_backoff();
    let worker = WorkerBuilder::new("email_worker")
        .layer(RetryLayer::new(RetryPolicy::retries(3).with_backoff(backoff)))
        .build(handle_send_email);
}

Explanation:

  1. RetryPolicy::retries(3) – Limits the task to a maximum of 3 retry attempts.
  2. .with_backoff(backoff) – Applies the exponential backoff, doubling the wait time between retries.
  3. Integration with handle_send_email – If the handler returns an error, the task is automatically retried according to the RetryLayer configuration.

By using RetryLayer, you can ensure that transient failures do not result in immediate task failures, improving the resilience of your apalis-based task processing system.
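The exponential schedule is easy to reason about in isolation. A pure-stdlib sketch, assuming a base delay of 100 ms (apalis' actual defaults may differ):

```rust
use std::time::Duration;

// Delay before retry number `attempt` (0-based): base * 2^attempt.
// Doubling the wait gives a failing dependency increasing room to recover.
fn backoff_delay(attempt: u32, base: Duration) -> Duration {
    base * 2u32.pow(attempt)
}

fn main() {
    let base = Duration::from_millis(100);
    let schedule: Vec<u128> = (0..3)
        .map(|attempt| backoff_delay(attempt, base).as_millis())
        .collect();
    // Three retries wait 100 ms, 200 ms, then 400 ms.
    assert_eq!(schedule, vec![100, 200, 400]);
}
```

Real backoff implementations usually also cap the maximum delay and add random jitter so that many failed tasks do not retry in lockstep.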

Conditional Retry

Use a retry_if predicate to retry only when the error matches a condition you define.

use std::io::{self, ErrorKind};
use apalis::layers::retry::RetryPolicy;

fn is_transient(e: &io::Error) -> bool {
    matches!(e.kind(),
        ErrorKind::ConnectionRefused |
        ErrorKind::ConnectionReset |
        ErrorKind::TimedOut |
        ErrorKind::Interrupted
    )
}

let policy = RetryPolicy::retries(3).retry_if(is_transient);
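The predicate is ordinary Rust and can be exercised on its own, outside of apalis:

```rust
use std::io::{self, ErrorKind};

// Same predicate as above: only network-flavored errors are worth retrying.
fn is_transient(e: &io::Error) -> bool {
    matches!(
        e.kind(),
        ErrorKind::ConnectionRefused
            | ErrorKind::ConnectionReset
            | ErrorKind::TimedOut
            | ErrorKind::Interrupted
    )
}

fn main() {
    let timeout = io::Error::new(ErrorKind::TimedOut, "upstream timed out");
    let bad_input = io::Error::new(ErrorKind::InvalidInput, "malformed recipient");
    assert!(is_transient(&timeout)); // would be retried
    assert!(!is_transient(&bad_input)); // fails immediately, no retry
}
```

Keeping permanent errors (bad input, missing records) out of the retry path avoids wasting attempts on tasks that can never succeed.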

Metadata-Based Retries

You can set a custom retry config per task before pushing it. This allows fine-grained, per-task retry settings.

use apalis::layers::retry::{RetryPolicy, RetryConfig};
let policy = RetryPolicy::default().from_task_config();

let task = Task::new(args).retries(4).build(); // 4 retries only for this task
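The idea behind per-task metadata can be sketched with plain Rust. The TaskEnvelope type below is hypothetical (not apalis' real Task type): each task carries its own retry budget, which the worker reads instead of applying one global policy.

```rust
// Hypothetical envelope: the retry budget travels with the task's data.
#[derive(Debug)]
struct TaskEnvelope<T> {
    args: T,
    max_retries: u32,
}

impl<T> TaskEnvelope<T> {
    fn new(args: T) -> Self {
        // Default budget: no retries unless the producer asks for them.
        TaskEnvelope { args, max_retries: 0 }
    }

    // Builder-style setter, mirroring the `.retries(4)` call above.
    fn retries(mut self, n: u32) -> Self {
        self.max_retries = n;
        self
    }
}

fn main() {
    let task = TaskEnvelope::new("send-email").retries(4);
    assert_eq!(task.max_retries, 4);
    assert_eq!(task.args, "send-email");
    // A task pushed without the setter keeps the default budget.
    assert_eq!(TaskEnvelope::new("report").max_retries, 0);
}
```

Because the budget is stored alongside the task, a policy built with from_task_config can look it up at execution time rather than at worker-construction time.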

Catching Panics

In apalis, task execution is designed to be robust and resilient. However, sometimes a task may panic due to unexpected conditions, such as a bug in the code or an external dependency failure. To prevent such panics from crashing the entire worker process, apalis provides a built-in CatchPanicLayer, which catches panics and ensures that they are handled gracefully.

CatchPanicLayer is a middleware layer in apalis that catches panics occurring within a task's execution. Instead of propagating the panic and causing the worker to crash, the layer converts it into a logged error, allowing the task to be retried or marked as failed.
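Conceptually, this is std::panic::catch_unwind wrapped around the handler. The stand-alone stdlib sketch below shows that conversion; run_guarded is a hypothetical name, not apalis API.

```rust
use std::panic;

// Run a task body, converting any panic into an ordinary Err so the
// rest of the pipeline can treat it like a normal task failure.
fn run_guarded<F>(f: F) -> Result<(), String>
where
    F: FnOnce() -> Result<(), String> + panic::UnwindSafe,
{
    match panic::catch_unwind(f) {
        Ok(result) => result,
        Err(payload) => {
            // Recover the panic message when it was a &str or String.
            let msg = payload
                .downcast_ref::<&str>()
                .map(|s| s.to_string())
                .or_else(|| payload.downcast_ref::<String>().cloned())
                .unwrap_or_else(|| "unknown panic".to_string());
            Err(format!("task panicked: {}", msg))
        }
    }
}

fn main() {
    panic::set_hook(Box::new(|_| {})); // keep the demo's stderr quiet
    assert!(run_guarded(|| Ok(())).is_ok());
    let err = run_guarded(|| panic!("Unexpected panic occurred!")).unwrap_err();
    assert_eq!(err, "task panicked: Unexpected panic occurred!");
}
```

Once a panic has been turned into an Err, everything downstream (logging, retries, failure accounting) works exactly as it does for handlers that return errors normally.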

Applying CatchPanicLayer to a worker ensures that all task executions within that worker are protected against panics. Here's how to integrate it:

Example Usage

use apalis::prelude::*;
use apalis::layers::CatchPanicLayer;
use anyhow::Error;
use serde::{Deserialize, Serialize};

#[derive(Task, Debug, Serialize, Deserialize)]
struct ExampleTask;

async fn handle_example_task(_task: ExampleTask) -> Result<(), Error> {
    println!("Processing task...");
    panic!("Unexpected panic occurred!"); // Simulating a panic
}

#[tokio::main]
async fn main() {
    let worker = WorkerBuilder::new("example_worker")
        .layer(CatchPanicLayer::new()) 
        .build(handle_example_task);
}

Explanation

  • The CatchPanicLayer is applied to the worker using .layer(CatchPanicLayer::new()), ensuring that any panic inside handle_example_task is caught.
  • The handle_example_task function deliberately triggers a panic with panic!("Unexpected panic occurred!").
  • Instead of crashing, apalis logs the panic as an error, and the task can be retried based on the retry policy (if configured).

Behavior of CatchPanicLayer

When CatchPanicLayer is enabled:

  • Panics are logged instead of crashing the worker.
  • Tasks that panic are marked as failed.
  • Retries still apply if a RetryLayer is present.
  • Stack traces can be captured in logs for debugging.

Combining with Retry Strategies

CatchPanicLayer works well with RetryLayer to ensure panicked tasks are retried automatically.

use apalis::layers::retry::{RetryLayer, RetryPolicy};

let worker = WorkerBuilder::new("example_worker")
    .catch_panic()
    .layer(RetryLayer::new(RetryPolicy::retries(3)))
    .build(handle_example_task);

By using CatchPanicLayer, you can ensure that your apalis workers remain stable and resilient even in the face of unexpected panics.
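The combined behavior of the two layers can be sketched as a single stdlib loop: catch each panic, convert it to an error, and retry until the budget is exhausted. run_with_retries is a hypothetical stand-in for the layered middleware, not apalis API.

```rust
use std::panic::{self, AssertUnwindSafe};

// Attempt the task up to 1 + max_retries times, treating panics and
// returned errors the same way, as CatchPanicLayer + RetryLayer would.
fn run_with_retries<F>(mut task: F, max_retries: u32) -> Result<(), String>
where
    F: FnMut(u32) -> Result<(), String>,
{
    let mut last_err = String::from("never ran");
    for attempt in 0..=max_retries {
        match panic::catch_unwind(AssertUnwindSafe(|| task(attempt))) {
            Ok(Ok(())) => return Ok(()), // success: stop retrying
            Ok(Err(e)) => last_err = e,  // ordinary failure: retry
            Err(_) => last_err = format!("attempt {} panicked", attempt),
        }
    }
    Err(last_err) // budget exhausted: surface the last failure
}

fn main() {
    panic::set_hook(Box::new(|_| {})); // keep the demo's stderr quiet
    // Panics on the first two attempts, then succeeds on the third.
    let result = run_with_retries(
        |attempt| if attempt < 2 { panic!("boom") } else { Ok(()) },
        3,
    );
    assert!(result.is_ok());
    // A task that always fails surfaces its last error once retries run out.
    let exhausted = run_with_retries(|_| Err("always fails".to_string()), 1);
    assert_eq!(exhausted.unwrap_err(), "always fails");
}
```

Note the ordering concern this sketch makes explicit: panic catching must sit inside the retry loop (closer to the handler) for a panicked attempt to count as a retryable failure rather than aborting the whole sequence.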