@@ -14,21 +14,23 @@ use std::ops::DerefMut;
14
14
use std:: sync:: Arc ;
15
15
use std:: sync:: Mutex ;
16
16
17
+ use dashmap:: DashMap ;
17
18
use tokio:: sync:: mpsc;
18
19
use tokio:: sync:: mpsc:: error:: SendError ;
20
+ use uuid:: Uuid ;
19
21
20
- use crate :: dashmap :: DashMap ;
22
+ use crate :: ActorId ;
21
23
22
24
/// A client's re-ordering buffer state.
23
25
struct BufferState < T > {
24
26
/// the last sequence number sent to receiver for this client. seq starts
25
27
/// with 1 and 0 mean no message has been sent.
26
- last_seq : usize ,
28
+ last_seq : u64 ,
27
29
/// Buffer out-of-order messages in order to ensures messages are delivered
28
30
/// strictly in per-client sequence order.
29
31
///
30
32
/// Map's key is seq_no, value is msg.
31
- buffer : HashMap < usize , T > ,
33
+ buffer : HashMap < u64 , T > ,
32
34
}
33
35
34
36
impl < T > Default for BufferState < T > {
@@ -43,9 +45,8 @@ impl<T> Default for BufferState<T> {
43
45
/// A sender that ensures messages are delivered in per-client sequence order.
44
46
pub ( crate ) struct OrderedSender < T > {
45
47
tx : mpsc:: UnboundedSender < T > ,
46
- // map's key is name client which sens messages through this channel. Map's
47
- // value is the buffer state of that client.
48
- states : Arc < DashMap < String , Arc < Mutex < BufferState < T > > > > > ,
48
+ /// Map's key is session ID, and value is the buffer state of that session.
49
+ states : Arc < DashMap < Uuid , Arc < Mutex < BufferState < T > > > > > ,
49
50
pub ( crate ) enable_buffering : bool ,
50
51
/// The identify of this object, which is used to distiguish it in debugging.
51
52
log_id : String ,
@@ -98,8 +99,8 @@ impl<T> OrderedSender<T> {
98
99
/// * calls from different clients will be executed concurrently.
99
100
pub ( crate ) fn send (
100
101
& self ,
101
- client : String ,
102
- seq_no : usize ,
102
+ session_id : Uuid ,
103
+ seq_no : u64 ,
103
104
msg : T ,
104
105
) -> Result < ( ) , OrderedSenderError < T > > {
105
106
use std:: cmp:: Ordering ;
@@ -109,25 +110,17 @@ impl<T> OrderedSender<T> {
109
110
return Err ( OrderedSenderError :: InvalidZeroSeq ( msg) ) ;
110
111
}
111
112
112
- // Make sure only this client's state is locked, not all states.
113
- let state = match self . states . get ( & client) {
114
- Some ( state) => state. value ( ) . clone ( ) ,
115
- None => self
116
- . states
117
- . entry ( client. clone ( ) )
118
- . or_default ( )
119
- . value ( )
120
- . clone ( ) ,
121
- } ;
113
+ // Make sure only this session's state is locked, not all states.
114
+ let state = self . states . entry ( session_id) . or_default ( ) . value ( ) . clone ( ) ;
122
115
let mut state_guard = state. lock ( ) . unwrap ( ) ;
123
116
let BufferState { last_seq, buffer } = state_guard. deref_mut ( ) ;
124
117
125
118
match seq_no. cmp ( & ( * last_seq + 1 ) ) {
126
119
Ordering :: Less => {
127
120
tracing:: warn!(
128
- "{} duplicate message from {} with seq no: {}" ,
121
+ "{} duplicate message from session {} with seq no: {}" ,
129
122
self . log_id,
130
- client ,
123
+ session_id ,
131
124
seq_no,
132
125
) ;
133
126
}
@@ -176,9 +169,49 @@ impl<T> OrderedSender<T> {
176
169
}
177
170
}
178
171
172
+ /// Used by sender to track the message sequence numbers it sends to each actor.
173
+ /// Each [Sequencer] object has a session id, sequencer numbers are scoped by
174
+ /// the (session_id, destination_actor) pair.
175
+ #[ derive( Clone , Debug ) ]
176
+ pub struct Sequencer {
177
+ session_id : Uuid ,
178
+ // map's key is the destination actor's name, value is the last seq number
179
+ // sent to that actor.
180
+ last_seqs : Arc < Mutex < HashMap < ActorId , u64 > > > ,
181
+ }
182
+
183
+ impl Sequencer {
184
+ pub ( crate ) fn new ( session_id : Uuid ) -> Self {
185
+ Self {
186
+ session_id,
187
+ last_seqs : Arc :: new ( Mutex :: new ( HashMap :: new ( ) ) ) ,
188
+ }
189
+ }
190
+
191
+ /// Assign the next seq for the given actor ID, mutate the sequencer with
192
+ /// the new seq, and return the new seq.
193
+ pub fn assign_seq ( & self , actor_id : & ActorId ) -> u64 {
194
+ let mut guard = self . last_seqs . lock ( ) . unwrap ( ) ;
195
+ let mut_ref = match guard. get_mut ( actor_id) {
196
+ Some ( m) => m,
197
+ None => guard. entry ( actor_id. clone ( ) ) . or_default ( ) ,
198
+ } ;
199
+ * mut_ref += 1 ;
200
+ * mut_ref
201
+ }
202
+
203
+ /// Id of the session this sequencer belongs to.
204
+ pub fn session_id ( & self ) -> Uuid {
205
+ self . session_id
206
+ }
207
+ }
208
+
179
209
#[ cfg( test) ]
180
210
mod tests {
211
+ use std:: sync:: Arc ;
212
+
181
213
use super :: * ;
214
+ use crate :: id;
182
215
183
216
fn drain_try_recv < T : std:: fmt:: Debug + Clone > ( rx : & mut mpsc:: UnboundedReceiver < T > ) -> Vec < T > {
184
217
let mut out = Vec :: new ( ) ;
@@ -190,26 +223,28 @@ mod tests {
190
223
191
224
#[ test]
192
225
fn test_ordered_channel_single_client_send_in_order ( ) {
193
- let ( tx, mut rx) = ordered_channel :: < usize > ( "test" . to_string ( ) , true ) ;
226
+ let session_id_a = Uuid :: now_v7 ( ) ;
227
+ let ( tx, mut rx) = ordered_channel :: < u64 > ( "test" . to_string ( ) , true ) ;
194
228
for s in 1 ..=10 {
195
- tx. send ( "A" . into ( ) , s, s) . unwrap ( ) ;
229
+ tx. send ( session_id_a , s, s) . unwrap ( ) ;
196
230
let got = drain_try_recv ( & mut rx) ;
197
231
assert_eq ! ( got, vec![ s] ) ;
198
232
}
199
233
}
200
234
201
235
#[ test]
202
236
fn test_ordered_channel_single_client_send_out_of_order ( ) {
203
- let ( tx, mut rx) = ordered_channel :: < usize > ( "test" . to_string ( ) , true ) ;
237
+ let session_id_a = Uuid :: now_v7 ( ) ;
238
+ let ( tx, mut rx) = ordered_channel :: < u64 > ( "test" . to_string ( ) , true ) ;
204
239
205
240
// Send 2 to 4 in descending order: all should buffer until 1 arrives.
206
241
for s in ( 2 ..=4 ) . rev ( ) {
207
- tx. send ( "A" . into ( ) , s, s) . unwrap ( ) ;
242
+ tx. send ( session_id_a , s, s) . unwrap ( ) ;
208
243
}
209
244
210
245
// Send 7 to 9 in descending order: all should buffer until 1 - 6 arrives.
211
246
for s in ( 7 ..=9 ) . rev ( ) {
212
- tx. send ( "A" . into ( ) , s, s) . unwrap ( ) ;
247
+ tx. send ( session_id_a , s, s) . unwrap ( ) ;
213
248
}
214
249
215
250
assert ! (
@@ -218,127 +253,175 @@ mod tests {
218
253
) ;
219
254
220
255
// Now send 1: should deliver 1 then flush 2 - 4.
221
- tx. send ( "A" . into ( ) , 1 , 1 ) . unwrap ( ) ;
256
+ tx. send ( session_id_a , 1 , 1 ) . unwrap ( ) ;
222
257
assert_eq ! ( drain_try_recv( & mut rx) , vec![ 1 , 2 , 3 , 4 ] ) ;
223
258
224
259
// Now send 5: should deliver immediately but not flush 7 - 9.
225
- tx. send ( "A" . into ( ) , 5 , 5 ) . unwrap ( ) ;
260
+ tx. send ( session_id_a , 5 , 5 ) . unwrap ( ) ;
226
261
assert_eq ! ( drain_try_recv( & mut rx) , vec![ 5 ] ) ;
227
262
228
263
// Now send 6: should deliver 6 then flush 7 - 9.
229
- tx. send ( "A" . into ( ) , 6 , 6 ) . unwrap ( ) ;
264
+ tx. send ( session_id_a , 6 , 6 ) . unwrap ( ) ;
230
265
assert_eq ! ( drain_try_recv( & mut rx) , vec![ 6 , 7 , 8 , 9 ] ) ;
231
266
232
267
// Send 10: should deliver immediately.
233
- tx. send ( "A" . into ( ) , 10 , 10 ) . unwrap ( ) ;
268
+ tx. send ( session_id_a , 10 , 10 ) . unwrap ( ) ;
234
269
let got = drain_try_recv ( & mut rx) ;
235
270
assert_eq ! ( got, vec![ 10 ] ) ;
236
271
}
237
272
238
273
#[ test]
239
274
fn test_ordered_channel_multi_clients ( ) {
240
- let ( tx, mut rx) = ordered_channel :: < ( String , usize ) > ( "test" . to_string ( ) , true ) ;
275
+ let session_id_a = Uuid :: now_v7 ( ) ;
276
+ let session_id_b = Uuid :: now_v7 ( ) ;
277
+ let ( tx, mut rx) = ordered_channel :: < ( Uuid , u64 ) > ( "test" . to_string ( ) , true ) ;
241
278
242
279
// A1 -> deliver
243
- tx. send ( "A" . into ( ) , 1 , ( "A" . into ( ) , 1 ) ) . unwrap ( ) ;
244
- assert_eq ! ( drain_try_recv( & mut rx) , vec![ ( "A" . into ( ) , 1 ) ] ) ;
280
+ tx. send ( session_id_a , 1 , ( session_id_a , 1 ) ) . unwrap ( ) ;
281
+ assert_eq ! ( drain_try_recv( & mut rx) , vec![ ( session_id_a , 1 ) ] ) ;
245
282
// B1 -> deliver
246
- tx. send ( "B" . into ( ) , 1 , ( "B" . into ( ) , 1 ) ) . unwrap ( ) ;
247
- assert_eq ! ( drain_try_recv( & mut rx) , vec![ ( "B" . into ( ) , 1 ) ] ) ;
283
+ tx. send ( session_id_b , 1 , ( session_id_b , 1 ) ) . unwrap ( ) ;
284
+ assert_eq ! ( drain_try_recv( & mut rx) , vec![ ( session_id_b , 1 ) ] ) ;
248
285
for s in ( 3 ..=5 ) . rev ( ) {
249
286
// A3-5 -> buffer (waiting for A2)
250
- tx. send ( "A" . into ( ) , s, ( "A" . into ( ) , s) ) . unwrap ( ) ;
287
+ tx. send ( session_id_a , s, ( session_id_a , s) ) . unwrap ( ) ;
251
288
// B3-5 -> buffer (waiting for B2)
252
- tx. send ( "B" . into ( ) , s, ( "B" . into ( ) , s) ) . unwrap ( ) ;
289
+ tx. send ( session_id_b , s, ( session_id_b , s) ) . unwrap ( ) ;
253
290
}
254
291
for s in ( 7 ..=9 ) . rev ( ) {
255
292
// A7-9 -> buffer (waiting for A1-6)
256
- tx. send ( "A" . into ( ) , s, ( "A" . into ( ) , s) ) . unwrap ( ) ;
293
+ tx. send ( session_id_a , s, ( session_id_a , s) ) . unwrap ( ) ;
257
294
// B7-9 -> buffer (waiting for B1-6)
258
- tx. send ( "B" . into ( ) , s, ( "B" . into ( ) , s) ) . unwrap ( ) ;
295
+ tx. send ( session_id_b , s, ( session_id_b , s) ) . unwrap ( ) ;
259
296
}
260
297
assert ! (
261
298
drain_try_recv( & mut rx) . is_empty( ) ,
262
299
"nothing should be delivered yet"
263
300
) ;
264
301
265
302
// A2 -> deliver A2 then flush A3
266
- tx. send ( "A" . into ( ) , 2 , ( "A" . into ( ) , 2 ) ) . unwrap ( ) ;
303
+ tx. send ( session_id_a , 2 , ( session_id_a , 2 ) ) . unwrap ( ) ;
267
304
assert_eq ! (
268
305
drain_try_recv( & mut rx) ,
269
306
vec![
270
- ( "A" . into ( ) , 2 ) ,
271
- ( "A" . into ( ) , 3 ) ,
272
- ( "A" . into ( ) , 4 ) ,
273
- ( "A" . into ( ) , 5 ) ,
307
+ ( session_id_a , 2 ) ,
308
+ ( session_id_a , 3 ) ,
309
+ ( session_id_a , 4 ) ,
310
+ ( session_id_a , 5 ) ,
274
311
]
275
312
) ;
276
313
// B2 -> deliver B2 then flush B3
277
- tx. send ( "B" . into ( ) , 2 , ( "B" . into ( ) , 2 ) ) . unwrap ( ) ;
314
+ tx. send ( session_id_b , 2 , ( session_id_b , 2 ) ) . unwrap ( ) ;
278
315
assert_eq ! (
279
316
drain_try_recv( & mut rx) ,
280
317
vec![
281
- ( "B" . into ( ) , 2 ) ,
282
- ( "B" . into ( ) , 3 ) ,
283
- ( "B" . into ( ) , 4 ) ,
284
- ( "B" . into ( ) , 5 ) ,
318
+ ( session_id_b , 2 ) ,
319
+ ( session_id_b , 3 ) ,
320
+ ( session_id_b , 4 ) ,
321
+ ( session_id_b , 5 ) ,
285
322
]
286
323
) ;
287
324
288
325
// A6 -> should deliver immediately and flush A7-9
289
- tx. send ( "A" . into ( ) , 6 , ( "A" . into ( ) , 6 ) ) . unwrap ( ) ;
326
+ tx. send ( session_id_a , 6 , ( session_id_a , 6 ) ) . unwrap ( ) ;
290
327
assert_eq ! (
291
328
drain_try_recv( & mut rx) ,
292
329
vec![
293
- ( "A" . into ( ) , 6 ) ,
294
- ( "A" . into ( ) , 7 ) ,
295
- ( "A" . into ( ) , 8 ) ,
296
- ( "A" . into ( ) , 9 )
330
+ ( session_id_a , 6 ) ,
331
+ ( session_id_a , 7 ) ,
332
+ ( session_id_a , 8 ) ,
333
+ ( session_id_a , 9 )
297
334
]
298
335
) ;
299
336
// B6 -> should deliver immediately and flush B7-9
300
- tx. send ( "B" . into ( ) , 6 , ( "B" . into ( ) , 6 ) ) . unwrap ( ) ;
337
+ tx. send ( session_id_b , 6 , ( session_id_b , 6 ) ) . unwrap ( ) ;
301
338
assert_eq ! (
302
339
drain_try_recv( & mut rx) ,
303
340
vec![
304
- ( "B" . into ( ) , 6 ) ,
305
- ( "B" . into ( ) , 7 ) ,
306
- ( "B" . into ( ) , 8 ) ,
307
- ( "B" . into ( ) , 9 )
341
+ ( session_id_b , 6 ) ,
342
+ ( session_id_b , 7 ) ,
343
+ ( session_id_b , 8 ) ,
344
+ ( session_id_b , 9 )
308
345
]
309
346
) ;
310
347
}
311
348
312
349
#[ test]
313
350
fn test_ordered_channel_duplicates ( ) {
314
- fn verify_empty_buffers < T > ( states : & DashMap < String , Arc < Mutex < BufferState < T > > > > ) {
351
+ let session_id_a = Uuid :: now_v7 ( ) ;
352
+ fn verify_empty_buffers < T > ( states : & DashMap < Uuid , Arc < Mutex < BufferState < T > > > > ) {
315
353
for entry in states. iter ( ) {
316
354
assert ! ( entry. value( ) . lock( ) . unwrap( ) . buffer. is_empty( ) ) ;
317
355
}
318
356
}
319
357
320
- let ( tx, mut rx) = ordered_channel :: < ( String , usize ) > ( "test" . to_string ( ) , true ) ;
358
+ let ( tx, mut rx) = ordered_channel :: < ( Uuid , u64 ) > ( "test" . to_string ( ) , true ) ;
321
359
// A1 -> deliver
322
- tx. send ( "A" . into ( ) , 1 , ( "A" . into ( ) , 1 ) ) . unwrap ( ) ;
323
- assert_eq ! ( drain_try_recv( & mut rx) , vec![ ( "A" . into ( ) , 1 ) ] ) ;
360
+ tx. send ( session_id_a , 1 , ( session_id_a , 1 ) ) . unwrap ( ) ;
361
+ assert_eq ! ( drain_try_recv( & mut rx) , vec![ ( session_id_a , 1 ) ] ) ;
324
362
verify_empty_buffers ( & tx. states ) ;
325
363
// duplicate A1 -> drop even if the message is different.
326
- tx. send ( "A" . into ( ) , 1 , ( "A" . into ( ) , 1_000 ) ) . unwrap ( ) ;
364
+ tx. send ( session_id_a , 1 , ( session_id_a , 1_000 ) ) . unwrap ( ) ;
327
365
assert ! (
328
366
drain_try_recv( & mut rx) . is_empty( ) ,
329
367
"nothing should be delivered yet"
330
368
) ;
331
369
verify_empty_buffers ( & tx. states ) ;
332
370
// A2 -> deliver
333
- tx. send ( "A" . into ( ) , 2 , ( "A" . into ( ) , 2 ) ) . unwrap ( ) ;
334
- assert_eq ! ( drain_try_recv( & mut rx) , vec![ ( "A" . into ( ) , 2 ) ] ) ;
371
+ tx. send ( session_id_a , 2 , ( session_id_a , 2 ) ) . unwrap ( ) ;
372
+ assert_eq ! ( drain_try_recv( & mut rx) , vec![ ( session_id_a , 2 ) ] ) ;
335
373
verify_empty_buffers ( & tx. states ) ;
336
374
// late A1 duplicate -> drop
337
- tx. send ( "A" . into ( ) , 1 , ( "A" . into ( ) , 1_001 ) ) . unwrap ( ) ;
375
+ tx. send ( session_id_a , 1 , ( session_id_a , 1_001 ) ) . unwrap ( ) ;
338
376
assert ! (
339
377
drain_try_recv( & mut rx) . is_empty( ) ,
340
378
"nothing should be delivered yet"
341
379
) ;
342
380
verify_empty_buffers ( & tx. states ) ;
343
381
}
382
+
383
+ #[ test]
384
+ fn test_sequencer_clone ( ) {
385
+ let sequencer = Sequencer {
386
+ session_id : Uuid :: now_v7 ( ) ,
387
+ last_seqs : Arc :: new ( Mutex :: new ( HashMap :: new ( ) ) ) ,
388
+ } ;
389
+
390
+ let actor_id = id ! ( test[ 0 ] . test) ;
391
+
392
+ // Modify original sequencer
393
+ sequencer. assign_seq ( & actor_id) ;
394
+ sequencer. assign_seq ( & actor_id) ;
395
+
396
+ // Clone should share the same state
397
+ let cloned_sequencer = sequencer. clone ( ) ;
398
+ assert_eq ! ( sequencer. session_id( ) , cloned_sequencer. session_id( ) , ) ;
399
+ assert_eq ! ( cloned_sequencer. assign_seq( & actor_id) , 3 ) ;
400
+ }
401
+
402
+ #[ test]
403
+ fn test_sequencer_assign_seq ( ) {
404
+ let sequencer = Sequencer {
405
+ session_id : Uuid :: now_v7 ( ) ,
406
+ last_seqs : Arc :: new ( Mutex :: new ( HashMap :: new ( ) ) ) ,
407
+ } ;
408
+
409
+ let actor_id_0 = id ! ( worker[ 0 ] . worker) ;
410
+ let actor_id_1 = id ! ( worker[ 1 ] . worker) ;
411
+
412
+ // Both actors should start with next_seq = 1
413
+ assert_eq ! ( sequencer. assign_seq( & actor_id_0) , 1 ) ;
414
+ assert_eq ! ( sequencer. assign_seq( & actor_id_1) , 1 ) ;
415
+
416
+ // Increment actor_0 twice
417
+ sequencer. assign_seq ( & actor_id_0) ;
418
+ sequencer. assign_seq ( & actor_id_0) ;
419
+
420
+ // Increment actor_1 once
421
+ sequencer. assign_seq ( & actor_id_1) ;
422
+
423
+ // Check independent sequences
424
+ assert_eq ! ( sequencer. assign_seq( & actor_id_0) , 4 ) ;
425
+ assert_eq ! ( sequencer. assign_seq( & actor_id_1) , 3 ) ;
426
+ }
344
427
}
0 commit comments