State Management


Effective state management is crucial for building robust task processing systems. Apalis provides two primary mechanisms for managing state in your task services: the execution context (also called request context), which carries backend-specific metadata for each task, and Data<T>, which shares application-wide state across handlers. Knowing when to use each will help you build task services that are easier to maintain and reason about.

Execution Context

Execution Context represents backend-specific metadata that travels with each task request. This context is defined by the backend implementation and provides essential information about the task's lifecycle, execution state, and backend-specific properties.

Each backend in Apalis specifies its own context type through the generic Backend<Args, Ctx> trait. This allows different storage backends to provide relevant metadata while maintaining a consistent interface.
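To make the idea of a backend-defined context type concrete, here is a simplified, std-only model. The Backend<Args, Ctx> trait below is a hypothetical stand-in with a single next_task method, not Apalis's actual trait, and SqlCtx, MemoryCtx, SqlBackend, MemoryBackend and drain are illustrative names. It shows each backend pairing task arguments with its own context type while generic code consumes any of them in the same way.

```rust
// Simplified, hypothetical model; this is NOT the real apalis `Backend` trait.
// It only shows how a context type parameter lets each backend attach its
// own metadata to the tasks it yields.

/// A backend that yields task arguments paired with backend-defined context.
trait Backend<Args, Ctx> {
    fn next_task(&mut self) -> Option<(Args, Ctx)>;
}

/// Hypothetical context for an SQL-style backend.
#[derive(Debug, Clone, PartialEq)]
struct SqlCtx {
    priority: i32,
}

/// Hypothetical context for an in-memory backend that tracks nothing extra.
#[derive(Debug, Clone, PartialEq)]
struct MemoryCtx;

struct SqlBackend {
    queue: Vec<(String, SqlCtx)>,
}

struct MemoryBackend {
    queue: Vec<String>,
}

impl Backend<String, SqlCtx> for SqlBackend {
    fn next_task(&mut self) -> Option<(String, SqlCtx)> {
        // Rows already carry their metadata.
        self.queue.pop()
    }
}

impl Backend<String, MemoryCtx> for MemoryBackend {
    fn next_task(&mut self) -> Option<(String, MemoryCtx)> {
        // Attach an empty context to each queued argument.
        self.queue.pop().map(|args| (args, MemoryCtx))
    }
}

/// Generic code can consume any backend without knowing its context type.
fn drain<Args, Ctx, B: Backend<Args, Ctx>>(backend: &mut B) -> Vec<(Args, Ctx)> {
    let mut tasks = Vec::new();
    while let Some(task) = backend.next_task() {
        tasks.push(task);
    }
    tasks
}

fn main() {
    let mut sql = SqlBackend {
        queue: vec![("welcome@example.com".to_string(), SqlCtx { priority: 9 })],
    };
    let mut memory = MemoryBackend {
        queue: vec!["digest@example.com".to_string()],
    };

    let sql_tasks: Vec<(String, SqlCtx)> = drain(&mut sql);
    let memory_tasks: Vec<(String, MemoryCtx)> = drain(&mut memory);

    println!("{} has priority {}", sql_tasks[0].0, sql_tasks[0].1.priority);
    println!("{:?}", memory_tasks);
}
```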

Postgres Example

The PostgreSQL backend provides rich context information for task management:

pub struct PgContext {
    run_at: DateTime<Utc>,      // When the task should be executed
    max_attempts: i32,          // Maximum retry attempts allowed
    last_error: Option<String>, // Last error message if failed
    lock_at: Option<i64>,       // When the task was locked for processing
    lock_by: Option<WorkerId>,  // Which worker has locked this task
    done_at: Option<i64>,       // When the task completed
    priority: i32,              // Task priority for scheduling
}
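The lock_at and done_at fields are what make lifecycle awareness possible. As a sketch, assuming both are timestamps in the same unit (their unit is not documented here), a helper could infer a coarse lifecycle stage and the processing time. ContextSnapshot and Lifecycle are hypothetical names, and the inference rules are an assumption, not how Apalis itself computes task status.

```rust
// Hypothetical mirror of two PgContext fields. Timestamps are assumed to be
// plain integers in the same unit; the lifecycle rules are illustrative only.

#[derive(Debug, PartialEq)]
enum Lifecycle {
    Pending,
    Running,
    Done,
}

struct ContextSnapshot {
    lock_at: Option<i64>,
    done_at: Option<i64>,
}

impl ContextSnapshot {
    /// Infer where the task is in its lifecycle from which timestamps are set.
    fn lifecycle(&self) -> Lifecycle {
        match (self.lock_at, self.done_at) {
            (_, Some(_)) => Lifecycle::Done,
            (Some(_), None) => Lifecycle::Running,
            (None, None) => Lifecycle::Pending,
        }
    }

    /// Time between locking and completion, if both are known.
    fn processing_time(&self) -> Option<i64> {
        match (self.lock_at, self.done_at) {
            (Some(start), Some(end)) => Some(end - start),
            _ => None,
        }
    }
}

fn main() {
    let finished = ContextSnapshot { lock_at: Some(100), done_at: Some(105) };
    println!("{:?} after {:?}", finished.lifecycle(), finished.processing_time());
}
```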

Backend Context

You can access the backend context by adding it as a handler argument, provided you are using the backend that defines it. The following types can also be accessed as handler arguments:

  • Attempt: the current attempt
  • TaskId: the current task's ID

For example, with the Postgres backend:
async fn process(
    _: Email,
    context: PgContext, // provided by the Postgres backend
) -> Result<(), EmailError> {
    // Make decisions based on the backend-provided context
    if context.priority() > 5 {
        // Handle high-priority tasks differently
        return Ok(());
    }

    // Regular task processing
    Ok(())
}
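Being able to request Attempt, TaskId or a backend context simply by naming it as a handler argument relies on looking values up by type. The sketch below models that idea with a type-keyed map built on std::any. TaskData, call_handler and the local Attempt and TaskId structs are hypothetical stand-ins, not Apalis internals; real extraction is async and supports more than two arguments.

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;

// Hypothetical stand-ins; apalis's real Attempt and TaskId types differ.
#[derive(Debug, Clone, Copy, PartialEq)]
struct Attempt(usize);

#[derive(Debug, Clone, PartialEq)]
struct TaskId(String);

/// Per-task storage keyed by the type of each value.
#[derive(Default)]
struct TaskData {
    values: HashMap<TypeId, Box<dyn Any>>,
}

impl TaskData {
    fn insert<T: Any>(&mut self, value: T) {
        self.values.insert(TypeId::of::<T>(), Box::new(value));
    }

    fn get<T: Any>(&self) -> Option<&T> {
        self.values
            .get(&TypeId::of::<T>())
            .and_then(|value| value.downcast_ref::<T>())
    }
}

/// Call a two-argument handler, looking each argument up by its type.
/// Returns None if any requested type was not provided for this task.
fn call_handler<A, B, R>(data: &TaskData, handler: impl Fn(A, B) -> R) -> Option<R>
where
    A: Any + Clone,
    B: Any + Clone,
{
    let a = data.get::<A>()?.clone();
    let b = data.get::<B>()?.clone();
    Some(handler(a, b))
}

fn describe(attempt: Attempt, id: TaskId) -> String {
    format!("task {} on attempt {}", id.0, attempt.0)
}

fn main() {
    let mut data = TaskData::default();
    data.insert(Attempt(2));
    data.insert(TaskId("task-42".to_string()));
    println!("{:?}", call_handler(&data, describe));
}
```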

State with Data<T>

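Data<T> shares application-wide state across all task handlers. Use it for configuration, database connection pools, external service clients, and other resources your handlers need. Register the value on the worker with WorkerBuilder::data (see the complete example below); each task then receives a clone of it, so wrap state that is expensive to clone, or that does not implement Clone, in an Arc.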

pub struct AppState {
    pub database: PgPool,
    pub email_client: EmailClient,
    pub config: AppConfig,
    pub metrics: MetricsCollector,
}

type State = Data<Arc<AppState>>;

async fn process_with_state(
    _: Email,
    state: State
) -> Result<(), EmailError> {
    // Use your state; fields are reachable through Data and Arc via deref, e.g. &state.config
    Ok(())
}
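The Arc in Data<Arc<AppState>> is what keeps per-task cloning cheap. The sketch below uses a hypothetical, minimal Data<T> newtype with a Deref impl (not Apalis's type) and simulates a worker handing each task its own clone; every clone points at the same AppState allocation, so cloning only bumps a reference count.

```rust
use std::ops::Deref;
use std::sync::Arc;

/// Hypothetical stand-in for apalis's Data<T>: a cloneable wrapper that
/// derefs to the value it holds.
#[derive(Clone)]
struct Data<T>(T);

impl<T> Deref for Data<T> {
    type Target = T;
    fn deref(&self) -> &T {
        &self.0
    }
}

/// Hypothetical application state. It does not implement Clone.
struct AppState {
    app_name: String,
    max_batch: usize,
}

type State = Data<Arc<AppState>>;

fn handle(task_no: usize, state: State) -> String {
    // Field access auto-derefs through Data -> Arc -> AppState.
    format!("{} handled task {} (batch limit {})", state.app_name, task_no, state.max_batch)
}

fn main() {
    let shared: State = Data(Arc::new(AppState {
        app_name: "mailer".to_string(),
        max_batch: 50,
    }));

    // Simulate a worker giving each task its own clone of the state.
    for task_no in 0..3 {
        println!("{}", handle(task_no, shared.clone()));
    }

    // The per-task clones have been dropped, leaving only `shared`.
    println!("strong count: {}", Arc::strong_count(&shared.0));
}
```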

State Composition Patterns

Layered State Architecture

#[derive(Clone)]
pub struct DatabaseLayer {
    pub user_repo: UserRepository,
    pub order_repo: OrderRepository,
    pub analytics_db: AnalyticsDatabase,
}

#[derive(Clone)]
pub struct ServiceLayer {
    pub payment_service: PaymentService,
    pub notification_service: NotificationService,
    pub audit_service: AuditService,
}

#[derive(Clone)]
pub struct AppState {
    pub database: DatabaseLayer,
    pub services: ServiceLayer,
    pub config: AppConfig,
}

async fn process_payment(
    payment_task: PaymentTask,
    state: Data<AppState>,
) -> Result<(), PaymentError> {
    // Clean separation of concerns
    let user = state.database.user_repo
        .find_by_id(payment_task.user_id)
        .await?;
        
    let payment_result = state.services.payment_service
        .process_payment(&payment_task.amount, &user.payment_method)
        .await?;
        
    state.services.audit_service
        .log_payment_attempt(&payment_result)
        .await?;
        
    Ok(())
}
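Holding repositories and services behind traits also makes the layered state easy to replace in tests. This sketch assumes hypothetical, synchronous UserRepository and AuditService traits (the handler above uses async methods whose signatures are not shown) and wires in-memory implementations into DatabaseLayer and ServiceLayer.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

#[derive(Clone)]
struct User {
    payment_method: String,
}

// Hypothetical, synchronous versions of the repository and service types
// above; real implementations would likely be async.
trait UserRepository: Send + Sync {
    fn find_by_id(&self, id: u64) -> Option<User>;
}

trait AuditService: Send + Sync {
    fn log_payment_attempt(&self, entry: String);
}

/// In-memory repository, e.g. for tests.
struct InMemoryUsers(HashMap<u64, User>);

impl UserRepository for InMemoryUsers {
    fn find_by_id(&self, id: u64) -> Option<User> {
        self.0.get(&id).cloned()
    }
}

/// Audit service that records entries so tests can inspect them.
#[derive(Default)]
struct RecordingAudit(Mutex<Vec<String>>);

impl AuditService for RecordingAudit {
    fn log_payment_attempt(&self, entry: String) {
        self.0.lock().unwrap().push(entry);
    }
}

#[derive(Clone)]
struct DatabaseLayer {
    user_repo: Arc<dyn UserRepository>,
}

#[derive(Clone)]
struct ServiceLayer {
    audit_service: Arc<dyn AuditService>,
}

#[derive(Clone)]
struct AppState {
    database: DatabaseLayer,
    services: ServiceLayer,
}

fn process_payment(user_id: u64, amount_cents: u64, state: &AppState) -> Result<(), String> {
    let user = state
        .database
        .user_repo
        .find_by_id(user_id)
        .ok_or_else(|| format!("user {user_id} not found"))?;
    state
        .services
        .audit_service
        .log_payment_attempt(format!("charged {amount_cents} via {}", user.payment_method));
    Ok(())
}

fn main() {
    let audit = Arc::new(RecordingAudit::default());
    let mut users = HashMap::new();
    users.insert(1, User { payment_method: "card".to_string() });

    let state = AppState {
        database: DatabaseLayer { user_repo: Arc::new(InMemoryUsers(users)) },
        services: ServiceLayer { audit_service: audit.clone() },
    };

    println!("{:?}", process_payment(1, 500, &state));
    println!("{:?}", audit.0.lock().unwrap());
}
```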

Feature-Specific State

#[derive(Clone)]
pub struct EmailState {
    pub smtp_client: SmtpClient,
    pub template_engine: TemplateEngine,
    pub rate_limiter: Arc<RateLimiter>,
}

#[derive(Clone)]
pub struct ImageProcessingState {
    pub s3_client: S3Client,
    pub image_processor: ImageProcessor,
    pub cdn_config: CdnConfig,
}

// Inject only relevant state
async fn send_email(
    email_task: EmailTask,
    email_state: Data<EmailState>,
) -> Result<(), EmailError> {
    // Focused state access
    email_state.rate_limiter.check_limit().await?;
    
    let rendered = email_state.template_engine
        .render(&email_task.template, &email_task.data)?;
        
    email_state.smtp_client
        .send_email(&rendered)
        .await?;
        
    Ok(())
}
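The rate_limiter field is wrapped in an Arc for a reason: every clone of EmailState must draw from the same budget. The RateLimiter below is a hypothetical fixed-budget limiter with no refill, written only to show that shared behavior; it is not a real crate's API.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical fixed-budget limiter: allows `capacity` sends, then refuses.
/// A real limiter would also refill over time; that is omitted here.
struct RateLimiter {
    remaining: Mutex<u32>,
}

impl RateLimiter {
    fn new(capacity: u32) -> Self {
        Self { remaining: Mutex::new(capacity) }
    }

    fn check_limit(&self) -> Result<(), String> {
        let mut remaining = self.remaining.lock().unwrap();
        if *remaining == 0 {
            return Err("rate limit exceeded".to_string());
        }
        *remaining -= 1;
        Ok(())
    }
}

/// Trimmed-down EmailState holding only the limiter.
#[derive(Clone)]
struct EmailState {
    rate_limiter: Arc<RateLimiter>,
}

fn main() {
    let state = EmailState { rate_limiter: Arc::new(RateLimiter::new(2)) };
    // Each task gets its own clone of EmailState, but all clones share one limiter.
    for task_no in 0..3 {
        let task_state = state.clone();
        println!("task {task_no}: {:?}", task_state.rate_limiter.check_limit());
    }
}
```

If the limiter were cloned by value instead, each task would get its own copy of the counter and the limit would never apply across tasks.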

Combining Context and State

Handlers can take request context and application state together, which lets a single handler adapt its behavior to both per-task metadata and shared resources:

async fn adaptive_processing(
    task: Email,
    context: PgContext,
    attempt: Attempt,
    // Assumes an AppState exposing the processors, config and metrics used below
    state: Data<AppState>,
) -> Result<(), ProcessingError> {
    // Choose a processor based on context and retry state
    let processor = if context.priority() > 8 {
        &state.services.high_priority_processor
    } else if attempt.current() > 2 {
        &state.services.retry_processor // more resilient processor for retries
    } else {
        &state.services.standard_processor
    };

    // Use context for timeout decisions (calculate_timeout is application-defined)
    let timeout = calculate_timeout(
        context.max_attempts(),
        attempt.current(),
        &state.config.timeout_strategy,
    );

    // Record context information
    state.metrics.record_task_attempt(
        context.priority(),
        attempt.current(),
        context.run_at(),
    );

    processor.process_with_timeout(task, timeout).await
}
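Since calculate_timeout is not defined above, the decision logic is isolated here as plain functions. choose_processor mirrors the branch order of adaptive_processing; TimeoutStrategy and the linear growth rule in calculate_timeout are hypothetical choices, and attempts are assumed to be counted from 1.

```rust
use std::time::Duration;

#[derive(Debug, PartialEq)]
enum ProcessorKind {
    HighPriority,
    Retry,
    Standard,
}

/// Same branch order as `adaptive_processing`: priority is checked before retries.
fn choose_processor(priority: i32, attempt: usize) -> ProcessorKind {
    if priority > 8 {
        ProcessorKind::HighPriority
    } else if attempt > 2 {
        ProcessorKind::Retry
    } else {
        ProcessorKind::Standard
    }
}

/// Hypothetical strategy: a base timeout plus a fixed step per extra attempt.
struct TimeoutStrategy {
    base: Duration,
    step: Duration,
}

/// Hypothetical `calculate_timeout`: grows linearly with the attempt number
/// (assumed to start at 1) and stops growing at `max_attempts`.
fn calculate_timeout(max_attempts: i32, attempt: usize, strategy: &TimeoutStrategy) -> Duration {
    let capped = attempt.min(max_attempts.max(1) as usize);
    let extra_steps = capped.saturating_sub(1) as u32;
    strategy.base + strategy.step * extra_steps
}

fn main() {
    let strategy = TimeoutStrategy {
        base: Duration::from_secs(10),
        step: Duration::from_secs(5),
    };
    for attempt in 1..=4 {
        println!(
            "attempt {attempt}: {:?}, timeout {:?}",
            choose_processor(3, attempt),
            calculate_timeout(3, attempt, &strategy)
        );
    }
}
```

Keeping these decisions in pure functions lets you unit-test them without a running backend.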

Best Practices

Context Guidelines

  • Read-Only: Treat request context as read-only metadata
  • Backend-Specific: Remember that context structure depends on your backend choice
  • Lifecycle Awareness: Use context to understand where the task is in its lifecycle
  • Error Context: Use the previous error (e.g. PgContext's last_error) to inform retry decisions, such as stopping early on permanent failures
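For example, a handler could inspect the previous error before doing any work and stop early when it indicates a permanent failure. The decide function, the RetryDecision enum and the list of permanent error strings are hypothetical; the input corresponds to the last_error field shown in PgContext, passed in as an Option<&str>.

```rust
/// Outcome of inspecting the previous error before retrying.
#[derive(Debug, PartialEq)]
enum RetryDecision {
    Proceed,
    Abort,
}

/// Hypothetical policy: skip retries whose previous error looks permanent.
/// Substring matching on error text is a simplification; structured error
/// codes would be more robust.
fn decide(last_error: Option<&str>) -> RetryDecision {
    const PERMANENT: [&str; 2] = ["invalid address", "unsubscribed"];
    match last_error {
        Some(err) if PERMANENT.iter().any(|p| err.contains(*p)) => RetryDecision::Abort,
        // First attempt, or a transient failure such as a timeout.
        _ => RetryDecision::Proceed,
    }
}

fn main() {
    for err in [None, Some("connection reset"), Some("invalid address: bob@")] {
        println!("{:?} -> {:?}", err, decide(err));
    }
}
```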

State Guidelines

  • Minimize State: Only include truly shared resources in Data<T>
  • Immutable Preference: Prefer immutable state where possible
  • Resource Management: Use Arc<T> for expensive-to-clone resources
  • Health Monitoring: Implement health checks for stateful resources
  • Scoped Access: Consider feature-specific state types for better organization
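One way to apply the health-monitoring guideline is to give each stateful resource a common check interface and report which ones fail. HealthCheck, StaticCheck and unhealthy are hypothetical names; a real check would ping the database pool or SMTP server, likely asynchronously.

```rust
/// Hypothetical health-check interface for resources held in shared state.
trait HealthCheck {
    fn name(&self) -> &str;
    fn is_healthy(&self) -> bool;
}

/// Stand-in resource whose health is fixed, for demonstration only.
struct StaticCheck {
    name: String,
    healthy: bool,
}

impl HealthCheck for StaticCheck {
    fn name(&self) -> &str {
        &self.name
    }
    fn is_healthy(&self) -> bool {
        self.healthy
    }
}

/// Names of the resources that failed their check (empty when all pass).
fn unhealthy(checks: &[&dyn HealthCheck]) -> Vec<String> {
    checks
        .iter()
        .filter(|check| !check.is_healthy())
        .map(|check| check.name().to_string())
        .collect()
}

fn main() {
    let database = StaticCheck { name: "database".to_string(), healthy: true };
    let smtp = StaticCheck { name: "smtp".to_string(), healthy: false };
    let checks: [&dyn HealthCheck; 2] = [&database, &smtp];
    println!("unhealthy: {:?}", unhealthy(&checks));
}
```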

Performance Considerations

  • Clone Efficiency: Ensure your state types clone efficiently (use Arc<T> for heavy resources)
  • Connection Pooling: Use connection pools rather than individual connections
  • Lazy Initialization: Consider lazy initialization for expensive resources
  • Metrics Integration: Monitor state usage and performance
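Lazy initialization combines well with cheap clones: an Arc<OnceLock<T>> is built once, on first use, and then shared by every clone of the state. This std-only sketch assumes Rust 1.70 or later for std::sync::OnceLock; TemplateEngine and the init_calls counter are hypothetical and exist only for demonstration.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, OnceLock};

/// Hypothetical expensive resource, e.g. a set of compiled templates.
struct TemplateEngine {
    names: Vec<String>,
}

/// Cheap-to-clone state: the engine is built on first use, then shared.
#[derive(Clone, Default)]
struct AppState {
    engine: Arc<OnceLock<TemplateEngine>>,
    // Counts initializations; present only to demonstrate the behavior.
    init_calls: Arc<AtomicUsize>,
}

impl AppState {
    fn engine(&self) -> &TemplateEngine {
        self.engine.get_or_init(|| {
            self.init_calls.fetch_add(1, Ordering::SeqCst);
            TemplateEngine {
                names: vec!["welcome".to_string(), "reset".to_string()],
            }
        })
    }
}

fn main() {
    let state = AppState::default();
    // Clones handed to different tasks share one OnceLock.
    let (task_a, task_b) = (state.clone(), state.clone());
    println!("{} templates", task_a.engine().names.len());
    println!("{} templates", task_b.engine().names.len());
    println!("initialized {} time(s)", state.init_calls.load(Ordering::SeqCst));
}
```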

The complete example

main.rs
use apalis::prelude::*;
use apalis_postgres::*;
use examples::*; // example types: Email, EmailError, EmailClient, AppConfig, MetricsCollector
use std::sync::Arc;

async fn process(
    _: Email,
    context: PgContext,
) -> Result<(), EmailError> {
    // Make decisions based on the backend-provided context
    if context.priority() > 5 {
        // Handle high-priority tasks differently
        return Ok(());
    }

    // Regular task processing
    Ok(())
}

pub struct AppState {
    pub database: PgPool,
    pub email_client: EmailClient,
    pub config: AppConfig,
    pub metrics: MetricsCollector,
}

// AppState is not Clone, so it is shared through an Arc
type State = Data<Arc<AppState>>;

async fn process_with_state(
    _: Email,
    state: State,
) -> Result<(), EmailError> {
    // Use your state, e.g. &state.email_client or &state.config
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), BoxDynError> {
    // Read the connection string from the environment instead of
    // passing the literal text "DATABASE_URL"
    let database_url = std::env::var("DATABASE_URL")?;
    let pool = PgPool::connect(&database_url).await?;

    // Worker 1: Postgres backend; the handler reads PgContext
    let pg_store = PostgresStorage::new(&pool);
    let pg_worker = WorkerBuilder::new("email_worker")
        .backend(pg_store)
        .build(process);

    // Worker 2: in-memory backend; the handler reads shared AppState
    let storage = MemoryStorage::new();
    let state = AppState {
        config: AppConfig::default(),
        database: pool.clone(),
        email_client: EmailClient::default(),
        metrics: MetricsCollector::default(),
    };
    let state_worker = WorkerBuilder::new("email_worker_with_state")
        .backend(storage)
        .data(Arc::new(state))
        .build(process_with_state);

    // `run()` resolves only when a worker stops, so awaiting one worker before
    // building the next would never start the second; run both concurrently
    let (pg_result, state_result) = tokio::join!(pg_worker.run(), state_worker.run());
    pg_result?;
    state_result?;

    Ok(())
}

Conclusion

Effective state management in Apalis rests on the distinction between request (execution) context and application state. Use the context to read backend-specific metadata and make per-task decisions about processing. Use Data<T> to share resources and configuration across handlers. Combining the two lets a handler adapt to each task's priority, attempt count, and error history while its dependencies stay explicit and easy to replace in tests.