3131//! - Throughput reported in messages/second
3232
3333use criterion:: { BenchmarkId , Criterion , Throughput , black_box, criterion_group, criterion_main} ;
34+ use std:: collections:: HashMap ;
3435use std:: sync:: atomic:: { AtomicBool , AtomicU64 , Ordering } ;
3536use std:: sync:: { Arc , Barrier } ;
3637use std:: thread;
@@ -63,6 +64,26 @@ impl Params {
6364 }
6465}
6566
/// Exponential backoff step for a busy-wait loop.
///
/// While `*pow` is below 6, this spins `2^pow` times via
/// [`std::hint::spin_loop`] and then increments `*pow`, so consecutive calls
/// spin 1, 2, 4, 8, 16 and 32 times. Once `*pow` has reached 6 (or more) each
/// call yields the thread to the scheduler instead and leaves `*pow`
/// unchanged. Setting `*pow` back to 0 restarts the sequence.
fn backoff(pow: &mut u8) {
    let level = *pow;

    // Spinning budget exhausted: hand the core back to the scheduler.
    if level >= 6 {
        thread::yield_now();
        return;
    }

    // Spin for 2^level iterations before escalating to the next level.
    let mut remaining = 1u32 << level;
    while remaining > 0 {
        std::hint::spin_loop();
        remaining -= 1;
    }
    *pow = level + 1;
}
79+
/// Topology details for one logical CPU, used when choosing cores to pin
/// benchmark threads to.
///
/// All fields are plain `Copy` data, so the struct derives `Copy` and the
/// equality traits in addition to `Debug`/`Clone`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct CoreInfo {
    /// Logical CPU index; this is the value returned to callers for pinning.
    id: usize,
    /// Contents of the CPU's `physical_package_id` topology entry, or `None`
    /// when it could not be read or parsed.
    package_id: Option<u32>,
    /// Contents of the CPU's `core_id` topology entry, or `None` when it
    /// could not be read or parsed. Two logical CPUs in the same package with
    /// equal `core_id` are treated as SMT siblings.
    core_id: Option<u32>,
}
86+
6687/// Find two distinct CPU cores to pin threads to, preferring physical cores
6788/// Returns (producer_core_id, consumer_core_id)
6889fn select_cores ( ) -> ( usize , usize ) {
@@ -72,14 +93,61 @@ fn select_cores() -> (usize, usize) {
7293 panic ! ( "Need at least 2 CPU cores for SPSC benchmark" ) ;
7394 }
7495
75- // Try to select cores that are not SMT siblings
76- // For simplicity, pick first and middle core
77- let producer_core = 0 ;
78- let consumer_core = core_ids. len ( ) / 2 ;
96+ let core_info = core_ids
97+ . iter ( )
98+ . map ( |cid| CoreInfo {
99+ id : cid. id ,
100+ package_id : read_topology_value ( cid. id , "physical_package_id" ) ,
101+ core_id : read_topology_value ( cid. id , "core_id" ) ,
102+ } )
103+ . collect :: < Vec < _ > > ( ) ;
104+
105+ if let Some ( ( a, b) ) = pick_nonsmt_same_package ( & core_info) {
106+ return ( a, b) ;
107+ }
108+
109+ // Fallback: pick first and middle core
110+ let producer_core = core_info[ 0 ] . id ;
111+ let consumer_core = core_info[ core_info. len ( ) / 2 ] . id ;
79112
80113 ( producer_core, consumer_core)
81114}
82115
116+ fn pick_nonsmt_same_package ( core_info : & [ CoreInfo ] ) -> Option < ( usize , usize ) > {
117+ let mut by_pkg: HashMap < u32 , Vec < & CoreInfo > > = HashMap :: new ( ) ;
118+
119+ for info in core_info {
120+ if let Some ( pkg) = info. package_id {
121+ by_pkg. entry ( pkg) . or_default ( ) . push ( info) ;
122+ }
123+ }
124+
125+ for infos in by_pkg. values ( ) {
126+ if infos. len ( ) < 2 {
127+ continue ;
128+ }
129+
130+ for ( i, a) in infos. iter ( ) . enumerate ( ) {
131+ for b in infos. iter ( ) . skip ( i + 1 ) {
132+ if let ( Some ( core_a) , Some ( core_b) ) = ( a. core_id , b. core_id )
133+ && ( core_a == core_b)
134+ {
135+ continue ;
136+ }
137+ return Some ( ( a. id , b. id ) ) ;
138+ }
139+ }
140+ }
141+
142+ None
143+ }
144+
/// Read one numeric entry from a CPU's sysfs topology directory.
///
/// Reads `/sys/devices/system/cpu/cpu{cpu_id}/topology/{file}`, trims
/// surrounding whitespace and parses the remainder as a `u32`.
///
/// Returns `None` when the file cannot be read (for example on systems
/// without this sysfs layout) or when its contents are not a valid `u32`.
fn read_topology_value(cpu_id: usize, file: &str) -> Option<u32> {
    let sysfs_path = format!("/sys/devices/system/cpu/cpu{cpu_id}/topology/{file}");
    match std::fs::read_to_string(sysfs_path) {
        Ok(raw) => raw.trim().parse::<u32>().ok(),
        // Any read failure is treated as "topology unknown".
        Err(_) => None,
    }
}
150+
83151/// Run a single SPSC sample for a fixed duration
84152/// Returns (elapsed_duration, items_processed)
85153fn run_spsc_sample (
@@ -89,9 +157,10 @@ fn run_spsc_sample(
89157 consumer_core : usize ,
90158) -> ( Duration , u64 ) {
91159 // Barrier to synchronize thread start
92- let barrier = Arc :: new ( Barrier :: new ( 2 ) ) ;
160+ let barrier = Arc :: new ( Barrier :: new ( 3 ) ) ;
93161 let barrier_producer = Arc :: clone ( & barrier) ;
94162 let barrier_consumer = Arc :: clone ( & barrier) ;
163+ let barrier_timer = Arc :: clone ( & barrier) ;
95164
96165 // Stop flag to signal threads to terminate
97166 let stop = Arc :: new ( AtomicBool :: new ( false ) ) ;
@@ -132,6 +201,7 @@ fn run_spsc_sample(
132201 barrier_consumer. wait ( ) ;
133202
134203 let mut local_count = 0u64 ;
204+ let mut backoff_pow = 0 ;
135205
136206 while !stop_consumer. load ( Ordering :: Relaxed ) {
137207 if params. batch_size == 1 {
@@ -143,9 +213,10 @@ fn run_spsc_sample(
143213 . mark_slot_consumed ( consume_slot)
144214 . expect ( "Failed to mark slot consumed" ) ;
145215 local_count += 1 ;
216+ backoff_pow = 0 ;
146217 }
147218 Err ( YCQueueError :: EmptyQueue ) | Err ( YCQueueError :: SlotNotReady ) => {
148- thread :: yield_now ( ) ;
219+ backoff ( & mut backoff_pow ) ;
149220 }
150221 Err ( e) => panic ! ( "Consumer error: {:?}" , e) ,
151222 }
@@ -160,9 +231,10 @@ fn run_spsc_sample(
160231 consumer_queue
161232 . mark_slots_consumed ( slots)
162233 . expect ( "Failed to mark slots consumed" ) ;
234+ backoff_pow = 0 ;
163235 }
164236 Err ( YCQueueError :: EmptyQueue ) | Err ( YCQueueError :: SlotNotReady ) => {
165- thread :: yield_now ( ) ;
237+ backoff ( & mut backoff_pow ) ;
166238 }
167239 Err ( e) => panic ! ( "Consumer error: {:?}" , e) ,
168240 }
@@ -187,6 +259,7 @@ fn run_spsc_sample(
187259 barrier_producer. wait ( ) ;
188260
189261 let mut local_count = 0u64 ;
262+ let mut backoff_pow = 0 ;
190263
191264 while !stop_producer. load ( Ordering :: Relaxed ) {
192265 if params. batch_size == 1 {
@@ -198,9 +271,10 @@ fn run_spsc_sample(
198271 . mark_slot_produced ( produce_slot)
199272 . expect ( "Failed to mark slot produced" ) ;
200273 local_count += 1 ;
274+ backoff_pow = 0 ;
201275 }
202276 Err ( YCQueueError :: OutOfSpace ) | Err ( YCQueueError :: SlotNotReady ) => {
203- thread :: yield_now ( ) ;
277+ backoff ( & mut backoff_pow ) ;
204278 }
205279 Err ( e) => panic ! ( "Producer error: {:?}" , e) ,
206280 }
@@ -215,9 +289,10 @@ fn run_spsc_sample(
215289 producer_queue
216290 . mark_slots_produced ( slots)
217291 . expect ( "Failed to mark slots produced" ) ;
292+ backoff_pow = 0 ;
218293 }
219294 Err ( YCQueueError :: OutOfSpace ) | Err ( YCQueueError :: SlotNotReady ) => {
220- thread :: yield_now ( ) ;
295+ backoff ( & mut backoff_pow ) ;
221296 }
222297 Err ( e) => panic ! ( "Producer error: {:?}" , e) ,
223298 }
@@ -230,10 +305,9 @@ fn run_spsc_sample(
230305 // Timer thread - wait for sample_duration then signal stop
231306 let timer_handle = s. spawn ( move || {
232307 // Wait for both threads to be ready
233- // Small sleep to ensure threads have started
234- thread:: sleep ( Duration :: from_millis ( 1 ) ) ;
308+ barrier_timer. wait ( ) ;
235309
236- // Record start time
310+ // Record start time exactly when all threads release
237311 let start = Instant :: now ( ) ;
238312 {
239313 let mut st = start_time_timer. lock ( ) . unwrap ( ) ;
0 commit comments