1
+ use std:: collections:: HashMap ;
1
2
use std:: iter:: once;
2
3
use std:: num:: NonZeroUsize ;
3
4
use std:: time:: Duration ;
4
5
6
+ use alloy:: primitives:: B256 ;
5
7
use metrics:: NoMetrics ;
8
+ use sailfish_types:: RoundNumber ;
6
9
use timeboost_sequencer:: { Output , Sequencer } ;
7
10
use timeboost_utils:: types:: logging:: init_logging;
8
11
use tokio:: select;
9
12
use tokio:: sync:: broadcast:: error:: RecvError ;
10
13
use tokio:: sync:: { broadcast, mpsc} ;
11
- use tokio:: task:: JoinSet ;
12
14
use tokio:: time:: sleep;
13
15
use tokio_util:: sync:: CancellationToken ;
14
- use tracing:: info;
16
+ use tokio_util:: task:: TaskTracker ;
17
+ use tracing:: { debug, info} ;
15
18
16
19
use super :: { gen_bundles, make_configs} ;
17
20
@@ -28,10 +31,11 @@ async fn transaction_order() {
28
31
init_logging ( ) ;
29
32
30
33
let num = NonZeroUsize :: new ( 5 ) . unwrap ( ) ;
34
+ let quorum = 4 ;
31
35
let ( enc_keys, cfg) = make_configs ( num, RECOVER_INDEX ) ;
32
36
33
37
let mut rxs = Vec :: new ( ) ;
34
- let mut tasks = JoinSet :: new ( ) ;
38
+ let tasks = TaskTracker :: new ( ) ;
35
39
let ( bcast, _) = broadcast:: channel ( 3 ) ;
36
40
let finish = CancellationToken :: new ( ) ;
37
41
@@ -42,6 +46,7 @@ async fn transaction_order() {
42
46
let ( tx, rx) = mpsc:: unbounded_channel ( ) ;
43
47
let mut brx = bcast. subscribe ( ) ;
44
48
let finish = finish. clone ( ) ;
49
+ let label = c. sign_keypair ( ) . public_key ( ) ;
45
50
tasks. spawn ( async move {
46
51
if c. is_recover ( ) {
47
52
// delay start of a recovering node:
@@ -56,12 +61,11 @@ async fn transaction_order() {
56
61
Err ( err) => panic!( "{err}" )
57
62
} ,
58
63
out = s. next( ) => {
59
- let Output :: Transactions { transactions, .. } = out. unwrap( ) else {
64
+ let Output :: Transactions { round , transactions, .. } = out. unwrap( ) else {
60
65
continue
61
66
} ;
62
- for t in transactions {
63
- tx. send( t) . unwrap( )
64
- }
67
+ let transactions = transactions. into_iter( ) . map( |t| * t. hash( ) ) . collect( ) ;
68
+ tx. send( ( round, transactions) ) . unwrap( )
65
69
}
66
70
_ = finish. cancelled( ) => {
67
71
info!( node = %s. public_key( ) , "done" ) ;
@@ -70,7 +74,7 @@ async fn transaction_order() {
70
74
}
71
75
}
72
76
} ) ;
73
- rxs. push ( rx )
77
+ rxs. push ( ( label , rx ) )
74
78
}
75
79
76
80
for enc_key in & enc_keys {
@@ -79,19 +83,29 @@ async fn transaction_order() {
79
83
80
84
tasks. spawn ( gen_bundles ( enc_keys[ 0 ] . clone ( ) , bcast. clone ( ) ) ) ;
81
85
82
- for _ in 0 ..NUM_OF_TRANSACTIONS {
83
- let first = rxs[ 0 ] . recv ( ) . await . unwrap ( ) ;
84
- for rx in & mut rxs[ 1 ..] {
85
- let t = rx. recv ( ) . await . unwrap ( ) ;
86
- assert_eq ! ( first. hash( ) , t. hash( ) )
86
+ let mut map: HashMap < ( RoundNumber , Vec < B256 > ) , usize > = HashMap :: new ( ) ;
87
+ let mut transactions = 0 ;
88
+
89
+ while transactions < NUM_OF_TRANSACTIONS {
90
+ map. clear ( ) ;
91
+ info ! ( "{transactions}/{NUM_OF_TRANSACTIONS}" ) ;
92
+ for ( node, r) in & mut rxs {
93
+ debug ! ( %node, "awaiting ..." ) ;
94
+ let value = r. recv ( ) . await . unwrap ( ) ;
95
+ * map. entry ( value) . or_default ( ) += 1
96
+ }
97
+ if let Some ( ( ( _, trxs) , _) ) = map. iter ( ) . find ( |( _, n) | * * n >= quorum && * * n <= num. get ( ) ) {
98
+ transactions += trxs. len ( ) ;
99
+ continue ;
100
+ }
101
+ for ( ( r, trxs) , k) in map {
102
+ eprintln ! (
103
+ "{r}: {:?} = {k}" ,
104
+ trxs. into_iter( ) . map( |t| t. to_string( ) ) . collect:: <Vec <_>>( )
105
+ )
87
106
}
107
+ panic ! ( "outputs do not match" )
88
108
}
89
109
90
110
finish. cancel ( ) ;
91
-
92
- while let Some ( result) = tasks. join_next ( ) . await {
93
- if let Err ( err) = result {
94
- panic ! ( "task panic: {err}" )
95
- }
96
- }
97
111
}
0 commit comments