1
+ use std:: collections:: HashMap ;
1
2
use std:: iter:: once;
2
3
use std:: num:: NonZeroUsize ;
4
+ use std:: sync:: Arc ;
5
+ use std:: sync:: atomic:: { AtomicU64 , Ordering } ;
3
6
use std:: time:: Duration ;
4
7
8
+ use alloy:: eips:: Encodable2718 ;
5
9
use bytes:: Bytes ;
6
10
use metrics:: NoMetrics ;
7
11
use multisig:: Certificate ;
8
- use timeboost_builder:: Certifier ;
9
- use timeboost_sequencer:: { Output , Sequencer } ;
10
- use timeboost_types:: { Block , BlockInfo } ;
12
+ use parking_lot:: Mutex ;
13
+ use timeboost:: builder:: Certifier ;
14
+ use timeboost:: sequencer:: { Output , Sequencer } ;
15
+ use timeboost:: types:: sailfish:: RoundNumber ;
16
+ use timeboost:: types:: { Block , BlockInfo , BlockNumber , Transaction } ;
11
17
use timeboost_utils:: types:: logging:: init_logging;
12
18
use tokio:: select;
13
19
use tokio:: sync:: broadcast:: error:: RecvError ;
14
20
use tokio:: sync:: { broadcast, mpsc} ;
15
- use tokio:: task:: JoinSet ;
16
21
use tokio:: time:: sleep;
17
22
use tokio_util:: sync:: CancellationToken ;
18
- use tracing:: info;
23
+ use tokio_util:: task:: TaskTracker ;
24
+ use tracing:: { debug, error, info} ;
19
25
20
26
use super :: { gen_bundles, make_configs} ;
21
27
@@ -27,40 +33,57 @@ async fn block_order() {
27
33
init_logging ( ) ;
28
34
29
35
let num = NonZeroUsize :: new ( 5 ) . unwrap ( ) ;
36
+ let quorum = 4 ;
30
37
let ( enc_keys, cfg) = make_configs ( num, RECOVER_INDEX ) ;
31
38
32
39
let mut rxs = Vec :: new ( ) ;
33
- let mut tasks = JoinSet :: new ( ) ;
40
+ let tasks = TaskTracker :: new ( ) ;
34
41
let ( bcast, _) = broadcast:: channel ( 3 ) ;
35
42
let finish = CancellationToken :: new ( ) ;
43
+ let round2block = Arc :: new ( Round2Block :: new ( ) ) ;
36
44
37
45
for ( c, b) in cfg {
38
46
let ( tx, rx) = mpsc:: unbounded_channel ( ) ;
39
47
let mut brx = bcast. subscribe ( ) ;
40
48
let finish = finish. clone ( ) ;
49
+ let label = c. sign_keypair ( ) . public_key ( ) ;
50
+ let r2b = round2block. clone ( ) ;
41
51
tasks. spawn ( async move {
42
52
if c. is_recover ( ) {
43
53
// delay start of a recovering node:
44
54
sleep ( Duration :: from_secs ( 5 ) ) . await
45
55
}
46
56
let mut s = Sequencer :: new ( c, & NoMetrics ) . await . unwrap ( ) ;
47
57
let mut p = Certifier :: new ( b, & NoMetrics ) . await . unwrap ( ) ;
58
+ let mut r = None ;
59
+ let handle = p. handle ( ) ;
48
60
loop {
49
61
select ! {
50
62
t = brx. recv( ) => match t {
51
63
Ok ( trx) => s. add_bundles( once( trx) ) ,
52
- Err ( RecvError :: Lagged ( _) ) => continue ,
64
+ Err ( RecvError :: Lagged ( _) ) => {
65
+ error!( node = %s. public_key( ) , "lagging behind" ) ;
66
+ continue
67
+ }
53
68
Err ( err) => panic!( "{err}" )
54
69
} ,
55
70
o = s. next( ) => {
56
- let Output :: Transactions { round, .. } = o. unwrap( ) else {
71
+ let Output :: Transactions { round, transactions, .. } = o. unwrap( ) else {
72
+ error!( node = %s. public_key( ) , "no sequencer output" ) ;
57
73
continue
58
74
} ;
59
- let b = Block :: new( * round, Bytes :: new( ) ) ;
60
- p. handle( ) . enqueue( b) . await . unwrap( )
75
+ // We require unique round numbers.
76
+ if Some ( round) == r {
77
+ continue
78
+ }
79
+ r = Some ( round) ;
80
+ let i = r2b. get( round) ;
81
+ let b = Block :: new( i, * round, hash( & transactions) ) ;
82
+ handle. enqueue( b) . await . unwrap( )
61
83
}
62
84
b = p. next_block( ) => {
63
85
let b = b. expect( "block" ) ;
86
+ debug!( node = %s. public_key( ) , hash = %b. data( ) . hash( ) , "block received" ) ;
64
87
let c: Certificate <BlockInfo > = b. into( ) ;
65
88
tx. send( c. into_data( ) ) . unwrap( )
66
89
}
@@ -71,7 +94,7 @@ async fn block_order() {
71
94
}
72
95
}
73
96
} ) ;
74
- rxs. push ( rx )
97
+ rxs. push ( ( label , rx ) )
75
98
}
76
99
77
100
for enc_key in & enc_keys {
@@ -80,28 +103,59 @@ async fn block_order() {
80
103
81
104
tasks. spawn ( gen_bundles ( enc_keys[ 0 ] . clone ( ) , bcast. clone ( ) ) ) ;
82
105
83
- // Collect all outputs:
84
- let mut outputs: Vec < Vec < BlockInfo > > = vec ! [ Vec :: new( ) ; num. get( ) ] ;
85
- for _ in 0 ..NUM_OF_BLOCKS {
86
- for ( i, o) in outputs. iter_mut ( ) . enumerate ( ) {
87
- let x = rxs[ i] . recv ( ) . await . unwrap ( ) ;
88
- o. push ( x) ;
106
+ let mut map: HashMap < BlockInfo , usize > = HashMap :: new ( ) ;
107
+
108
+ for b in 0 ..NUM_OF_BLOCKS {
109
+ map. clear ( ) ;
110
+ info ! ( block = %b) ;
111
+ for ( node, r) in & mut rxs {
112
+ debug ! ( %node, block = %b, "awaiting ..." ) ;
113
+ let info = r. recv ( ) . await . unwrap ( ) ;
114
+ * map. entry ( info) . or_default ( ) += 1
115
+ }
116
+ if map. values ( ) . any ( |n| * n >= quorum && * n <= num. get ( ) ) {
117
+ continue ;
89
118
}
119
+ for ( info, n) in map {
120
+ eprintln ! ( "{}: {} = {n}" , info. hash( ) , info. round( ) . num( ) )
121
+ }
122
+ panic ! ( "outputs do not match" )
90
123
}
91
124
92
125
finish. cancel ( ) ;
126
+ }
93
127
94
- // Compare outputs:
95
- for ( a, b) in outputs. iter ( ) . zip ( outputs. iter ( ) . skip ( 1 ) ) {
96
- if a != b {
97
- for infos in & outputs {
98
- let xy = infos
99
- . iter ( )
100
- . map ( |i| ( * i. num ( ) , * i. round ( ) ) )
101
- . collect :: < Vec < _ > > ( ) ;
102
- eprintln ! ( "{xy:?}" )
103
- }
104
- panic ! ( "outputs do not match" )
128
+ fn hash ( tx : & [ Transaction ] ) -> Bytes {
129
+ let mut h = blake3:: Hasher :: new ( ) ;
130
+ for t in tx {
131
+ h. update ( & t. encoded_2718 ( ) ) ;
132
+ }
133
+ Bytes :: copy_from_slice ( h. finalize ( ) . as_bytes ( ) )
134
+ }
135
+
136
+ /// Map round numbers to block numbers.
137
+ ///
138
+ /// Block numbers need to be consistent, consecutive and strictly monotonic.
139
+ /// The round numbers of our sequencer output may contain gaps. To provide
140
+ /// block numbers with the required properties we have here one monotonic
141
+ /// counter and record which block number is used for a round number.
142
+ /// Subsequent lookups will then get a consistent result.
143
+ struct Round2Block {
144
+ counter : AtomicU64 ,
145
+ block_numbers : Mutex < HashMap < RoundNumber , BlockNumber > > ,
146
+ }
147
+
148
+ impl Round2Block {
149
+ fn new ( ) -> Self {
150
+ Self {
151
+ counter : AtomicU64 :: new ( 0 ) ,
152
+ block_numbers : Mutex :: new ( HashMap :: new ( ) ) ,
105
153
}
106
154
}
155
+
156
+ fn get ( & self , r : RoundNumber ) -> BlockNumber {
157
+ let mut map = self . block_numbers . lock ( ) ;
158
+ * map. entry ( r)
159
+ . or_insert_with ( || self . counter . fetch_add ( 1 , Ordering :: Relaxed ) . into ( ) )
160
+ }
107
161
}
0 commit comments