
Commit b930d07

Refactor topic pipeline to lock-free fanout
1 parent dac6b0c commit b930d07


4 files changed: +177, -106 lines changed


src/core/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -4,6 +4,7 @@ pub mod command;
 pub mod delivery_mode;
 pub mod dlq;
 pub mod error;
+pub mod lockfree;
 pub mod memory_pool;
 pub mod message;
 pub mod publisher;

src/core/topics/topic.rs

Lines changed: 136 additions & 106 deletions
@@ -1,42 +1,40 @@
+use ahash::AHasher;
 use dashmap::DashMap;
-use flume::{Receiver, Sender};
+use std::fmt;
+use std::hash::{Hash, Hasher};
 use std::sync::Arc;
+use tokio::sync::Notify;
 use tokio::task;
+use tracing::debug;
 
 use crate::config::CONFIG;
 use crate::core::delivery_mode::DeliveryMode;
 use crate::core::error::BlipError;
+use crate::core::lockfree::MpmcQueue;
 use crate::core::message::{to_wire_message, Message, WireMessage};
 use crate::core::subscriber::{Subscriber, SubscriberId};
-use tracing::debug;
-use ahash::AHasher;
-use std::hash::{Hash, Hasher};
-use tokio::sync::mpsc;
+use crate::util::backoff::AdaptiveYield;
 
 pub type TopicName = String;
 
-/// A Topic holds a list of subscribers and a fanout task.
-///
-/// Messages are first sent to a bounded flume::Sender,
-/// and then fanned out to per-subscriber queues in a dedicated task.
-#[derive(Debug)]
+/// A Topic holds a list of subscribers and orchestrates a high-performance fanout pipeline.
 pub struct Topic {
     name: TopicName,
     subscribers: Arc<DashMap<SubscriberId, Subscriber>>, // global view
     shards: Vec<Shard>,
-    input_tx: Sender<Arc<Message>>,
+    ingress_queue: Arc<MpmcQueue<Arc<Message>>>,
+    ingress_notify: Arc<Notify>,
 }
 
-#[derive(Debug)]
 struct Shard {
     subs: Arc<DashMap<SubscriberId, Subscriber>>, // partitioned subscribers
-    tx: mpsc::Sender<Arc<WireMessage>>, // wire frames to process
+    queue: Arc<MpmcQueue<Arc<WireMessage>>>,
+    notify: Arc<Notify>,
 }
 
 impl Topic {
     pub fn new(name: impl Into<TopicName>, queue_capacity: usize) -> Self {
         let name = name.into();
-        let (tx, rx) = flume::bounded(queue_capacity);
 
         // Determine shard count based on config or available parallelism
         let shard_count = {
@@ -51,115 +49,118 @@ impl Topic {
         }
         .clamp(1, 16);
 
-        // Create shards and their worker channels
         let mut shards: Vec<Shard> = Vec::with_capacity(shard_count);
         let global_subs: Arc<DashMap<SubscriberId, Subscriber>> = Arc::new(DashMap::new());
-        let topic_name_for_workers = name.clone();
+        let shard_queue_capacity = queue_capacity.max(1024);
         for shard_index in 0..shard_count {
-            let (s_tx, mut s_rx) = mpsc::channel::<Arc<WireMessage>>(1024);
             let shard_subs: Arc<DashMap<SubscriberId, Subscriber>> = Arc::new(DashMap::new());
-            let shard_subs_for_task = Arc::clone(&shard_subs);
-            let global_subs_clone = Arc::clone(&global_subs);
-            let topic_name_clone = topic_name_for_workers.clone();
-
-            // Worker task for this shard
-            task::spawn(async move {
-                // Best-effort: attempt to pin this worker to a specific core (no-op on non-Windows)
-                crate::util::affinity::set_current_thread_affinity(shard_index);
-                // Reuse buffers locally in the worker as needed
-                let mut disconnected: Vec<SubscriberId> = Vec::with_capacity(16);
-                loop {
-                    match s_rx.recv().await {
-                        Some(wire) => {
-                            // TTL drop early (keeps queues small)
-                            let now = crate::core::message::current_timestamp();
-                            if wire.is_expired(now) {
-                                crate::metrics::inc_dropped_ttl(1);
-                                continue;
-                            }
+            let shard_queue = Arc::new(MpmcQueue::new(shard_queue_capacity));
+            let shard_notify = Arc::new(Notify::new());
 
-                            // Snapshot this shard's subscribers (pre-size to reduce reallocations)
-                            let mut subs_snapshot: Vec<Subscriber> =
-                                Vec::with_capacity(shard_subs_for_task.len());
-                            for e in shard_subs_for_task.iter() {
-                                subs_snapshot.push(e.value().clone());
-                            }
-
-                            disconnected.clear();
-                            for subscriber in subs_snapshot.iter() {
-                                let id = subscriber.id().clone();
-                                let res = subscriber.enqueue(wire.clone());
-                                match res {
-                                    Ok(_) => {
-                                        crate::metrics::inc_enqueued(1);
-                                    }
-                                    Err(BlipError::Disconnected | BlipError::QueueClosed) => {
-                                        disconnected.push(id);
-                                    }
-                                    Err(BlipError::QueueFull) => {
-                                        crate::metrics::inc_dropped_sub_queue_full(1);
-                                    }
-                                    Err(_) => {
-                                        // ignore others
-                                    }
-                                }
-                            }
-
-                            // Remove disconnected from shard and global registries
-                            for id in disconnected.drain(..) {
-                                if let Some(_) = shard_subs_for_task.remove(&id) {
-                                    global_subs_clone.remove(&id);
-                                    debug!("Removed disconnected subscriber: {}", id);
-                                }
-                            }
-                        }
-                        None => {
-                            tracing::info!("Shard worker exited for topic: {}", topic_name_clone);
-                            break;
-                        }
-                    }
-                }
-            });
+            Self::spawn_shard_worker(
+                shard_index,
+                Arc::clone(&shard_subs),
+                Arc::clone(&global_subs),
+                Arc::clone(&shard_queue),
+                Arc::clone(&shard_notify),
+            );
 
             shards.push(Shard {
                 subs: shard_subs,
-                tx: s_tx,
+                queue: shard_queue,
+                notify: shard_notify,
             });
         }
 
+        let ingress_queue = Arc::new(MpmcQueue::new(queue_capacity.max(1024)));
+        let ingress_notify = Arc::new(Notify::new());
+
         let topic = Self {
             name: name.clone(),
             subscribers: global_subs,
             shards,
-            input_tx: tx,
+            ingress_queue,
+            ingress_notify,
         };
 
-        topic.spawn_fanout_task(rx);
+        topic.spawn_dispatcher();
         topic
     }
 
-    fn spawn_fanout_task(&self, rx: Receiver<Arc<Message>>) {
-        let shard_senders: Vec<mpsc::Sender<Arc<WireMessage>>> =
-            self.shards.iter().map(|s| s.tx.clone()).collect();
-        let topic_name = self.name.clone();
+    fn spawn_dispatcher(&self) {
+        let ingress_queue = Arc::clone(&self.ingress_queue);
+        let ingress_notify = Arc::clone(&self.ingress_notify);
+        let shard_channels: Vec<(Arc<MpmcQueue<Arc<WireMessage>>>, Arc<Notify>)> = self
+            .shards
+            .iter()
+            .map(|s| (Arc::clone(&s.queue), Arc::clone(&s.notify)))
+            .collect();
 
         task::spawn(async move {
-            while let Ok(message) = rx.recv_async().await {
-                // Pre-encode the wire frame once for this published message
-                let wire = Arc::new(to_wire_message(&message));
-                // Send to all shard workers
-                for tx in &shard_senders {
-                    if tx.try_send(wire.clone()).is_err() {
-                        // fall back to await if channel is full
-                        if tx.send(wire.clone()).await.is_err() {
-                            // shard worker exited; continue
-                            continue;
-                        }
+            loop {
+                while let Some(message) = ingress_queue.try_dequeue() {
+                    let wire = Arc::new(to_wire_message(&message));
+                    for (queue, notify) in &shard_channels {
+                        push_with_backpressure(queue, notify, wire.clone()).await;
                     }
                 }
+
+                ingress_notify.notified().await;
             }
+        });
+    }
+
+    fn spawn_shard_worker(
+        shard_index: usize,
+        shard_subs: Arc<DashMap<SubscriberId, Subscriber>>,
+        global_subs: Arc<DashMap<SubscriberId, Subscriber>>,
+        queue: Arc<MpmcQueue<Arc<WireMessage>>>,
+        notify: Arc<Notify>,
+    ) {
+        task::spawn(async move {
+            crate::util::affinity::set_current_thread_affinity(shard_index);
+            let mut disconnected: Vec<SubscriberId> = Vec::with_capacity(16);
+            loop {
+                while let Some(wire) = queue.try_dequeue() {
+                    let now = crate::core::message::current_timestamp();
+                    if wire.is_expired(now) {
+                        crate::metrics::inc_dropped_ttl(1);
+                        continue;
+                    }
 
-            tracing::info!("Fanout task exited for topic: {}", topic_name);
+                    let mut subs_snapshot: Vec<Subscriber> = Vec::with_capacity(shard_subs.len());
+                    for e in shard_subs.iter() {
+                        subs_snapshot.push(e.value().clone());
+                    }
+
+                    disconnected.clear();
+                    for subscriber in subs_snapshot.iter() {
+                        let id = subscriber.id().clone();
+                        let res = subscriber.enqueue(wire.clone());
+                        match res {
+                            Ok(_) => {
+                                crate::metrics::inc_enqueued(1);
+                            }
+                            Err(BlipError::Disconnected | BlipError::QueueClosed) => {
+                                disconnected.push(id);
+                            }
+                            Err(BlipError::QueueFull) => {
+                                crate::metrics::inc_dropped_sub_queue_full(1);
+                            }
+                            Err(_) => {}
+                        }
+                    }
+
+                    for id in disconnected.drain(..) {
+                        if let Some(_) = shard_subs.remove(&id) {
+                            global_subs.remove(&id);
+                            debug!("Removed disconnected subscriber: {}", id);
+                        }
+                    }
+                }
+
+                notify.notified().await;
+            }
         });
     }
@@ -172,15 +173,9 @@ impl Topic {
     pub async fn publish_with_mode(&self, message: Arc<Message>, mode: DeliveryMode) {
         match mode {
             DeliveryMode::Ordered => {
-                if (self.input_tx.send_async(message).await).is_err() {
-                    tracing::warn!(
-                        "Topic '{}' dropped message: input channel closed",
-                        self.name
-                    );
-                }
+                push_with_backpressure(&self.ingress_queue, &self.ingress_notify, message).await;
             }
             DeliveryMode::Parallel => {
-                // Future: support alternate path for parallel fanout logic
                 tracing::warn!(
                     "Parallel delivery mode not yet supported for topic '{}'",
                     self.name
@@ -192,10 +187,8 @@ impl Topic {
     /// Registers a subscriber.
     pub fn subscribe(&self, subscriber: Subscriber, _capacity: usize) {
         let subscriber_id = subscriber.id().clone();
-        // Insert into global registry
         self.subscribers
             .insert(subscriber_id.clone(), subscriber.clone());
-        // Insert into shard
         let idx = self.shard_index(&subscriber_id);
         self.shards[idx].subs.insert(subscriber_id, subscriber);
     }
@@ -220,3 +213,40 @@ impl Topic {
         h % n
     }
 }
+
+impl fmt::Debug for Topic {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("Topic")
+            .field("name", &self.name)
+            .field("subscribers", &self.subscribers.len())
+            .field("shards", &self.shards.len())
+            .finish()
+    }
+}
+
+impl fmt::Debug for Shard {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("Shard")
+            .field("subscribers", &self.subs.len())
+            .finish()
+    }
+}
+
+async fn push_with_backpressure<T>(queue: &Arc<MpmcQueue<T>>, notify: &Arc<Notify>, mut item: T)
+where
+    T: Send,
+{
+    let mut backoff = AdaptiveYield::new();
+    loop {
+        match queue.try_enqueue(item) {
+            Ok(_) => {
+                notify.notify_one();
+                break;
+            }
+            Err(returned) => {
+                item = returned;
+                backoff.snooze().await;
+            }
+        }
+    }
+}
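
Note: the dispatcher and shard workers above depend on MpmcQueue from crate::core::lockfree, which is not part of this diff. From the call sites, the assumed contract is a bounded, non-blocking queue: new(capacity), try_enqueue that hands the rejected item back when the queue is full (which is what lets push_with_backpressure retry without cloning), and try_dequeue returning Option<T>. A minimal sketch of that contract, hypothetically backed by crossbeam_queue::ArrayQueue rather than the crate's own implementation:

use crossbeam_queue::ArrayQueue;

/// Sketch only: a bounded MPMC queue exposing the interface the new code calls.
pub struct MpmcQueue<T> {
    inner: ArrayQueue<T>,
}

impl<T> MpmcQueue<T> {
    /// Fixed-capacity queue.
    pub fn new(capacity: usize) -> Self {
        Self { inner: ArrayQueue::new(capacity) }
    }

    /// Non-blocking enqueue; returns the item to the caller when the queue is full.
    pub fn try_enqueue(&self, item: T) -> Result<(), T> {
        self.inner.push(item)
    }

    /// Non-blocking dequeue; None when the queue is empty.
    pub fn try_dequeue(&self) -> Option<T> {
        self.inner.pop()
    }
}

The Notify pairing in the diff exists because a queue like this has no built-in waiting: producers call notify_one() after a successful enqueue, and consumers park on notified() once they have drained the queue.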

src/util/backoff.rs

Lines changed: 39 additions & 0 deletions
@@ -0,0 +1,39 @@
+//! Adaptive spin-yield backoff utilities optimized for async runtimes.
+//!
+//! These helpers provide a lightweight way to apply pressure-aware backoff
+//! without immediately yielding to the scheduler. They are ideal when
+//! interacting with lock-free data structures where transient contention is
+//! expected.
+
+use tokio::task::yield_now;
+
+/// Adaptive backoff that starts with CPU spins before yielding to the runtime.
+#[derive(Debug, Default)]
+pub struct AdaptiveYield {
+    spins: u32,
+}
+
+impl AdaptiveYield {
+    /// Creates a new adaptive backoff helper.
+    pub const fn new() -> Self {
+        Self { spins: 0 }
+    }
+
+    /// Perform the next backoff step.
+    ///
+    /// The strategy is:
+    /// - For the first few invocations, spin with exponential backoff.
+    /// - After the spin budget is exhausted, yield to the async scheduler.
+    pub async fn snooze(&mut self) {
+        if self.spins < 6 {
+            let spins = 1 << self.spins;
+            for _ in 0..spins {
+                std::hint::spin_loop();
+            }
+            self.spins += 1;
+        } else {
+            yield_now().await;
+            self.spins = 0;
+        }
+    }
+}
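
For reference, push_with_backpressure in topic.rs above is the intended consumer of this helper: it retries try_enqueue under snooze(), spinning 1, 2, 4, 8, 16, and 32 times (63 spins in total) before the first yield to the Tokio scheduler, after which the budget resets. An illustrative polling loop using the same pattern follows; the try_poll closure is a hypothetical stand-in for any non-blocking try_* operation and is not part of this commit:

use crate::util::backoff::AdaptiveYield;

/// Polls a non-blocking source until it yields a value, backing off adaptively.
async fn poll_until_ready<T>(mut try_poll: impl FnMut() -> Option<T>) -> T {
    let mut backoff = AdaptiveYield::new();
    loop {
        if let Some(value) = try_poll() {
            return value;
        }
        // Spin briefly on transient contention; fall back to yield_now() once
        // the spin budget is exhausted.
        backoff.snooze().await;
    }
}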

src/util/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -1 +1,2 @@
 pub mod affinity;
+pub mod backoff;
