1
- use std:: { collections:: HashMap , sync:: Arc , time:: Duration } ;
1
+ use std:: { collections:: BTreeMap , mem , sync:: Arc , time:: Duration } ;
2
2
3
3
use bon:: Builder ;
4
4
use multisig:: { Committee , PublicKey , Validated } ;
5
5
use robusta:: { Client , espresso_types:: NamespaceId } ;
6
6
use timeboost_types:: {
7
- BlockNumber , CertifiedBlock ,
7
+ CertifiedBlock ,
8
8
sailfish:: { CommitteeVec , Empty } ,
9
9
} ;
10
10
use tokio:: {
11
11
select, spawn,
12
12
sync:: { Mutex , mpsc} ,
13
13
task:: JoinHandle ,
14
- time:: sleep,
14
+ time:: { Instant , MissedTickBehavior , interval , sleep} ,
15
15
} ;
16
16
use tracing:: { debug, warn} ;
17
17
18
- mod time;
19
18
mod verify;
20
19
21
20
use crate :: { config:: SubmitterConfig , metrics:: BuilderMetrics } ;
22
- use time:: Timer ;
23
21
use verify:: { Verified , Verifier } ;
24
22
25
23
const DELAY : Duration = Duration :: from_secs ( 30 ) ;
@@ -62,10 +60,10 @@ impl Submitter {
62
60
let sender = Sender :: builder ( )
63
61
. label ( cfg. pubkey )
64
62
. nsid ( cfg. namespace )
65
- . timer ( Timer :: new ( cfg. pubkey ) )
66
63
. client ( client)
67
64
. verified ( verified. clone ( ) )
68
65
. receiver ( rx)
66
+ . clock ( Instant :: now ( ) )
69
67
. build ( ) ;
70
68
let mut configs = vec ! [ cfg. robusta. 0 . clone( ) ] ;
71
69
configs. extend ( cfg. robusta . 1 . iter ( ) . cloned ( ) ) ;
@@ -88,88 +86,92 @@ impl Submitter {
88
86
self . committees . lock ( ) . await . add ( c) ;
89
87
}
90
88
91
- pub async fn submit ( & mut self , cb : CertifiedBlock < Validated > ) {
89
+ pub async fn submit ( & mut self , cb : CertifiedBlock < Validated > ) -> Result < ( ) , SenderTaskDown > {
92
90
self . metrics . blocks_submitted . add ( 1 ) ;
93
91
if self . verified . contains ( cb. cert ( ) . data ( ) . num ( ) ) {
94
- return ;
92
+ return Ok ( ( ) ) ;
95
93
}
96
- self . sender . send ( cb) . await . unwrap ( ) // TODO
94
+ self . sender . send ( cb) . await . map_err ( |_| SenderTaskDown ( ( ) ) )
97
95
}
98
96
}
99
97
98
+ #[ derive( Debug , thiserror:: Error ) ]
99
+ #[ error( "submit sender task terminated" ) ]
100
+ pub struct SenderTaskDown ( ( ) ) ;
101
+
100
102
#[ derive( Builder ) ]
101
103
struct Sender {
102
104
label : PublicKey ,
103
- timer : Timer < BlockNumber > ,
104
105
nsid : NamespaceId ,
105
106
client : Client ,
106
107
verified : Verified < 15_000 > ,
107
108
receiver : mpsc:: Receiver < CertifiedBlock < Validated > > ,
109
+ clock : Instant ,
110
+ #[ builder( default ) ]
111
+ pending : BTreeMap < Instant , Vec < CertifiedBlock < Validated > > > ,
108
112
}
109
113
110
114
impl Sender {
111
115
async fn go ( mut self ) {
112
- let mut pending = HashMap :: new ( ) ;
113
116
let mut inbox = Vec :: new ( ) ;
114
117
let mut outbox = Vec :: new ( ) ;
115
- let mut timeouts = Vec :: new ( ) ;
116
118
117
- loop {
119
+ let drop_verified_blocks = |v : & mut Vec < CertifiedBlock < Validated > > | {
120
+ v. retain ( |b| !self . verified . contains ( b. cert ( ) . data ( ) . num ( ) ) ) ;
121
+ } ;
122
+
123
+ let mut checkpoints = interval ( Duration :: from_secs ( 1 ) ) ;
124
+ checkpoints. set_missed_tick_behavior ( MissedTickBehavior :: Skip ) ;
125
+
126
+ ' main: loop {
118
127
select ! {
119
128
k = self . receiver. recv_many( & mut inbox, 10 ) => {
120
- if k == 0 {
129
+ if k == 0 { // channel is closed
121
130
return
122
- } else {
123
- for b in inbox. drain( ..) {
124
- let n = b. cert( ) . data( ) . num( ) ;
125
- if self . verified. contains( n) {
126
- continue
127
- }
128
- if b. is_leader( ) {
129
- outbox. push( b)
130
- } else {
131
- pending. insert( n, b) ;
132
- }
133
- self . timer. set( n, DELAY )
134
- }
135
- }
136
- } ,
137
- n = self . timer. next( ) => {
138
- timeouts. push( n) ;
139
- while let Some ( n) = self . timer. try_next( ) {
140
- timeouts. push( n)
141
131
}
142
- timeouts. sort( ) ;
143
- for n in timeouts. drain( ..) {
144
- let Some ( b) = pending. remove( & n) else {
145
- continue
146
- } ;
147
- if self . verified. contains( n) {
148
- continue
132
+ for b in inbox. drain( ..) {
133
+ if b. is_leader( ) {
134
+ outbox. push( b)
135
+ } else {
136
+ self . pending. entry( self . clock + DELAY ) . or_default( ) . push( b) ;
149
137
}
150
- debug!( node = %self . label, num = %n, "block timeout" ) ;
151
- outbox. push( b)
152
138
}
153
139
}
140
+ t = checkpoints. tick( ) => {
141
+ self . clock = t;
142
+ // Move blocks that timed out into `outbox`:
143
+ let mut blocks = self . pending. split_off( & self . clock) ;
144
+ mem:: swap( & mut blocks, & mut self . pending) ;
145
+ outbox. extend( blocks. into_values( ) . flatten( ) ) ;
146
+ }
154
147
}
155
148
149
+ drop_verified_blocks ( & mut outbox) ;
150
+
156
151
if outbox. is_empty ( ) {
157
152
continue ;
158
153
}
159
154
160
- let mut delays = self . client . config ( ) . delay_iter ( ) ;
155
+ // TODO: Ensure that the resulting payload size does not exceed the allowed maximum.
161
156
162
157
debug ! ( node = %self . label, blocks = %outbox. len( ) , "submitting blocks" ) ;
163
158
159
+ let mut delays = self . client . config ( ) . delay_iter ( ) ;
160
+
164
161
while let Err ( err) = self . client . submit ( self . nsid , & outbox) . await {
165
162
warn ! ( node= %self . label, %err, "error submitting blocks" ) ;
166
163
let d = delays. next ( ) . expect ( "delay iterator repeats" ) ;
167
- sleep ( d) . await
164
+ sleep ( d) . await ;
165
+ drop_verified_blocks ( & mut outbox) ;
166
+ if outbox. is_empty ( ) {
167
+ continue ' main;
168
+ }
168
169
}
169
170
170
- for b in outbox. drain ( ..) {
171
- pending. insert ( b. cert ( ) . data ( ) . num ( ) , b) ;
172
- }
171
+ self . pending
172
+ . entry ( self . clock + DELAY )
173
+ . or_default ( )
174
+ . append ( & mut outbox) ;
173
175
}
174
176
}
175
177
}
0 commit comments