Building a production-ready task stream from TCP to apalis-postgres
In this guide, we'll build a production-grade TCP → JSON ingestion → apalis-postgres pipeline in Rust with the following properties:
- Zero-copy (no unnecessary allocations)
- Hard global backpressure cap
- Max connection limit
- Per-connection JSON size limit
- Exposed as Stream<Item = Result<T, Error>>
- Pipe the stream to apalis-postgres
This design is suitable for standalone worker systems, orchestrators, or high-throughput event receivers.
1. Problem Statement
We want:
- A TCP listener.
- Clients send newline-delimited JSON (NDJSON).
- We deserialize into
T. - We expose everything as a
Stream. - We prevent:
- Unbounded memory growth
- Unlimited connections
- Oversized JSON payloads
- Silent backpressure failures
- A controlled pipeline to apalis-postgres
2. Dependencies
Add these to your Cargo.toml:
tokio = { version = "1", features = ["net", "macros", "rt-multi-thread", "sync"] }
tokio-util = { version = "0.7", features = ["codec"] }
futures = "0.3"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
bytes = "1"
The complete example in section 11 additionally needs apalis, its Postgres backend, and anyhow; pick the versions that match the apalis release you are targeting.
3. Error Type
We define structured errors for safety and observability.
#[derive(Debug)]
pub enum TaskListenerError {
Io(std::io::Error),
Json(serde_json::Error),
FrameTooLarge,
TooManyConnections,
}
4. Configuration
This allows us to control resource usage.
pub struct TaskListenerConfig {
pub max_connections: usize,
pub max_frame_bytes: usize,
pub channel_capacity: usize,
}
- max_connections → Hard cap on concurrent connections
- max_frame_bytes → Maximum JSON size per message
- channel_capacity → Backpressure buffer limit
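If you want sensible starting values, a Default impl keeps call sites short. A minimal sketch, using the same numbers the complete example passes in section 11 (they are starting points, not tuned values):

```rust
pub struct TaskListenerConfig {
    pub max_connections: usize,
    pub max_frame_bytes: usize,
    pub channel_capacity: usize,
}

impl Default for TaskListenerConfig {
    fn default() -> Self {
        Self {
            max_connections: 100,      // concurrent TCP clients
            max_frame_bytes: 8 * 1024, // 8 KiB per JSON line
            channel_capacity: 1024,    // in-flight messages before backpressure
        }
    }
}

fn main() {
    let cfg = TaskListenerConfig::default();
    println!(
        "{} connections, {} bytes/frame, {} queued",
        cfg.max_connections, cfg.max_frame_bytes, cfg.channel_capacity
    );
}
```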
5. Zero-Copy NDJSON Decoder
We implement a custom decoder using BytesMut.
This avoids allocating a String per line and skips UTF-8 revalidation, since serde_json can deserialize straight from raw bytes.
struct NdjsonDecoder {
max_len: usize,
}
impl Decoder for NdjsonDecoder {
type Item = BytesMut;
type Error = std::io::Error;
fn decode(
&mut self,
buf: &mut BytesMut,
) -> Result<Option<Self::Item>, Self::Error> {
if let Some(pos) = buf.iter().position(|b| *b == b'\n') {
if pos > self.max_len {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"frame too large",
));
}
let mut frame = buf.split_to(pos + 1);
frame.truncate(pos);
return Ok(Some(frame));
}
if buf.len() > self.max_len {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"frame too large",
));
}
Ok(None)
}
}
Why this is zero-copy:
- split_to does not copy data; it hands back a view into the same buffer
- We deserialize directly from &[u8]
- No intermediate String allocations (serde will still allocate any owned fields of T)
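The decoder's buffering logic can be exercised without tokio. Here is a stdlib-only mirror of decode over a plain Vec<u8>: complete lines are split off, partial frames stay buffered, and oversized frames are rejected. (Unlike BytesMut::split_to, draining a Vec does copy; the sketch illustrates the framing logic, not the allocation behavior.)

```rust
/// Mirror of NdjsonDecoder::decode over a plain Vec<u8>.
/// Returns one complete frame (without the newline), None if more bytes
/// are needed, or Err for the frame-too-large cases.
fn decode(buf: &mut Vec<u8>, max_len: usize) -> Result<Option<Vec<u8>>, &'static str> {
    if let Some(pos) = buf.iter().position(|b| *b == b'\n') {
        if pos > max_len {
            return Err("frame too large");
        }
        // Take the frame plus its delimiter out of the buffer...
        let mut frame: Vec<u8> = buf.drain(..=pos).collect();
        // ...then drop the trailing '\n', like frame.truncate(pos).
        frame.pop();
        return Ok(Some(frame));
    }
    if buf.len() > max_len {
        return Err("frame too large");
    }
    Ok(None) // partial frame: wait for more bytes
}

fn main() {
    let mut buf = Vec::new();
    buf.extend_from_slice(b"{\"id\":1}\n{\"id\""); // one full frame + a partial
    assert_eq!(decode(&mut buf, 64).unwrap().unwrap(), b"{\"id\":1}".to_vec());
    assert_eq!(decode(&mut buf, 64).unwrap(), None); // partial stays buffered
    buf.extend_from_slice(b":2}\n");                 // rest of the frame arrives
    assert_eq!(decode(&mut buf, 64).unwrap().unwrap(), b"{\"id\":2}".to_vec());
    buf.extend_from_slice(b"0123456789\n");
    assert!(decode(&mut buf, 4).is_err()); // oversized frame rejected
}
```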
6. The Listener Stream Type
pub struct TaskListenerStream<T> {
receiver: mpsc::Receiver<Result<T, TaskListenerError>>,
}
We internally use a bounded mpsc channel to enforce backpressure.
7. Binding the Listener
impl<T> TaskListenerStream<T>
where
T: DeserializeOwned + Send + 'static,
{
pub async fn bind(
addr: &str,
config: TaskListenerConfig,
) -> Result<Self, std::io::Error> {
let listener = TcpListener::bind(addr).await?;
let (tx, rx) = mpsc::channel(config.channel_capacity);
let semaphore = Arc::new(Semaphore::new(config.max_connections));
tokio::spawn(run_accept_loop::<T>(
listener,
tx,
semaphore,
config,
));
Ok(Self { receiver: rx })
}
}
This:
- Binds TCP
- Creates a bounded channel
- Applies connection limits via Semaphore
8. Accept Loop with Max Connection Limit
async fn run_accept_loop<T>(
listener: TcpListener,
sender: mpsc::Sender<Result<T, TaskListenerError>>,
semaphore: Arc<Semaphore>,
config: TaskListenerConfig,
)
where
T: DeserializeOwned + Send + 'static,
{
loop {
match listener.accept().await {
Ok((socket, _)) => {
match semaphore.clone().try_acquire_owned() {
Ok(permit) => {
let tx = sender.clone();
tokio::spawn(async move {
handle_connection::<T>(
socket,
tx,
config.max_frame_bytes,
)
.await;
drop(permit);
});
}
Err(_) => {
let _ = sender
.send(Err(TaskListenerError::TooManyConnections))
.await;
}
}
}
Err(e) => {
let _ = sender
.send(Err(TaskListenerError::Io(e)))
.await;
}
}
}
}
This ensures:
- No more than max_connections concurrent connections
- Excess connections are rejected safely
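The key property of try_acquire_owned is that it either hands back a permit immediately or fails without waiting, so the accept loop never blocks on a full semaphore. For illustration, the same try-acquire discipline sketched with a stdlib atomic counter standing in for tokio's Semaphore (Permits and Permit are hypothetical names for this sketch):

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

/// Toy permit pool with tokio-Semaphore-like try-acquire semantics.
struct Permits {
    available: AtomicUsize,
}

impl Permits {
    fn new(n: usize) -> Arc<Self> {
        Arc::new(Self { available: AtomicUsize::new(n) })
    }

    /// Take a permit if one is free; never blocks, like try_acquire_owned.
    fn try_acquire(self: Arc<Self>) -> Option<Permit> {
        let mut cur = self.available.load(Ordering::Acquire);
        while cur > 0 {
            match self.available.compare_exchange(
                cur, cur - 1, Ordering::AcqRel, Ordering::Acquire,
            ) {
                Ok(_) => return Some(Permit { pool: self }),
                Err(actual) => cur = actual, // lost a race; retry
            }
        }
        None // at max_connections: reject instead of waiting
    }
}

/// Dropping the permit frees the slot, like dropping an owned permit
/// at the end of handle_connection.
struct Permit {
    pool: Arc<Permits>,
}

impl Drop for Permit {
    fn drop(&mut self) {
        self.pool.available.fetch_add(1, Ordering::AcqRel);
    }
}

fn main() {
    let permits = Permits::new(2); // max_connections = 2
    let a = permits.clone().try_acquire().expect("slot 1");
    let _b = permits.clone().try_acquire().expect("slot 2");
    assert!(permits.clone().try_acquire().is_none()); // third client rejected
    drop(a); // a connection finishes; its slot is returned
    assert!(permits.clone().try_acquire().is_some());
}
```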
9. Connection Handler with Hard Backpressure
async fn handle_connection<T>(
socket: tokio::net::TcpStream,
sender: mpsc::Sender<Result<T, TaskListenerError>>,
max_frame: usize,
)
where
T: DeserializeOwned + Send + 'static,
{
let mut framed = FramedRead::new(
socket,
NdjsonDecoder { max_len: max_frame },
);
while let Some(frame) = framed.next().await {
match frame {
Ok(bytes) => {
let parsed = serde_json::from_slice::<T>(&bytes)
.map_err(TaskListenerError::Json);
if sender.send(parsed).await.is_err() {
break;
}
}
Err(e) => {
let _ = sender
.send(Err(TaskListenerError::Io(e)))
.await;
break;
}
}
}
}
Why this enforces backpressure:
- mpsc is bounded
- send().await blocks when full
- TCP window shrinks naturally
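The same bounded-queue effect can be seen with std's sync_channel, the blocking analogue of tokio's bounded mpsc: once the buffer is full, the producer cannot enqueue more until the consumer drains a slot, and in the async version that stall is what stops reads and shrinks the TCP window.

```rust
use std::sync::mpsc::sync_channel;

fn main() {
    // Capacity 2, like mpsc::channel(config.channel_capacity).
    let (tx, rx) = sync_channel::<u64>(2);

    tx.try_send(1).unwrap();
    tx.try_send(2).unwrap();
    // Buffer is full: try_send fails instead of growing memory.
    // (In the async version, send().await would park the task here.)
    assert!(tx.try_send(3).is_err());

    // The consumer draining one message frees one slot.
    assert_eq!(rx.recv().unwrap(), 1);
    tx.try_send(3).unwrap();
    assert_eq!(rx.recv().unwrap(), 2);
    assert_eq!(rx.recv().unwrap(), 3);
}
```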
10. Stream Implementation
impl<T> Stream for TaskListenerStream<T> {
type Item = Result<T, TaskListenerError>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.receiver).poll_recv(cx)
}
}
Now your listener behaves like any other async Stream.
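For illustration only: the same thin-forwarding shape with std's blocking channel, wrapping a Receiver in a struct that implements Iterator instead of Stream. next here plays the role poll_recv plays in the async version (TaskIter is a hypothetical name for this sketch):

```rust
use std::sync::mpsc::{channel, Receiver};

/// Blocking analogue of TaskListenerStream: a struct that owns the
/// receiving half and forwards each call straight to it.
struct TaskIter<T> {
    receiver: Receiver<T>,
}

impl<T> Iterator for TaskIter<T> {
    type Item = T;
    fn next(&mut self) -> Option<T> {
        // recv() returns Err once every sender is gone, just as
        // poll_recv yields None when the channel closes.
        self.receiver.recv().ok()
    }
}

fn main() {
    let (tx, rx) = channel();
    for id in [1u64, 2, 3] {
        tx.send(id).unwrap();
    }
    drop(tx); // close the channel so iteration terminates

    let items: Vec<u64> = TaskIter { receiver: rx }.collect();
    assert_eq!(items, vec![1, 2, 3]);
}
```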
11. Complete Example
use apalis::prelude::*;
use apalis_postgres::*;
use bytes::BytesMut;
use futures::Stream;
use futures::StreamExt;
use futures::TryFutureExt;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use std::io::Error;
use std::io::ErrorKind;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::net::TcpListener;
use tokio::sync::Semaphore;
use tokio::sync::mpsc;
use tokio_util::codec::Decoder;
use tokio_util::codec::FramedRead;
#[derive(Serialize, Deserialize, Debug)]
struct MyMsg {
id: u64,
}
async fn handle_task(item: MyMsg, _wrk: WorkerContext) -> Result<(), BoxDynError> {
// Process the task here; this demo just logs and acknowledges it.
println!("processing task {}", item.id);
Ok(())
}
#[derive(Debug)]
pub enum TaskListenerError {
Io(std::io::Error),
Json(serde_json::Error),
FrameTooLarge,
TooManyConnections,
}
pub struct TaskListenerConfig {
pub max_connections: usize,
pub max_frame_bytes: usize,
pub channel_capacity: usize,
}
struct NdjsonDecoder {
max_len: usize,
}
impl Decoder for NdjsonDecoder {
type Item = BytesMut;
type Error = std::io::Error;
fn decode(
&mut self,
buf: &mut BytesMut,
) -> Result<Option<Self::Item>, Self::Error> {
if let Some(pos) = buf.iter().position(|b| *b == b'\n') {
if pos > self.max_len {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"frame too large",
));
}
let mut frame = buf.split_to(pos + 1);
frame.truncate(pos);
return Ok(Some(frame));
}
if buf.len() > self.max_len {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"frame too large",
));
}
Ok(None)
}
}
pub struct TaskListenerStream<T> {
receiver: mpsc::Receiver<Result<T, TaskListenerError>>,
}
async fn run_accept_loop<T>(
listener: TcpListener,
sender: mpsc::Sender<Result<T, TaskListenerError>>,
semaphore: Arc<Semaphore>,
config: TaskListenerConfig,
)
where
T: DeserializeOwned + Send + 'static,
{
loop {
match listener.accept().await {
Ok((socket, _)) => {
match semaphore.clone().try_acquire_owned() {
Ok(permit) => {
let tx = sender.clone();
tokio::spawn(async move {
handle_connection::<T>(
socket,
tx,
config.max_frame_bytes,
)
.await;
drop(permit);
});
}
Err(_) => {
let _ = sender
.send(Err(TaskListenerError::TooManyConnections))
.await;
}
}
}
Err(e) => {
let _ = sender
.send(Err(TaskListenerError::Io(e)))
.await;
}
}
}
}
async fn handle_connection<T>(
socket: tokio::net::TcpStream,
sender: mpsc::Sender<Result<T, TaskListenerError>>,
max_frame: usize,
)
where
T: DeserializeOwned + Send + 'static,
{
let mut framed = FramedRead::new(
socket,
NdjsonDecoder { max_len: max_frame },
);
while let Some(frame) = framed.next().await {
match frame {
Ok(bytes) => {
let parsed = serde_json::from_slice::<T>(&bytes)
.map_err(TaskListenerError::Json);
if sender.send(parsed).await.is_err() {
break;
}
}
Err(e) => {
let _ = sender
.send(Err(TaskListenerError::Io(e)))
.await;
break;
}
}
}
}
impl<T> TaskListenerStream<T>
where
T: DeserializeOwned + Send + 'static,
{
pub async fn bind(
addr: &str,
config: TaskListenerConfig,
) -> Result<Self, std::io::Error> {
let listener = TcpListener::bind(addr).await?;
let (tx, rx) = mpsc::channel(config.channel_capacity);
let semaphore = Arc::new(Semaphore::new(config.max_connections));
tokio::spawn(run_accept_loop::<T>(
listener,
tx,
semaphore,
config,
));
Ok(Self { receiver: rx })
}
}
impl<T> Stream for TaskListenerStream<T> {
type Item = Result<T, TaskListenerError>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.receiver).poll_recv(cx)
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let db = std::env::var("DATABASE_URL")?;
let pool = PgPool::connect(&db).await?;
PostgresStorage::setup(&pool).await?;
let mut backend = PostgresStorage::new_with_notify(&pool, &Config::default());
let mut stream = TaskListenerStream::<MyMsg>::bind(
"127.0.0.1:5000",
TaskListenerConfig {
max_connections: 100,
max_frame_bytes: 8 * 1024,
channel_capacity: 1024,
},
)
.await?
.map(|res| res.expect("Encountered an unrecoverable error"));
let worker = WorkerBuilder::new("worker-1")
.backend(backend.clone())
.build(handle_task)
.run()
.map_err(|e| Error::new(ErrorKind::Interrupted, e));
let handler = backend
.push_stream(&mut stream)
.map_err(|e| Error::new(ErrorKind::BrokenPipe, e));
tokio::try_join!(worker, handler)?;
Ok(())
}
Test with:
echo '{"id":1}' | nc 127.0.0.1 5000
12. What Next
- Handle stream errors gracefully instead of calling expect
- Consider an enum to support multiple message types
- Explore Codecs and TaskSinks
- Track the added jobs with apalis-board
13. What We Achieved
| Feature | How |
|---|---|
| Zero-copy | BytesMut + from_slice |
| Hard backpressure | Bounded mpsc |
| Connection limit | Semaphore |
| Frame size limit | Custom Decoder |
| Stream abstraction | impl Stream |
| Stream to Sink | Using push_stream |
14. Why This Design Scales
- No unbounded allocations
- No unlimited tasks
- Natural TCP backpressure
- Safe under high load
- Suitable for ingestion systems
Final Thoughts
This pattern is ideal for:
- Event ingestion pipelines
- Worker orchestration systems
- Task dispatch systems
- Distributed systems entrypoints
It gives you full control over memory, concurrency, and backpressure --- without sacrificing ergonomic async Rust patterns.
Happy building 🚀