@@ -171,6 +171,18 @@ impl<T: Message> Consumer<T> {
171171 /// Releases consumed messages to the producer without waking the producer
172172 #[ inline]
173173 pub fn release_no_wake ( & mut self , release_len : u32 ) {
174+ if release_len == 0 {
175+ return ;
176+ }
177+
178+ debug_assert ! (
179+ release_len <= self . cursor. cached_consumer_len( ) ,
180+ "cannot release more messages than acquired"
181+ ) ;
182+ unsafe {
183+ sync_ring_regions :: < _ , false > ( & self . cursor , release_len, replicate_payload_len) ;
184+ }
185+
174186 self . cursor . release_consumer ( release_len) ;
175187 }
176188
@@ -260,7 +272,7 @@ impl<T: Message> Producer<T> {
260272 self . wake ( ) ;
261273 }
262274
263- /// Releases consumed messages to the producer without waking the producer
275+ /// Releases consumed messages to the consumer without waking the consumer
264276 #[ inline]
265277 pub fn release_no_wake ( & mut self , release_len : u32 ) {
266278 if release_len == 0 {
@@ -272,58 +284,8 @@ impl<T: Message> Producer<T> {
272284 "cannot release more messages than acquired"
273285 ) ;
274286
275- let idx = self . cursor . cached_producer ( ) ;
276- let ring_size = self . cursor . capacity ( ) ;
277-
278- // replicate any written items to the secondary region
279287 unsafe {
280- assume ! ( ring_size > idx, "idx should never exceed the ring size" ) ;
281-
282- // calculate the maximum number of replications we need to perform for the primary ->
283- // secondary
284- let max_possible_replications = ring_size - idx;
285- // the replication count should exceed the number that we're releasing
286- let replication_count = max_possible_replications. min ( release_len) ;
287-
288- assume ! (
289- replication_count != 0 ,
290- "we should always be releasing at least 1 item"
291- ) ;
292-
293- // calculate the data pointer based on the current message index
294- let primary = self . cursor . data_ptr ( ) . as_ptr ( ) . add ( idx as _ ) ;
295- // add the size of the ring to the primary pointer to get into the secondary message
296- let secondary = primary. add ( ring_size as _ ) ;
297-
298- // copy the primary into the secondary
299- self . replicate ( primary, secondary, replication_count as _ ) ;
300-
301- // if messages were also written to the secondary region, we need to copy them back to the
302- // primary region
303- assume ! (
304- idx. checked_add( release_len) . is_some( ) ,
305- "overflow amount should not exceed u32::MAX"
306- ) ;
307- assume ! (
308- idx + release_len < ring_size * 2 ,
309- "overflow amount should not extend beyond the secondary replica"
310- ) ;
311-
312- let overflow_amount = ( idx + release_len) . checked_sub ( ring_size) . filter ( |v| {
313- // we didn't overflow if the count is 0
314- * v > 0
315- } ) ;
316-
317- if let Some ( replication_count) = overflow_amount {
318- // secondary -> primary replication always happens at the beginning of the data
319- let primary = self . cursor . data_ptr ( ) . as_ptr ( ) ;
320- // add the size of the ring to the primary pointer to get into the secondary
321- // message
322- let secondary = primary. add ( ring_size as _ ) ;
323-
324- // copy the secondary into the primary
325- self . replicate ( secondary, primary, replication_count as _ ) ;
326- }
288+ sync_ring_regions :: < _ , true > ( & self . cursor , release_len, replicate) ;
327289 }
328290
329291 // finally release the len to the consumer
@@ -352,22 +314,108 @@ impl<T: Message> Producer<T> {
352314 pub fn is_open ( & self ) -> bool {
353315 self . wakers . is_open ( )
354316 }
317+ }
355318
356- /// Replicates messages from the primary to secondary memory regions
357- #[ inline]
358- unsafe fn replicate ( & self , primary : * mut T , secondary : * mut T , len : usize ) {
359- debug_assert_ne ! ( len, 0 ) ;
360-
361- #[ cfg( debug_assertions) ]
362- {
363- let primary = core:: slice:: from_raw_parts ( primary, len as _ ) ;
364- let secondary = core:: slice:: from_raw_parts ( secondary, len as _ ) ;
365- for ( primary, secondary) in primary. iter ( ) . zip ( secondary) {
366- T :: validate_replication ( primary, secondary) ;
367- }
319+ /// Copies messages from the primary to secondary memory regions
320+ #[ inline]
321+ unsafe fn replicate < T : Message > ( src : * mut T , dest : * mut T , len : usize ) {
322+ debug_assert_ne ! ( len, 0 ) ;
323+
324+ #[ cfg( debug_assertions) ]
325+ {
326+ let src_slice = core:: slice:: from_raw_parts ( src, len as _ ) ;
327+ let dest_slice = core:: slice:: from_raw_parts ( dest, len as _ ) ;
328+ for ( src_message, dest_message) in src_slice. iter ( ) . zip ( dest_slice) {
329+ T :: validate_replication ( src_message, dest_message) ;
368330 }
331+ }
332+
333+ core:: ptr:: copy_nonoverlapping ( src, dest, len as _ ) ;
334+ }
335+
336+ #[ inline]
337+ unsafe fn replicate_payload_len < T : Message > ( src : * mut T , dest : * mut T , len : usize ) {
338+ let src_slice = core:: slice:: from_raw_parts_mut ( src, len as _ ) ;
339+ let dest_slice = core:: slice:: from_raw_parts_mut ( dest, len as _ ) ;
340+ for ( src_message, dest_message) in src_slice. iter_mut ( ) . zip ( dest_slice) {
341+ dest_message. set_payload_len ( src_message. payload_len ( ) ) ;
342+ }
343+ }
369344
370- core:: ptr:: copy_nonoverlapping ( primary, secondary, len as _ ) ;
345+ /// Synchronizes data between primary and secondary regions of the ring buffer.
346+ ///
347+ /// The ring buffer is divided into two equal regions to ensure contiguous reads.
348+ /// When data is written to one region, it needs to be replicated to maintain
349+ /// consistency:
350+ ///
351+ /// * Data written to the primary region is copied to the corresponding location
352+ /// in the secondary region
353+ /// * If the write wraps around the end of the primary region, the wrapped portion
354+ /// from the secondary region is copied back to the start of the primary region
355+ ///
356+ /// The `PRODUCER` const generic parameter determines whether this is being called
357+ /// from the producer (writing) or consumer (reading) side of the ring.
358+ unsafe fn sync_ring_regions < T : Message , const PRODUCER : bool > (
359+ cursor : & Cursor < T > ,
360+ release_len : u32 ,
361+ f : unsafe fn ( src : * mut T , dest : * mut T , len : usize ) ,
362+ ) {
363+ let idx = if PRODUCER {
364+ cursor. cached_producer ( )
365+ } else {
366+ cursor. cached_consumer ( )
367+ } ;
368+
369+ let ring_size = cursor. capacity ( ) ;
370+
371+ // replicate any written items to the secondary region
372+
373+ assume ! ( ring_size > idx, "idx should never exceed the ring size" ) ;
374+
375+ // calculate the maximum number of replications we need to perform for the primary ->
376+ // secondary
377+ let max_possible_replications = ring_size - idx;
378+ // the replication count should exceed the number that we're releasing
379+ let replication_count = max_possible_replications. min ( release_len) ;
380+
381+ assume ! (
382+ replication_count != 0 ,
383+ "we should always be releasing at least 1 item"
384+ ) ;
385+
386+ // calculate the data pointer based on the current message index
387+ let primary = cursor. data_ptr ( ) . as_ptr ( ) . add ( idx as _ ) ;
388+ // add the size of the ring to the primary pointer to get into the secondary message
389+ let secondary = primary. add ( ring_size as _ ) ;
390+
391+ // copy the primary into the secondary
392+ f ( primary, secondary, replication_count as _ ) ;
393+
394+ // if messages were also written to the secondary region, we need to copy them back to the
395+ // primary region
396+ assume ! (
397+ idx. checked_add( release_len) . is_some( ) ,
398+ "overflow amount should not exceed u32::MAX"
399+ ) ;
400+ assume ! (
401+ idx + release_len < ring_size * 2 ,
402+ "overflow amount should not extend beyond the secondary replica"
403+ ) ;
404+
405+ let overflow_amount = ( idx + release_len) . checked_sub ( ring_size) . filter ( |v| {
406+ // we didn't overflow if the count is 0
407+ * v > 0
408+ } ) ;
409+
410+ if let Some ( replication_count) = overflow_amount {
411+ // secondary -> primary replication always happens at the beginning of the data
412+ let primary = cursor. data_ptr ( ) . as_ptr ( ) ;
413+ // add the size of the ring to the primary pointer to get into the secondary
414+ // message
415+ let secondary = primary. add ( ring_size as _ ) ;
416+
417+ // copy the secondary into the primary
418+ f ( secondary, primary, replication_count as _ ) ;
371419 }
372420}
373421
@@ -528,4 +576,50 @@ mod tests {
528576 send_recv_test ! ( msg_send_recv, crate :: message:: msg:: Message ) ;
529577 #[ cfg( s2n_quic_platform_socket_mmsg) ]
530578 send_recv_test ! ( mmsg_send_recv, crate :: message:: mmsg:: Message ) ;
579+
580+ macro_rules! consumer_modifications_test {
581+ ( $name: ident, $msg: ty) => {
582+ #[ test]
583+ fn $name( ) {
584+ check!( ) . with_type:: <u32 >( ) . for_each( |& count| {
585+ let entries = if cfg!( kani) { 2 } else { 16 } ;
586+ let payload_len = if cfg!( kani) { 2 } else { 128 } ;
587+ let count = count % entries;
588+
589+ let ( mut producer, mut consumer) = pair:: <$msg>( entries, payload_len) ;
590+
591+ // Producer writes to the ring buffer
592+ producer. acquire( u32 :: MAX ) ;
593+ for entry in & mut producer. data( ) [ ..count as usize ] {
594+ unsafe {
595+ entry. set_payload_len( 100 ) ;
596+ }
597+ }
598+ producer. release( count) ;
599+
600+ // Consumer reads and resets the payload
601+ let count = consumer. acquire( u32 :: MAX ) ;
602+ for entry in & mut consumer. data( ) [ ..count as usize ] {
603+ unsafe {
604+ entry. reset( payload_len as usize ) ;
605+ }
606+ }
607+ consumer. release( count) ;
608+
609+ // Verify modifications seen by producer for reuse
610+ producer. acquire( u32 :: MAX ) ;
611+ let s = producer. data( ) ;
612+ for entry in s {
613+ assert_eq!( entry. payload_len( ) , payload_len as usize ) ;
614+ }
615+ } ) ;
616+ }
617+ } ;
618+ }
619+
620+ consumer_modifications_test ! ( simple_rx_modifications, crate :: message:: simple:: Message ) ;
621+ #[ cfg( s2n_quic_platform_socket_msg) ]
622+ consumer_modifications_test ! ( msg_rx_modifications, crate :: message:: msg:: Message ) ;
623+ #[ cfg( s2n_quic_platform_socket_mmsg) ]
624+ consumer_modifications_test ! ( mmsg_rx_modifications, crate :: message:: mmsg:: Message ) ;
531625}
0 commit comments