State Management
State management is central to building robust task processing systems. Apalis provides two mechanisms for managing state in your task services: the execution context (also called the request context), which carries backend-specific metadata, and Data<T>, which holds application-wide shared state. Knowing when and how to use each one helps keep task handlers maintainable and efficient.
Execution Context
Execution Context represents backend-specific metadata that travels with each task request. This context is defined by the backend implementation and provides essential information about the task's lifecycle, execution state, and backend-specific properties.
Each backend in Apalis specifies its own context type through the generic Backend<Args, Ctx> trait. This allows different storage backends to provide relevant metadata while maintaining a consistent interface.
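To illustrate the idea of a backend choosing its own context type, here is a minimal sketch using only the standard library. The `Backend` trait below is a simplified stand-in written for this example, not the actual Apalis trait, and `MemoryBackend`, `SqlLikeBackend`, their context structs, `poll_next`, and `drain` are all hypothetical names.

```rust
use std::collections::VecDeque;

/// Simplified stand-in for a backend trait that is generic over the task
/// arguments and a backend-defined context type (not the real Apalis trait).
trait Backend<Args, Ctx> {
    /// Returns the next task together with the metadata the backend keeps for it.
    fn poll_next(&mut self) -> Option<(Args, Ctx)>;
}

/// Context for an illustrative in-memory backend: only an attempt counter.
#[derive(Debug, Clone, PartialEq)]
struct MemoryContext {
    attempts: u32,
}

/// Context for an illustrative SQL-like backend, which tracks more metadata.
#[derive(Debug, Clone, PartialEq)]
struct SqlLikeContext {
    priority: i32,
    max_attempts: i32,
}

struct MemoryBackend {
    queue: VecDeque<String>,
}

impl Backend<String, MemoryContext> for MemoryBackend {
    fn poll_next(&mut self) -> Option<(String, MemoryContext)> {
        self.queue
            .pop_front()
            .map(|args| (args, MemoryContext { attempts: 0 }))
    }
}

struct SqlLikeBackend {
    /// Each row holds the task arguments and a stored priority.
    rows: VecDeque<(String, i32)>,
}

impl Backend<String, SqlLikeContext> for SqlLikeBackend {
    fn poll_next(&mut self) -> Option<(String, SqlLikeContext)> {
        self.rows.pop_front().map(|(args, priority)| {
            (args, SqlLikeContext { priority, max_attempts: 5 })
        })
    }
}

/// Generic code sees one interface, whatever context type the backend uses.
fn drain<Args, Ctx, B: Backend<Args, Ctx>>(backend: &mut B) -> Vec<(Args, Ctx)> {
    let mut out = Vec::new();
    while let Some(item) = backend.poll_next() {
        out.push(item);
    }
    out
}
```

The point of the design is that `drain` never needs to know which fields a context has, while a handler written for one specific backend can still rely on that backend's richer metadata.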
Postgres Example
The PostgreSQL backend provides rich context information for task management:
pub struct PgContext {
    run_at: DateTime<Utc>,       // When the task should be executed
    max_attempts: i32,           // Maximum retry attempts allowed
    last_error: Option<String>,  // Last error message if failed
    lock_at: Option<i64>,        // When the task was locked for processing
    lock_by: Option<WorkerId>,   // Which worker has locked this task
    done_at: Option<i64>,        // When the task completed
    priority: i32,               // Task priority for scheduling
}
Backend Context
A task handler can access the backend context by taking the context type of the backend in use as an argument, as in the handler below.
The following types can also be accessed as part of the task state:
- Attempt: The current attempt
- TaskId: The current task's ID
async fn process(
    _: Email,
    context: PgContext,
    // meta: Meta<TracingContext>
) -> Result<(), EmailError> {
    // Make decisions based on context
    if context.priority() > 5 {
        // Handle high-priority tasks differently
        return Ok(());
    }
    // Regular task processing
    Ok(())
}
State with Data<T>
Data<T> provides a way to share application-wide state across all task handlers. This is ideal for configuration, database connections, external service clients, and other resources that should be shared across your application.
pub struct AppState {
    pub database: PgPool,
    pub email_client: EmailClient,
    pub config: AppConfig,
    pub metrics: MetricsCollector,
}

type State = Data<Arc<AppState>>;

async fn process_with_state(
    _: Email,
    state: State,
) -> Result<(), EmailError> {
    // Use your state
    Ok(())
}
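The following standard-library sketch shows why wrapping state in an `Arc` inside a `Data<T>`-style handle makes sharing cheap. `SharedData` is a hypothetical stand-in written for this example and is not the Apalis `Data<T>` type; `AppConfig`, `sender`, and `describe` are likewise illustrative names.

```rust
use std::ops::Deref;
use std::sync::Arc;

/// Hypothetical stand-in for a `Data<T>`-style wrapper: a cloneable handle
/// that dereferences to the value it carries.
#[derive(Clone)]
struct SharedData<T>(T);

impl<T> Deref for SharedData<T> {
    type Target = T;

    fn deref(&self) -> &T {
        &self.0
    }
}

struct AppConfig {
    sender: String,
}

struct AppState {
    config: AppConfig,
}

/// A handler-like function that receives its own handle to the shared state.
fn describe(state: SharedData<Arc<AppState>>) -> String {
    // Field access auto-dereferences SharedData -> Arc -> AppState.
    format!("sending as {}", state.config.sender)
}
```

Cloning the handle for each call only bumps the `Arc` reference count; the `AppState` value itself is never copied, which is why `AppState` does not need to implement `Clone` in this arrangement.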
State Composition Patterns
Layered State Architecture
#[derive(Clone)]
pub struct DatabaseLayer {
    pub user_repo: UserRepository,
    pub order_repo: OrderRepository,
    pub analytics_db: AnalyticsDatabase,
}

#[derive(Clone)]
pub struct ServiceLayer {
    pub payment_service: PaymentService,
    pub notification_service: NotificationService,
    pub audit_service: AuditService,
}

#[derive(Clone)]
pub struct AppState {
    pub database: DatabaseLayer,
    pub services: ServiceLayer,
    pub config: AppConfig,
}

async fn process_payment(
    payment_task: PaymentTask,
    state: Data<AppState>,
) -> Result<(), PaymentError> {
    // Clean separation of concerns
    let user = state.database.user_repo
        .find_by_id(payment_task.user_id)
        .await?;
    let payment_result = state.services.payment_service
        .process_payment(&payment_task.amount, &user.payment_method)
        .await?;
    state.services.audit_service
        .log_payment_attempt(&payment_result)
        .await?;
    Ok(())
}
Feature-Specific State
#[derive(Clone)]
pub struct EmailState {
    pub smtp_client: SmtpClient,
    pub template_engine: TemplateEngine,
    pub rate_limiter: Arc<RateLimiter>,
}

#[derive(Clone)]
pub struct ImageProcessingState {
    pub s3_client: S3Client,
    pub image_processor: ImageProcessor,
    pub cdn_config: CdnConfig,
}

// Inject only relevant state
async fn send_email(
    email_task: EmailTask,
    email_state: Data<EmailState>,
) -> Result<(), EmailError> {
    // Focused state access
    email_state.rate_limiter.check_limit().await?;
    let rendered = email_state.template_engine
        .render(&email_task.template, &email_task.data)?;
    email_state.smtp_client
        .send_email(&rendered)
        .await?;
    Ok(())
}
Combining Context and State
The most powerful patterns emerge when combining request context with application state:
async fn adaptive_processing(
    task: Email,
    context: PgContext,
    attempt: Attempt,
    state: Data<AppState>,
) -> Result<(), ProcessingError> {
    // Adapt processing based on context and state
    let processor = if context.priority() > 8 {
        &state.services.high_priority_processor
    } else if attempt.current() > 2 {
        &state.services.retry_processor // More resilient processor for retries
    } else {
        &state.services.standard_processor
    };
    // Use context for timeout decisions
    let timeout = calculate_timeout(
        context.max_attempts,
        attempt.current(),
        &state.config.timeout_strategy,
    );
    // Log context information
    state.metrics.record_task_attempt(
        context.priority(),
        attempt.current(),
        context.run_at,
    );
    processor.process_with_timeout(task, timeout).await
}
Best Practices
Context Guidelines
- Read-Only: Treat request context as read-only metadata
- Backend-Specific: Remember that context structure depends on your backend choice
- Lifecycle Awareness: Use context to understand where the task is in its lifecycle
- Error Context: Leverage previous error information for smarter retry logic
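As one way to apply the guidelines above, the sketch below reads task metadata without mutating it and uses the previous error to decide how to retry. `TaskMeta` is a hypothetical, simplified view of what a backend context might expose; `RetryPlan`, `plan_retry`, the "invalid address" check, and the 60-second cap are assumptions made for illustration only.

```rust
/// Hypothetical, simplified view of the metadata a backend context might carry.
struct TaskMeta {
    attempt: u32,
    max_attempts: u32,
    last_error: Option<String>,
}

#[derive(Debug, PartialEq)]
enum RetryPlan {
    /// No previous failure: process as usual.
    RunNormally,
    /// A previous attempt failed with a possibly transient error.
    RunWithBackoff { delay_secs: u64 },
    /// Retrying cannot help, or the attempt budget is exhausted.
    GiveUp,
}

/// Takes the metadata by shared reference, so it is treated as read-only.
fn plan_retry(meta: &TaskMeta) -> RetryPlan {
    if meta.attempt >= meta.max_attempts {
        return RetryPlan::GiveUp;
    }
    match meta.last_error.as_deref() {
        None => RetryPlan::RunNormally,
        // Assumed example of a permanent failure that a retry will not fix.
        Some(err) if err.contains("invalid address") => RetryPlan::GiveUp,
        Some(_) => {
            // Exponential backoff: 2^attempt seconds, capped at 60.
            let delay_secs = 2u64.saturating_pow(meta.attempt).min(60);
            RetryPlan::RunWithBackoff { delay_secs }
        }
    }
}
```

Keeping the decision in a pure function like `plan_retry` also makes the retry policy easy to unit test without a running backend.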
State Guidelines
- Minimize State: Only include truly shared resources in Data<T>
- Immutable Preference: Prefer immutable state where possible
- Resource Management: Use Arc<T> for expensive-to-clone resources
- Health Monitoring: Implement health checks for stateful resources
- Scoped Access: Consider feature-specific state types for better organization
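The immutable-preference guideline can be sketched with the standard library alone: the state struct is shared behind an `Arc` and never locked as a whole, and only the one value that must change uses interior mutability. The `Metrics`, `EmailState`, `handle`, and `run_handlers` names are hypothetical and exist only for this example; threads stand in for concurrent task handlers.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical metrics holder: only the counter is mutable, via an atomic.
#[derive(Default)]
struct Metrics {
    emails_sent: AtomicU64,
}

/// Feature-specific state that is shared immutably between handlers.
struct EmailState {
    sender: String, // never changes after start-up
    metrics: Metrics,
}

/// Stand-in for a task handler: it only needs a shared reference.
fn handle(state: &EmailState) {
    state.metrics.emails_sent.fetch_add(1, Ordering::Relaxed);
}

/// Runs `workers` threads that each call the handler `per_worker` times
/// and returns the final counter value.
fn run_handlers(state: Arc<EmailState>, workers: usize, per_worker: usize) -> u64 {
    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let state = Arc::clone(&state);
            thread::spawn(move || {
                for _ in 0..per_worker {
                    handle(&state);
                }
            })
        })
        .collect();
    for h in handles {
        h.join().expect("worker thread panicked");
    }
    state.metrics.emails_sent.load(Ordering::Relaxed)
}
```

Because no handler ever needs `&mut EmailState`, there is no lock around the whole state and handlers do not block each other on unrelated fields.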
Performance Considerations
- Clone Efficiency: Ensure your state types clone efficiently (use Arc<T> for heavy resources)
- Connection Pooling: Use connection pools rather than individual connections
- Lazy Initialization: Consider lazy initialization for expensive resources
- Metrics Integration: Monitor state usage and performance
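Two of these points, cheap clones and lazy initialization, can be combined in a small standard-library sketch. It assumes a hypothetical `TemplateEngine` that is expensive to build; the `BUILDS` counter exists only to make the one-time initialization observable and would not be part of real state.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, OnceLock};

/// Counts how many times the expensive resource was built (demo only).
static BUILDS: AtomicUsize = AtomicUsize::new(0);

/// Hypothetical resource that is costly to construct.
struct TemplateEngine {
    templates: Vec<String>,
}

impl TemplateEngine {
    fn load() -> Self {
        BUILDS.fetch_add(1, Ordering::SeqCst);
        TemplateEngine {
            templates: vec!["welcome".to_string(), "reset".to_string()],
        }
    }
}

#[derive(Clone)]
struct EmailState {
    // Cloning the state copies a pointer, not the engine itself.
    engine: Arc<OnceLock<TemplateEngine>>,
}

impl EmailState {
    fn new() -> Self {
        Self { engine: Arc::new(OnceLock::new()) }
    }

    /// Builds the engine on first use; later calls reuse the same instance.
    fn engine(&self) -> &TemplateEngine {
        self.engine.get_or_init(TemplateEngine::load)
    }
}
```

Every clone of `EmailState` points at the same `OnceLock`, so the engine is built at most once no matter which clone touches it first, and workers that never need it never pay for it.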
The complete example
use apalis::prelude::*;
use std::task::{Context, Poll};
use apalis::layers::tracing::{TracingContext};
use futures::future::BoxFuture;
use examples::*;
use apalis_postgres::*;
use std::sync::Arc;
async fn process(
    _: Email,
    context: PgContext,
    // meta: Meta<TracingContext>
) -> Result<(), EmailError> {
    // Make decisions based on context
    if context.priority() > 5 {
        // Handle high-priority tasks differently
        return Ok(());
    }
    // Regular task processing
    Ok(())
}

pub struct AppState {
    pub database: PgPool,
    pub email_client: EmailClient,
    pub config: AppConfig,
    pub metrics: MetricsCollector,
}

type State = Data<Arc<AppState>>;

async fn process_with_state(
    _: Email,
    state: State,
) -> Result<(), EmailError> {
    // Use your state
    Ok(())
}
#[tokio::main]
async fn main() -> Result<(), BoxDynError> {
    // Read the connection string from the DATABASE_URL environment variable
    let database_url = std::env::var("DATABASE_URL")?;
    let pool = PgPool::connect(&database_url).await?;
    let mut pg_store = PostgresStorage::new(&pool);

    // Worker whose handler reads the backend context
    let simple_fn = WorkerBuilder::new("email_worker")
        .backend(pg_store)
        .build(process)
        .run()
        .await
        .unwrap();

    let mut storage = MemoryStorage::new();
    let state = AppState {
        config: AppConfig::default(),
        database: pool,
        email_client: EmailClient::default(),
        metrics: MetricsCollector::default(),
    };

    // Worker whose handler receives the shared application state
    let with_state = WorkerBuilder::new("email_worker")
        .backend(storage)
        .data(Arc::new(state))
        .build(process_with_state)
        .run()
        .await
        .unwrap();
    Ok(())
}
Conclusion
Effective state management in Apalis depends on the distinction between request context and application state. Use the request context to read backend-specific metadata and make context-aware decisions about task processing. Use Data<T> to share application resources and configuration across handlers. Combining the two lets a handler adapt to each task's priority and attempt count while the code stays clean and testable.