1
+ use std:: collections:: HashMap ;
1
2
use std:: iter:: once;
2
3
use std:: num:: NonZeroUsize ;
3
4
use std:: time:: Duration ;
4
5
5
6
use metrics:: NoMetrics ;
7
+ use sailfish_types:: RoundNumber ;
6
8
use timeboost_sequencer:: { Output , Sequencer } ;
9
+ use timeboost_types:: Transaction ;
7
10
use timeboost_utils:: types:: logging:: init_logging;
8
11
use tokio:: select;
9
12
use tokio:: sync:: broadcast:: error:: RecvError ;
10
13
use tokio:: sync:: { broadcast, mpsc} ;
11
- use tokio:: task:: JoinSet ;
12
14
use tokio:: time:: sleep;
13
15
use tokio_util:: sync:: CancellationToken ;
14
- use tracing:: info;
16
+ use tokio_util:: task:: TaskTracker ;
17
+ use tracing:: { debug, info} ;
15
18
16
19
use super :: { gen_bundles, make_configs} ;
17
20
@@ -28,10 +31,11 @@ async fn transaction_order() {
28
31
init_logging ( ) ;
29
32
30
33
let num = NonZeroUsize :: new ( 5 ) . unwrap ( ) ;
34
+ let quorum = 4 ;
31
35
let ( enc_keys, cfg) = make_configs ( num, RECOVER_INDEX ) ;
32
36
33
37
let mut rxs = Vec :: new ( ) ;
34
- let mut tasks = JoinSet :: new ( ) ;
38
+ let tasks = TaskTracker :: new ( ) ;
35
39
let ( bcast, _) = broadcast:: channel ( 3 ) ;
36
40
let finish = CancellationToken :: new ( ) ;
37
41
@@ -42,6 +46,7 @@ async fn transaction_order() {
42
46
let ( tx, rx) = mpsc:: unbounded_channel ( ) ;
43
47
let mut brx = bcast. subscribe ( ) ;
44
48
let finish = finish. clone ( ) ;
49
+ let label = c. sign_keypair ( ) . public_key ( ) ;
45
50
tasks. spawn ( async move {
46
51
if c. is_recover ( ) {
47
52
// delay start of a recovering node:
@@ -56,12 +61,10 @@ async fn transaction_order() {
56
61
Err ( err) => panic!( "{err}" )
57
62
} ,
58
63
out = s. next( ) => {
59
- let Output :: Transactions { transactions, .. } = out. unwrap( ) else {
64
+ let Output :: Transactions { round , transactions, .. } = out. unwrap( ) else {
60
65
continue
61
66
} ;
62
- for t in transactions {
63
- tx. send( t) . unwrap( )
64
- }
67
+ tx. send( ( round, transactions) ) . unwrap( )
65
68
}
66
69
_ = finish. cancelled( ) => {
67
70
info!( node = %s. public_key( ) , "done" ) ;
@@ -70,7 +73,7 @@ async fn transaction_order() {
70
73
}
71
74
}
72
75
} ) ;
73
- rxs. push ( rx )
76
+ rxs. push ( ( label , rx ) )
74
77
}
75
78
76
79
for enc_key in & enc_keys {
@@ -79,19 +82,31 @@ async fn transaction_order() {
79
82
80
83
tasks. spawn ( gen_bundles ( enc_keys[ 0 ] . clone ( ) , bcast. clone ( ) ) ) ;
81
84
82
- for _ in 0 ..NUM_OF_TRANSACTIONS {
83
- let first = rxs[ 0 ] . recv ( ) . await . unwrap ( ) ;
84
- for rx in & mut rxs[ 1 ..] {
85
- let t = rx. recv ( ) . await . unwrap ( ) ;
86
- assert_eq ! ( first. hash( ) , t. hash( ) )
85
+ let mut map: HashMap < ( RoundNumber , Vec < Transaction > ) , usize > = HashMap :: new ( ) ;
86
+ let mut transactions = 0 ;
87
+
88
+ while transactions < NUM_OF_TRANSACTIONS {
89
+ map. clear ( ) ;
90
+ info ! ( "{transactions}/{NUM_OF_TRANSACTIONS}" ) ;
91
+ for ( node, r) in & mut rxs {
92
+ debug ! ( %node, "awaiting ..." ) ;
93
+ let value = r. recv ( ) . await . unwrap ( ) ;
94
+ * map. entry ( value) . or_default ( ) += 1
95
+ }
96
+ if let Some ( trxs) = map. values ( ) . find ( |n| * * n >= quorum && * * n <= num. get ( ) ) {
97
+ transactions += trxs;
98
+ continue ;
99
+ }
100
+ for ( ( round, trxs) , k) in map {
101
+ eprintln ! (
102
+ "{round}: {:?} = {k}" ,
103
+ trxs. into_iter( )
104
+ . map( |t| t. hash( ) . to_string( ) )
105
+ . collect:: <Vec <_>>( )
106
+ )
87
107
}
108
+ panic ! ( "outputs do not match" )
88
109
}
89
110
90
111
finish. cancel ( ) ;
91
-
92
- while let Some ( result) = tasks. join_next ( ) . await {
93
- if let Err ( err) = result {
94
- panic ! ( "task panic: {err}" )
95
- }
96
- }
97
112
}
0 commit comments