File tree Expand file tree Collapse file tree 2 files changed +36
-10
lines changed
pulsebeam-runtime/src/sync Expand file tree Collapse file tree 2 files changed +36
-10
lines changed Original file line number Diff line number Diff line change @@ -78,9 +78,11 @@ impl<T> Sender<T> {
7878 let seq = self . ring . head . fetch_add ( 1 , Ordering :: AcqRel ) ;
7979 let idx = ( seq as usize ) & self . ring . mask ;
8080
81- let mut slot = self . ring . slots [ idx] . lock ( ) ;
82- slot. val = Some ( val) ;
83- slot. seq = seq;
81+ {
82+ let mut slot = self . ring . slots [ idx] . lock ( ) ;
83+ slot. val = Some ( val) ;
84+ slot. seq = seq;
85+ }
8486
8587 // there's only 1 consumer
8688 self . ring . event . notify ( 1 ) ;
@@ -154,9 +156,21 @@ impl<T: Clone> Receiver<T> {
154156 let slot_seq = slot. seq ;
155157
156158 if slot_seq != self . next_seq {
157- self . next_seq = self . local_head ;
158- metrics:: counter!( "mpsc_receive_lag_total" ) . increment ( 1 ) ;
159- return Poll :: Ready ( Err ( RecvError :: Lagged ( self . local_head ) ) ) ;
159+ // Sanity check: seq should be ahead of us if we lagged
160+ let lagged = slot. seq > self . next_seq ;
161+ drop ( slot) ;
162+
163+ if lagged {
164+ self . next_seq = self . local_head ;
165+ metrics:: counter!( "mpsc_receive_lag_total" ) . increment ( 1 ) ;
166+ return Poll :: Ready ( Err ( RecvError :: Lagged ( self . local_head ) ) ) ;
167+ } else {
168+ // Stale slot, producer hasn't reached here yet
169+ if self . listener . is_none ( ) {
170+ self . listener = Some ( self . ring . event . listen ( ) ) ;
171+ }
172+ return Poll :: Pending ;
173+ }
160174 }
161175
162176 if let Some ( ref v) = slot. val {
Original file line number Diff line number Diff line change @@ -169,9 +169,21 @@ impl<T: Clone> Receiver<T> {
169169
170170 // Seq mismatch — producer overwrote after head snapshot
171171 if slot_seq != self . next_seq {
172- self . next_seq = self . local_head ;
173- metrics:: counter!( "spmc_receive_lag_total" ) . increment ( 1 ) ;
174- return Poll :: Ready ( Err ( RecvError :: Lagged ( self . local_head ) ) ) ;
172+ // Sanity check: seq should be ahead of us if we lagged
173+ let lagged = slot. seq > self . next_seq ;
174+ drop ( slot) ;
175+
176+ if lagged {
177+ self . next_seq = self . local_head ;
178+ metrics:: counter!( "spmc_receive_lag_total" ) . increment ( 1 ) ;
179+ return Poll :: Ready ( Err ( RecvError :: Lagged ( self . local_head ) ) ) ;
180+ } else {
181+ // Stale slot, producer hasn't reached here yet
182+ if self . listener . is_none ( ) {
183+ self . listener = Some ( self . ring . event . listen ( ) ) ;
184+ }
185+ return Poll :: Pending ;
186+ }
175187 }
176188
177189 // Valid message
@@ -189,7 +201,7 @@ impl<T: Clone> Receiver<T> {
189201 return Poll :: Ready ( Ok ( out) ) ;
190202 }
191203
192- // This shouldn't never happen, but just in case..
204+ // This shouldn't ever happen, but just in case..
193205 // Seq was correct but value missing — treat as lag
194206 self . next_seq = self . local_head ;
195207 metrics:: counter!( "spmc_receive_lag_total" ) . increment ( 1 ) ;
You can’t perform that action at this time.
0 commit comments