@@ -7,7 +7,8 @@ use std::{
7
7
8
8
use bon:: Builder ;
9
9
use multisig:: { Committee , PublicKey , Validated } ;
10
- use robusta:: { Client , espresso_types:: NamespaceId } ;
10
+ use rand:: seq:: IndexedRandom ;
11
+ use robusta:: { Client , Config , espresso_types:: NamespaceId } ;
11
12
use timeboost_types:: {
12
13
CertifiedBlock ,
13
14
sailfish:: { CommitteeVec , Empty } ,
@@ -49,27 +50,26 @@ impl Submitter {
49
50
where
50
51
M : :: metrics:: Metrics ,
51
52
{
52
- let client = Client :: new ( cfg. robusta . 0 . clone ( ) ) ;
53
53
let verified = Verified :: default ( ) ;
54
54
let committees = Arc :: new ( Mutex :: new ( CommitteeVec :: new ( cfg. committee . clone ( ) ) ) ) ;
55
55
let metrics = Arc :: new ( BuilderMetrics :: new ( metrics) ) ;
56
56
let verifier = Verifier :: builder ( )
57
57
. label ( cfg. pubkey )
58
58
. nsid ( cfg. namespace )
59
59
. committees ( committees. clone ( ) )
60
- . client ( client . clone ( ) )
60
+ . client ( Client :: new ( cfg . robusta . 0 . clone ( ) ) )
61
61
. verified ( verified. clone ( ) )
62
62
. metrics ( metrics. clone ( ) )
63
63
. build ( ) ;
64
64
let ( tx, rx) = mpsc:: channel ( 10_000 ) ;
65
65
let sender = Sender :: builder ( )
66
66
. label ( cfg. pubkey )
67
67
. nsid ( cfg. namespace )
68
- . client ( client)
69
68
. verified ( verified. clone ( ) )
70
69
. receiver ( rx)
71
70
. clock ( Instant :: now ( ) )
72
71
. size_limit ( cfg. max_transaction_size )
72
+ . config ( cfg. robusta . 0 . clone ( ) )
73
73
. build ( ) ;
74
74
let mut configs = vec ! [ cfg. robusta. 0 . clone( ) ] ;
75
75
configs. extend ( cfg. robusta . 1 . iter ( ) . cloned ( ) ) ;
@@ -79,8 +79,8 @@ impl Submitter {
79
79
committees,
80
80
metrics,
81
81
sender : tx,
82
- verify_task : spawn ( verifier. verify ( configs) ) ,
83
- sender_task : spawn ( sender. go ( ) ) ,
82
+ verify_task : spawn ( verifier. verify ( configs. clone ( ) ) ) ,
83
+ sender_task : spawn ( sender. send ( configs ) ) ,
84
84
}
85
85
}
86
86
@@ -109,17 +109,20 @@ pub struct SenderTaskDown(());
109
109
struct Sender {
110
110
label : PublicKey ,
111
111
nsid : NamespaceId ,
112
- client : Client ,
113
112
verified : Verified < 15_000 > ,
114
113
receiver : mpsc:: Receiver < CertifiedBlock < Validated > > ,
115
114
clock : Instant ,
116
115
#[ builder( default ) ]
117
116
pending : BTreeMap < Instant , Vec < CertifiedBlock < Validated > > > ,
118
117
size_limit : usize ,
118
+ config : Config ,
119
119
}
120
120
121
121
impl Sender {
122
- async fn go ( mut self ) {
122
+ async fn send ( mut self , configs : Vec < Config > ) {
123
+ let clients: Vec < Client > = configs. into_iter ( ) . map ( Client :: new) . collect ( ) ;
124
+ assert ! ( !clients. is_empty( ) ) ;
125
+
123
126
// Blocks we receive from the application:
124
127
let mut inbox = Vec :: new ( ) ;
125
128
// Blocks scheduled for submission:
@@ -131,6 +134,12 @@ impl Sender {
131
134
v. retain ( |b| !self . verified . contains ( b. cert ( ) . data ( ) . num ( ) ) ) ;
132
135
} ;
133
136
137
+ let random_client = || {
138
+ clients
139
+ . choose ( & mut rand:: rng ( ) )
140
+ . expect ( "Vec<Client> is non-empty" )
141
+ } ;
142
+
134
143
let mut checkpoints = interval ( Duration :: from_secs ( 1 ) ) ;
135
144
checkpoints. set_missed_tick_behavior ( MissedTickBehavior :: Skip ) ;
136
145
@@ -186,9 +195,9 @@ impl Sender {
186
195
187
196
debug ! ( node = %self . label, blocks = %transaction. len( ) , %size, "submitting blocks" ) ;
188
197
189
- let mut delays = self . client . config ( ) . delay_iter ( ) ;
198
+ let mut delays = self . config . delay_iter ( ) ;
190
199
191
- while let Err ( err) = self . client . submit ( self . nsid , & transaction) . await {
200
+ while let Err ( err) = random_client ( ) . submit ( self . nsid , & transaction) . await {
192
201
warn ! ( node= %self . label, %err, "error submitting blocks" ) ;
193
202
let d = delays. next ( ) . expect ( "delay iterator repeats" ) ;
194
203
sleep ( d) . await ;
0 commit comments