Skip to content

Refactor subscriber queues to bounded tokio mpsc channels with batching and separate QoS1 inflight tracking#13

Closed
bravo1goingdark wants to merge 1 commit intomainfrom
redesign-subscriberqueue-with-bounded-channel
Closed

Refactor subscriber queues to bounded tokio mpsc channels with batching and separate QoS1 inflight tracking#13
bravo1goingdark wants to merge 1 commit intomainfrom
redesign-subscriberqueue-with-bounded-channel

Conversation

@bravo1goingdark
Copy link
Owner

Motivation

  • Reduce contention on per-subscriber state by moving the hot-path queue off a Mutex<VecDeque> and into a bounded channel implementation.
  • Reduce per-message locking by batching enqueues in the broker publish path.
  • Make QoS1 inflight tracking cheaper on the hot-path by storing inflight state in a separate concurrent map.
  • Expose configurable backpressure behavior (drop, block, shed) for full per-subscriber queues.

Description

  • Replaced SubscriberQueue internal Mutex<VecDeque<...>> with a bounded tokio::mpsc channel and added enqueue_batch/send_entry helpers to support batching and different backpressure policies via a new BackpressurePolicy enum and BrokerConfig::per_subscriber_backpressure.
  • Moved QoS1 inflight tracking out of the queue into SubscriberInflight (a Mutex<HashMap<DeliveryTag, QueueEntry>>) and changed dequeue/ack/maintenance logic to use this separate inflight map.
  • Implemented batching in Broker::publish_with_wal_id by collecting subscriber references and calling enqueue_batch to reduce per-message locking and cloning overhead.
  • Added QueueEntry::new constructor and helper methods: SubscriberQueue::requeue, SubscriberInflight::maintenance_tick, SubscriberInflight::insert/ack/len and updated maintenance_tick to use the new inflight-based retry/requeue flow.
  • Updated call sites and configs across the repo to provide the new backpressure setting (tests, bench, daemon): bench/src/main.rs, blipmqd/src/main.rs, tests/daemon_integration.rs, and tests/chaos_recovery.rs.

Testing

  • Ran cargo fmt to format modified files; formatting succeeded.
  • No automated unit/integration tests (cargo test) were executed as part of this change.

Codex Task

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant