Skip to content

Commit f3187a0

Browse files
committed
instrument spmc & mpsc channels
1 parent 32a1d6b commit f3187a0

File tree

2 files changed

+83
-0
lines changed

2 files changed

+83
-0
lines changed

pulsebeam-runtime/src/sync/mpsc.rs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ pub struct Receiver<T> {
102102
next_seq: u64,
103103
local_head: u64,
104104
listener: Option<EventListener>,
105+
pkts_received: u64,
105106
}
106107

107108
impl<T> Drop for Receiver<T> {
@@ -111,6 +112,8 @@ impl<T> Drop for Receiver<T> {
111112
}
112113

113114
impl<T: Clone> Receiver<T> {
115+
const METRIC_FLUSH_MASK: u64 = 1023;
116+
114117
/// Asynchronously receives the next value from the channel.
///
/// Wraps [`Self::poll_recv`] in `std::future::poll_fn` and awaits it, so the
/// result is exactly what `poll_recv` resolves to.
///
/// # Errors
///
/// Propagates any `RecvError` produced by `poll_recv`; the visible part of
/// `poll_recv` yields `RecvError::Lagged` when the expected slot sequence
/// no longer matches.
pub async fn recv(&mut self) -> Result<T, RecvError> {
115118
std::future::poll_fn(|cx| self.poll_recv(cx)).await
116119
}
@@ -152,20 +155,39 @@ impl<T: Clone> Receiver<T> {
152155

153156
if slot_seq != self.next_seq {
154157
self.next_seq = self.local_head;
158+
metrics::counter!("mpsc_receive_lag_total").increment(1);
155159
return Poll::Ready(Err(RecvError::Lagged(self.local_head)));
156160
}
157161

158162
if let Some(ref v) = slot.val {
159163
coop.made_progress();
160164
let out = v.clone();
161165
self.next_seq += 1;
166+
self.pkts_received += 1;
167+
168+
if (self.pkts_received & Self::METRIC_FLUSH_MASK) == 0 {
169+
drop(slot);
170+
self.flush_metrics();
171+
}
162172
return Poll::Ready(Ok(out));
163173
}
164174

165175
self.next_seq = self.local_head;
176+
metrics::counter!("mpsc_receive_lag_total").increment(1);
166177
return Poll::Ready(Err(RecvError::Lagged(self.local_head)));
167178
}
168179
}
180+
181+
/// Publishes this receiver's batched metrics and resets the local packet counter.
///
/// Records `(local_head - next_seq) / (ring.mask + 1)` into the
/// `mpsc_receive_drift_ratio` histogram, adds `pkts_received` to the
/// `mpsc_receive_throughput_total` counter, then zeroes `pkts_received`.
/// Called from `poll_recv` whenever `pkts_received & METRIC_FLUSH_MASK == 0`
/// after a successful receive.
fn flush_metrics(&mut self) {
182+
// Use the existing local_head snapshot.
183+
// This tells us how much is still left in the current "batch".
// `saturating_sub` clamps the drift to 0 if `next_seq` is ahead of the snapshot.
184+
let current_drift = self.local_head.saturating_sub(self.next_seq);
185+
// `mask + 1` is used as the ring capacity (presumably a power-of-two ring — confirm in `Ring`).
let capacity = (self.ring.mask + 1) as f64;
186+
187+
metrics::histogram!("mpsc_receive_drift_ratio").record(current_drift as f64 / capacity);
188+
metrics::counter!("mpsc_receive_throughput_total").increment(self.pkts_received);
189+
// Start a fresh batch so the next flush reports only newly received packets.
self.pkts_received = 0;
190+
}
169191
}
170192

171193
impl<T: Clone> Stream for Receiver<T> {
@@ -188,11 +210,29 @@ impl<T: Clone> Clone for Receiver<T> {
188210
next_seq: self.next_seq,
189211
local_head: self.local_head,
190212
listener: None,
213+
pkts_received: 0,
191214
}
192215
}
193216
}
194217

195218
pub fn channel<T: Send + Sync + Clone + 'static>(capacity: usize) -> (Sender<T>, Receiver<T>) {
219+
metrics::describe_histogram!(
220+
"mpsc_receive_drift_ratio",
221+
"The ratio of the buffer capacity currently occupied by unread packets \
222+
at the moment of processing. A value of 1.0 indicates the receiver is \
223+
about to be overwritten (lagged)."
224+
);
225+
226+
metrics::describe_counter!(
227+
"mpsc_receive_throughput_total",
228+
"The total number of packets successfully delivered across all mpsc channels."
229+
);
230+
231+
metrics::describe_counter!(
232+
"mpsc_receive_lag_total",
233+
"The total number of times a receiver was too slow and was overwritten by \
234+
the producer, resulting in dropped data."
235+
);
196236
let ring = Ring::new(capacity);
197237
(
198238
Sender { ring: ring.clone() },
@@ -201,6 +241,7 @@ pub fn channel<T: Send + Sync + Clone + 'static>(capacity: usize) -> (Sender<T>,
201241
next_seq: 0,
202242
local_head: 0,
203243
listener: None,
244+
pkts_received: 0,
204245
},
205246
)
206247
}

pulsebeam-runtime/src/sync/spmc.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,9 +100,12 @@ pub struct Receiver<T> {
100100
next_seq: u64,
101101
local_head: u64,
102102
listener: Option<EventListener>,
103+
pkts_received: u64,
103104
}
104105

105106
impl<T: Clone> Receiver<T> {
107+
const METRIC_FLUSH_MASK: u64 = 1023;
108+
106109
pub fn reset(&mut self) {
107110
// Half cap to give a chance to load from cache while not too close
108111
// to tail to cause a lag error.
@@ -167,23 +170,43 @@ impl<T: Clone> Receiver<T> {
167170
// Seq mismatch — producer overwrote after head snapshot
168171
if slot_seq != self.next_seq {
169172
self.next_seq = self.local_head;
173+
metrics::counter!("spmc_receive_lag_total").increment(1);
170174
return Poll::Ready(Err(RecvError::Lagged(self.local_head)));
171175
}
172176

173177
// Valid message
174178
if let Some(v) = &slot.val {
175179
let out = v.clone();
176180
coop.made_progress();
181+
182+
self.pkts_received += 1;
177183
self.next_seq += 1;
184+
185+
if (self.pkts_received & Self::METRIC_FLUSH_MASK) == 0 {
186+
drop(slot);
187+
self.flush_metrics();
188+
}
178189
return Poll::Ready(Ok(out));
179190
}
180191

181192
// This should never happen, but just in case..
182193
// Seq was correct but value missing — treat as lag
183194
self.next_seq = self.local_head;
195+
metrics::counter!("spmc_receive_lag_total").increment(1);
184196
return Poll::Ready(Err(RecvError::Lagged(self.local_head)));
185197
}
186198
}
199+
200+
/// Publishes this receiver's batched metrics and resets the local packet counter.
///
/// Records `(local_head - next_seq) / (ring.mask + 1)` into the
/// `spmc_receive_drift_ratio` histogram, adds `pkts_received` to the
/// `spmc_receive_throughput_total` counter, then zeroes `pkts_received`.
/// Called from `poll_recv` whenever `pkts_received & METRIC_FLUSH_MASK == 0`
/// after a successful receive.
fn flush_metrics(&mut self) {
201+
// Use the existing local_head snapshot.
202+
// This tells us how much is still left in the current "batch".
// `saturating_sub` clamps the drift to 0 if `next_seq` is ahead of the snapshot.
203+
let current_drift = self.local_head.saturating_sub(self.next_seq);
204+
// `mask + 1` is used as the ring capacity (presumably a power-of-two ring — confirm in `Ring`).
let capacity = (self.ring.mask + 1) as f64;
205+
206+
metrics::histogram!("spmc_receive_drift_ratio").record(current_drift as f64 / capacity);
207+
metrics::counter!("spmc_receive_throughput_total").increment(self.pkts_received);
208+
// Start a fresh batch so the next flush reports only newly received packets.
self.pkts_received = 0;
209+
}
187210
}
188211

189212
impl<T: Clone> Stream for Receiver<T> {
@@ -208,11 +231,29 @@ impl<T: Clone> Clone for Receiver<T> {
208231
next_seq: self.next_seq,
209232
local_head: self.local_head,
210233
listener: None,
234+
pkts_received: 0,
211235
}
212236
}
213237
}
214238

215239
pub fn channel<T: Send + Sync + Clone + 'static>(capacity: usize) -> (Sender<T>, Receiver<T>) {
240+
metrics::describe_histogram!(
241+
"spmc_receive_drift_ratio",
242+
"The ratio of the buffer capacity currently occupied by unread packets \
243+
at the moment of processing. A value of 1.0 indicates the receiver is \
244+
about to be overwritten (lagged)."
245+
);
246+
247+
metrics::describe_counter!(
248+
"spmc_receive_throughput_total",
249+
"The total number of packets successfully delivered across all SPMC channels."
250+
);
251+
252+
metrics::describe_counter!(
253+
"spmc_receive_lag_total",
254+
"The total number of times a receiver was too slow and was overwritten by \
255+
the producer, resulting in dropped data."
256+
);
216257
let ring = Ring::new(capacity);
217258
(
218259
Sender {
@@ -224,6 +265,7 @@ pub fn channel<T: Send + Sync + Clone + 'static>(capacity: usize) -> (Sender<T>,
224265
next_seq: 0,
225266
local_head: 0,
226267
listener: None,
268+
pkts_received: 0,
227269
},
228270
)
229271
}

0 commit comments

Comments
 (0)