1
- use std:: { collections:: BTreeMap , mem, sync:: Arc , time:: Duration } ;
1
+ use std:: {
2
+ collections:: { BTreeMap , VecDeque } ,
3
+ mem,
4
+ sync:: Arc ,
5
+ time:: Duration ,
6
+ } ;
2
7
3
8
use bon:: Builder ;
4
9
use multisig:: { Committee , PublicKey , Validated } ;
@@ -13,7 +18,7 @@ use tokio::{
13
18
task:: JoinHandle ,
14
19
time:: { Instant , MissedTickBehavior , interval, sleep} ,
15
20
} ;
16
- use tracing:: { debug, warn} ;
21
+ use tracing:: { Level , debug, enabled , error , trace , warn} ;
17
22
18
23
mod verify;
19
24
@@ -64,6 +69,7 @@ impl Submitter {
64
69
. verified ( verified. clone ( ) )
65
70
. receiver ( rx)
66
71
. clock ( Instant :: now ( ) )
72
+ . size_limit ( cfg. max_transaction_size )
67
73
. build ( ) ;
68
74
let mut configs = vec ! [ cfg. robusta. 0 . clone( ) ] ;
69
75
configs. extend ( cfg. robusta . 1 . iter ( ) . cloned ( ) ) ;
@@ -109,12 +115,17 @@ struct Sender {
109
115
clock : Instant ,
110
116
#[ builder( default ) ]
111
117
pending : BTreeMap < Instant , Vec < CertifiedBlock < Validated > > > ,
118
+ size_limit : usize ,
112
119
}
113
120
114
121
impl Sender {
115
122
async fn go ( mut self ) {
123
+ // Blocks we receive from the application:
116
124
let mut inbox = Vec :: new ( ) ;
117
- let mut outbox = Vec :: new ( ) ;
125
+ // Blocks scheduled for submission:
126
+ let mut outbox = VecDeque :: new ( ) ;
127
+ // A subset of `outbox` that fits into one transaction:
128
+ let mut transaction = Vec :: new ( ) ;
118
129
119
130
let drop_verified_blocks = |v : & mut Vec < CertifiedBlock < Validated > > | {
120
131
v. retain ( |b| !self . verified . contains ( b. cert ( ) . data ( ) . num ( ) ) ) ;
@@ -123,15 +134,16 @@ impl Sender {
123
134
let mut checkpoints = interval ( Duration :: from_secs ( 1 ) ) ;
124
135
checkpoints. set_missed_tick_behavior ( MissedTickBehavior :: Skip ) ;
125
136
126
- ' main : loop {
137
+ loop {
127
138
select ! {
128
139
k = self . receiver. recv_many( & mut inbox, 10 ) => {
129
140
if k == 0 { // channel is closed
130
141
return
131
142
}
132
143
for b in inbox. drain( ..) {
133
144
if b. is_leader( ) {
134
- outbox. push( b)
145
+ trace!( node = %self . label, block = %b. cert( ) . data( ) . num( ) , "leader submits" ) ;
146
+ outbox. push_back( b)
135
147
} else {
136
148
self . pending. entry( self . clock + DELAY ) . or_default( ) . push( b) ;
137
149
}
@@ -143,35 +155,59 @@ impl Sender {
143
155
let mut blocks = self . pending. split_off( & self . clock) ;
144
156
mem:: swap( & mut blocks, & mut self . pending) ;
145
157
outbox. extend( blocks. into_values( ) . flatten( ) ) ;
158
+ if enabled!( Level :: TRACE ) {
159
+ for b in & outbox {
160
+ trace!( node = %self . label, block = %b. cert( ) . data( ) . num( ) , "timeout" ) ;
161
+ }
162
+ }
146
163
}
147
164
}
148
165
149
- drop_verified_blocks ( & mut outbox ) ;
166
+ debug_assert ! ( transaction . is_empty ( ) ) ;
150
167
151
- if outbox. is_empty ( ) {
152
- continue ;
153
- }
168
+ ' submit: while !outbox. is_empty ( ) {
169
+ let mut size: usize = 0 ;
170
+ while let Some ( b) = outbox. pop_front ( ) {
171
+ if self . verified . contains ( b. cert ( ) . data ( ) . num ( ) ) {
172
+ continue ;
173
+ }
174
+ let n = minicbor:: len ( & b) ;
175
+ if size + n < self . size_limit {
176
+ size += n;
177
+ transaction. push ( b)
178
+ } else {
179
+ break ;
180
+ }
181
+ }
154
182
155
- // TODO: Ensure that the resulting payload size does not exceed the allowed maximum.
183
+ if transaction. is_empty ( ) {
184
+ break ;
185
+ }
156
186
157
- debug ! ( node = %self . label, blocks = %outbox . len( ) , "submitting blocks" ) ;
187
+ debug ! ( node = %self . label, blocks = %transaction . len( ) , %size , "submitting blocks" ) ;
158
188
159
- let mut delays = self . client . config ( ) . delay_iter ( ) ;
189
+ let mut delays = self . client . config ( ) . delay_iter ( ) ;
160
190
161
- while let Err ( err) = self . client . submit ( self . nsid , & outbox) . await {
162
- warn ! ( node= %self . label, %err, "error submitting blocks" ) ;
163
- let d = delays. next ( ) . expect ( "delay iterator repeats" ) ;
164
- sleep ( d) . await ;
165
- drop_verified_blocks ( & mut outbox) ;
166
- if outbox. is_empty ( ) {
167
- continue ' main;
191
+ while let Err ( err) = self . client . submit ( self . nsid , & transaction) . await {
192
+ warn ! ( node= %self . label, %err, "error submitting blocks" ) ;
193
+ let d = delays. next ( ) . expect ( "delay iterator repeats" ) ;
194
+ sleep ( d) . await ;
195
+ drop_verified_blocks ( & mut transaction) ;
196
+ if transaction. is_empty ( ) {
197
+ continue ' submit;
198
+ }
168
199
}
169
- }
170
200
171
- self . pending
172
- . entry ( self . clock + DELAY )
173
- . or_default ( )
174
- . append ( & mut outbox) ;
201
+ drop_verified_blocks ( & mut transaction) ;
202
+ if transaction. is_empty ( ) {
203
+ continue ;
204
+ }
205
+
206
+ self . pending
207
+ . entry ( self . clock + DELAY )
208
+ . or_default ( )
209
+ . append ( & mut transaction) ;
210
+ }
175
211
}
176
212
}
177
213
}
0 commit comments