Building a production-ready task stream from TCP to apalis-postgres

Feb 20th, 2026

In this guide, we'll build a production-grade TCP → JSON ingestion → apalis-postgres pipeline in Rust with the following properties:

  • Zero-copy (no unnecessary allocations)
  • Hard global backpressure cap
  • Max connection limit
  • Per-connection JSON size limit
  • Exposed as Stream<Item = Result<T, Error>>
  • Pipe the stream to apalis-postgres

This design is suitable for standalone worker systems, orchestrators, or high-throughput event receivers.


1. Problem Statement

We want:

  1. A TCP listener.
  2. Clients that send newline-delimited JSON (NDJSON).
  3. Deserialization of each line into T.
  4. Everything exposed as a Stream.
  5. Protection against:
    • Unbounded memory growth
    • Unlimited connections
    • Oversized JSON payloads
    • Silent backpressure failures
  6. A controlled pipeline into apalis-postgres.

2. Dependencies

Add these to your Cargo.toml:

tokio = { version = "1", features = ["net", "macros", "rt-multi-thread", "sync"] }
tokio-util = { version = "0.7", features = ["codec"] }
futures = "0.3"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
bytes = "1"

The "sync" feature covers mpsc and Semaphore, and "rt-multi-thread" is needed for #[tokio::main]. The complete example in section 11 additionally depends on apalis, apalis-postgres, and anyhow.

3. Error Type

We define structured errors for safety and observability.

#[derive(Debug)]
pub enum TaskListenerError {
    Io(std::io::Error),
    Json(serde_json::Error),
    FrameTooLarge,
    TooManyConnections,
}

4. Configuration

This allows us to control resource usage.

pub struct TaskListenerConfig {
    pub max_connections: usize,
    pub max_frame_bytes: usize,
    pub channel_capacity: usize,
}
  • max_connections → Hard cap on concurrent connections
  • max_frame_bytes → Maximum JSON size per message
  • channel_capacity → Backpressure buffer limit
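
If callers should be able to omit the config, a Default impl is a natural addition. A sketch using the same values as the complete example in section 11 (illustrative defaults, not tuned recommendations):

```rust
// Restated from the guide.
pub struct TaskListenerConfig {
    pub max_connections: usize,
    pub max_frame_bytes: usize,
    pub channel_capacity: usize,
}

// Illustrative defaults only; tune these for your workload.
impl Default for TaskListenerConfig {
    fn default() -> Self {
        Self {
            max_connections: 100,
            max_frame_bytes: 8 * 1024,
            channel_capacity: 1024,
        }
    }
}

fn main() {
    let cfg = TaskListenerConfig::default();
    assert_eq!(cfg.max_frame_bytes, 8 * 1024);
}
```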

5. Zero-Copy NDJSON Decoder

We implement a custom decoder over BytesMut.
This avoids allocating a String for every line and skips UTF-8 revalidation, since serde_json can deserialize directly from bytes.

struct NdjsonDecoder {
    max_len: usize,
}

impl Decoder for NdjsonDecoder {
    type Item = BytesMut;
    type Error = std::io::Error;

    fn decode(
        &mut self,
        buf: &mut BytesMut,
    ) -> Result<Option<Self::Item>, Self::Error> {
        if let Some(pos) = buf.iter().position(|b| *b == b'\n') {
            if pos > self.max_len {
                return Err(std::io::Error::new(
                    std::io::ErrorKind::InvalidData,
                    "frame too large",
                ));
            }

            let mut frame = buf.split_to(pos + 1);
            frame.truncate(pos);
            return Ok(Some(frame));
        }

        if buf.len() > self.max_len {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                "frame too large",
            ));
        }

        Ok(None)
    }
}

Why this is zero-copy:

  • split_to does not copy data
  • We deserialize directly from &[u8]
  • No String allocations

6. The Listener Stream Type

pub struct TaskListenerStream<T> {
    receiver: mpsc::Receiver<Result<T, TaskListenerError>>,
}

We internally use a bounded mpsc channel to enforce backpressure.


7. Binding the Listener

impl<T> TaskListenerStream<T>
where
    T: DeserializeOwned + Send + 'static,
{
    pub async fn bind(
        addr: &str,
        config: TaskListenerConfig,
    ) -> Result<Self, std::io::Error> {
        let listener = TcpListener::bind(addr).await?;
        let (tx, rx) = mpsc::channel(config.channel_capacity);

        let semaphore = Arc::new(Semaphore::new(config.max_connections));

        tokio::spawn(run_accept_loop::<T>(
            listener,
            tx,
            semaphore,
            config,
        ));

        Ok(Self { receiver: rx })
    }
}

This:

  • Binds TCP
  • Creates a bounded channel
  • Applies connection limits via Semaphore

8. Accept Loop with Max Connection Limit

async fn run_accept_loop<T>(
    listener: TcpListener,
    sender: mpsc::Sender<Result<T, TaskListenerError>>,
    semaphore: Arc<Semaphore>,
    config: TaskListenerConfig,
)
where
    T: DeserializeOwned + Send + 'static,
{
    loop {
        match listener.accept().await {
            Ok((socket, _)) => {
                match semaphore.clone().try_acquire_owned() {
                    Ok(permit) => {
                        let tx = sender.clone();
                        tokio::spawn(async move {
                            handle_connection::<T>(
                                socket,
                                tx,
                                config.max_frame_bytes,
                            )
                            .await;
                            drop(permit);
                        });
                    }
                    Err(_) => {
                        let _ = sender
                            .send(Err(TaskListenerError::TooManyConnections))
                            .await;
                    }
                }
            }
            Err(e) => {
                let _ = sender
                    .send(Err(TaskListenerError::Io(e)))
                    .await;
            }
        }
    }
}

This ensures:

  • No more than max_connections concurrent connections
  • Excess connections are rejected without blocking the accept loop

9. Connection Handler with Hard Backpressure

async fn handle_connection<T>(
    socket: tokio::net::TcpStream,
    sender: mpsc::Sender<Result<T, TaskListenerError>>,
    max_frame: usize,
)
where
    T: DeserializeOwned + Send + 'static,
{
    let mut framed = FramedRead::new(
        socket,
        NdjsonDecoder { max_len: max_frame },
    );

    while let Some(frame) = framed.next().await {
        match frame {
            Ok(bytes) => {
                let parsed = serde_json::from_slice::<T>(&bytes)
                    .map_err(TaskListenerError::Json);

                if sender.send(parsed).await.is_err() {
                    break;
                }
            }
            Err(e) => {
                let _ = sender
                    .send(Err(TaskListenerError::Io(e)))
                    .await;
                break;
            }
        }
    }
}

Why this enforces backpressure:

  • mpsc is bounded
  • send().await blocks when full
  • TCP window shrinks naturally

10. Stream Implementation

impl<T> Stream for TaskListenerStream<T> {
    type Item = Result<T, TaskListenerError>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        self.receiver.poll_recv(cx)
    }
}

Now your listener behaves like any other async Stream.


11. Complete Example

main.rs
use apalis::prelude::*;
use apalis_postgres::*;
use bytes::BytesMut;
use futures::Stream;
use futures::StreamExt;
use futures::TryFutureExt;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use std::io::Error;
use std::io::ErrorKind;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::net::TcpListener;
use tokio::sync::Semaphore;
use tokio::sync::mpsc;
use tokio_util::codec::Decoder;
use tokio_util::codec::FramedRead;


#[derive(Serialize, Deserialize, Debug)]
struct MyMsg {
    id: u64,
}

async fn handle_task(_item: MyMsg, _wrk: WorkerContext) -> Result<(), BoxDynError> {
    // Process the task here; this stub just acknowledges it.
    Ok(())
}

#[derive(Debug)]
pub enum TaskListenerError {
    Io(std::io::Error),
    Json(serde_json::Error),
    FrameTooLarge,
    TooManyConnections,
}

pub struct TaskListenerConfig {
    pub max_connections: usize,
    pub max_frame_bytes: usize,
    pub channel_capacity: usize,
}

struct NdjsonDecoder {
    max_len: usize,
}

impl Decoder for NdjsonDecoder {
    type Item = BytesMut;
    type Error = std::io::Error;

    fn decode(
        &mut self,
        buf: &mut BytesMut,
    ) -> Result<Option<Self::Item>, Self::Error> {
        if let Some(pos) = buf.iter().position(|b| *b == b'\n') {
            if pos > self.max_len {
                return Err(std::io::Error::new(
                    std::io::ErrorKind::InvalidData,
                    "frame too large",
                ));
            }

            let mut frame = buf.split_to(pos + 1);
            frame.truncate(pos);
            return Ok(Some(frame));
        }

        if buf.len() > self.max_len {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                "frame too large",
            ));
        }

        Ok(None)
    }
}

pub struct TaskListenerStream<T> {
    receiver: mpsc::Receiver<Result<T, TaskListenerError>>,
}

async fn run_accept_loop<T>(
    listener: TcpListener,
    sender: mpsc::Sender<Result<T, TaskListenerError>>,
    semaphore: Arc<Semaphore>,
    config: TaskListenerConfig,
)
where
    T: DeserializeOwned + Send + 'static,
{
    loop {
        match listener.accept().await {
            Ok((socket, _)) => {
                match semaphore.clone().try_acquire_owned() {
                    Ok(permit) => {
                        let tx = sender.clone();
                        tokio::spawn(async move {
                            handle_connection::<T>(
                                socket,
                                tx,
                                config.max_frame_bytes,
                            )
                            .await;
                            drop(permit);
                        });
                    }
                    Err(_) => {
                        let _ = sender
                            .send(Err(TaskListenerError::TooManyConnections))
                            .await;
                    }
                }
            }
            Err(e) => {
                let _ = sender
                    .send(Err(TaskListenerError::Io(e)))
                    .await;
            }
        }
    }
}

async fn handle_connection<T>(
    socket: tokio::net::TcpStream,
    sender: mpsc::Sender<Result<T, TaskListenerError>>,
    max_frame: usize,
)
where
    T: DeserializeOwned + Send + 'static,
{
    let mut framed = FramedRead::new(
        socket,
        NdjsonDecoder { max_len: max_frame },
    );

    while let Some(frame) = framed.next().await {
        match frame {
            Ok(bytes) => {
                let parsed = serde_json::from_slice::<T>(&bytes)
                    .map_err(TaskListenerError::Json);

                if sender.send(parsed).await.is_err() {
                    break;
                }
            }
            Err(e) => {
                let _ = sender
                    .send(Err(TaskListenerError::Io(e)))
                    .await;
                break;
            }
        }
    }
}

impl<T> TaskListenerStream<T>
where
    T: DeserializeOwned + Send + 'static,
{
    pub async fn bind(
        addr: &str,
        config: TaskListenerConfig,
    ) -> Result<Self, std::io::Error> {
        let listener = TcpListener::bind(addr).await?;
        let (tx, rx) = mpsc::channel(config.channel_capacity);

        let semaphore = Arc::new(Semaphore::new(config.max_connections));

        tokio::spawn(run_accept_loop::<T>(
            listener,
            tx,
            semaphore,
            config,
        ));

        Ok(Self { receiver: rx })
    }
}

impl<T> Stream for TaskListenerStream<T> {
    type Item = Result<T, TaskListenerError>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        self.receiver.poll_recv(cx)
    }
}


#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let db = std::env::var("DATABASE_URL")?;
    let pool = PgPool::connect(&db).await?;

    PostgresStorage::setup(&pool).await.unwrap();
    let mut backend = PostgresStorage::new_with_notify(&pool, &Config::default());

    let mut stream = TaskListenerStream::<MyMsg>::bind(
        "127.0.0.1:5000",
        TaskListenerConfig {
            max_connections: 100,
            max_frame_bytes: 8 * 1024,
            channel_capacity: 1024,
        },
    )
    .await?
    .map(|res| res.expect("Encountered an unrecoverable error"));

    let worker = WorkerBuilder::new("worker-1")
        .backend(backend.clone())
        .build(handle_task)
        .run()
        .map_err(|e| Error::new(ErrorKind::Interrupted, e));

    let handler = backend
        .push_stream(&mut stream)
        .map_err(|e| Error::new(ErrorKind::BrokenPipe, e));

    tokio::try_join!(worker, handler)?;

    Ok(())
}

Test with:

echo '{"id":1}' | nc 127.0.0.1 5000

12. What Next

  • Handle stream errors more gracefully than expect
  • Use an enum payload to support multiple message types
  • Explore Codecs and TaskSinks
  • Track the enqueued jobs with apalis-board

13. What We Achieved

  Feature              How
  -------------------  -----------------------
  Zero-copy            BytesMut + from_slice
  Hard backpressure    Bounded mpsc
  Connection limit     Semaphore
  Frame size limit     Custom Decoder
  Stream abstraction   impl Stream
  Stream to Sink       push_stream

14. Why This Design Scales

  • No unbounded allocations
  • No unlimited tasks
  • Natural TCP backpressure
  • Safe under high load
  • Suitable for ingestion systems

Final Thoughts

This pattern is ideal for:

  • Event ingestion pipelines
  • Worker orchestration systems
  • Task dispatch systems
  • Distributed systems entrypoints

It gives you full control over memory, concurrency, and backpressure, without sacrificing ergonomic async Rust patterns.

Happy building 🚀
