@@ -4,12 +4,23 @@ use std::iter::once;
4
4
use std:: sync:: Arc ;
5
5
6
6
use :: metrics:: prometheus:: PrometheusMetrics ;
7
- use anyhow:: Result ;
7
+ use alloy:: eips:: BlockNumberOrTag ;
8
+ use alloy:: providers:: { Provider , ProviderBuilder } ;
9
+ use alloy:: rpc:: types:: Filter ;
10
+ use alloy:: sol_types:: SolEvent ;
11
+ use alloy:: transports:: ws:: WsConnect ;
12
+ use anyhow:: { Result , anyhow} ;
13
+ use cliquenet:: AddressableCommittee ;
14
+ use futures:: StreamExt ;
8
15
use metrics:: TimeboostMetrics ;
9
- use multisig:: PublicKey ;
16
+ use multisig:: { Committee , PublicKey , x25519} ;
17
+ use sailfish:: types:: Timestamp ;
10
18
use timeboost_builder:: { Certifier , CertifierDown , Submitter } ;
19
+ use timeboost_contract:: CommitteeMemberSol ;
20
+ use timeboost_contract:: { KeyManager , KeyManager :: CommitteeCreated } ;
21
+ use timeboost_crypto:: prelude:: DkgEncKey ;
11
22
use timeboost_sequencer:: { Output , Sequencer } ;
12
- use timeboost_types:: BundleVariant ;
23
+ use timeboost_types:: { BundleVariant , ConsensusTime , KeyStore } ;
13
24
use tokio:: select;
14
25
use tokio:: sync:: mpsc:: { self , Receiver , Sender } ;
15
26
use tracing:: { info, warn} ;
@@ -87,6 +98,17 @@ impl Timeboost {
87
98
}
88
99
89
100
pub async fn go ( mut self ) -> Result < ( ) > {
101
+ // setup the websocket for contract event stream
102
+ let ws = WsConnect :: new ( self . config . chain_config . parent . ws_url . clone ( ) ) ;
103
+ // spawn the pubsub service (and backend) and the frontend is registered at the provider
104
+ let provider = ProviderBuilder :: new ( ) . connect_pubsub_with ( ws) . await ?;
105
+
106
+ let filter = Filter :: new ( )
107
+ . address ( self . config . chain_config . parent . key_manager_contract )
108
+ . event ( KeyManager :: CommitteeCreated :: SIGNATURE )
109
+ . from_block ( BlockNumberOrTag :: Finalized ) ;
110
+ let mut events = provider. subscribe_logs ( & filter) . await ?. into_stream ( ) ;
111
+
90
112
loop {
91
113
select ! {
92
114
trx = self . receiver. recv( ) => {
@@ -127,8 +149,81 @@ impl Timeboost {
127
149
let e: CertifierDown = e;
128
150
return Err ( e. into( ) )
129
151
}
152
+ } ,
153
+ res = events. next( ) => match res {
154
+ Some ( log) => {
155
+ let typed_log = log. log_decode_validate:: <CommitteeCreated >( ) ?;
156
+ let id = typed_log. data( ) . id;
157
+ let cur: u64 = self . config. key_store. committee( ) . id( ) . into( ) ;
158
+
159
+ if id == cur + 1 {
160
+ info!( node = %self . label, committee_id = %id, current = %cur, "setting next committee" ) ;
161
+ let ( t, a, k) = self . fetch_next_committee( & provider, id) . await ?;
162
+ self . sequencer. set_next_committee( t, a, k) . await ?;
163
+ } else {
164
+ warn!( node = %self . label, committee_id = %id, current = %cur, "ignored new CommitteeCreated event" ) ;
165
+ continue ;
166
+ }
167
+ } ,
168
+ None => {
169
+ warn!( node = %self . label, "event subscription stream ended" ) ;
170
+ return Err ( anyhow!( "contract event pubsub service prematurely shutdown" ) ) ;
171
+ }
130
172
}
131
173
}
132
174
}
133
175
}
176
+
177
+ /// Given the next committee is available on chain, fetch it and prepare it for `NextCommittee`
178
+ async fn fetch_next_committee (
179
+ & self ,
180
+ provider : impl Provider ,
181
+ next_committee_id : u64 ,
182
+ ) -> Result < ( ConsensusTime , AddressableCommittee , KeyStore ) > {
183
+ let contract = KeyManager :: new (
184
+ self . config . chain_config . parent . key_manager_contract ,
185
+ & provider,
186
+ ) ;
187
+ let c = contract. getCommitteeById ( next_committee_id) . call ( ) . await ?;
188
+ let members: Vec < CommitteeMemberSol > = c. members ;
189
+ let timestamp: Timestamp = c. effectiveTimestamp . into ( ) ;
190
+
191
+ let sailfish_peer_hosts_and_keys = members
192
+ . iter ( )
193
+ . map ( |peer| {
194
+ let sig_key = multisig:: PublicKey :: try_from ( peer. sigKey . as_ref ( ) ) ?;
195
+ let dh_key = x25519:: PublicKey :: try_from ( peer. dhKey . as_ref ( ) ) ?;
196
+ let sailfish_address = cliquenet:: Address :: try_from ( peer. networkAddress . as_ref ( ) ) ?;
197
+ Ok ( ( sig_key, dh_key, sailfish_address) )
198
+ } )
199
+ . collect :: < Result < Vec < _ > > > ( ) ?;
200
+ let dkg_enc_keys = members
201
+ . iter ( )
202
+ . map ( |peer| {
203
+ let dkg_enc_key = DkgEncKey :: from_bytes ( peer. dkgKey . as_ref ( ) ) ?;
204
+ Ok ( dkg_enc_key)
205
+ } )
206
+ . collect :: < Result < Vec < _ > > > ( ) ?;
207
+
208
+ let sailfish_committee = {
209
+ let c = Committee :: new (
210
+ next_committee_id,
211
+ sailfish_peer_hosts_and_keys
212
+ . iter ( )
213
+ . enumerate ( )
214
+ . map ( |( i, ( k, ..) ) | ( i as u8 , * k) ) ,
215
+ ) ;
216
+ AddressableCommittee :: new ( c, sailfish_peer_hosts_and_keys. iter ( ) . cloned ( ) )
217
+ } ;
218
+
219
+ let key_store = KeyStore :: new (
220
+ sailfish_committee. committee ( ) . clone ( ) ,
221
+ dkg_enc_keys
222
+ . into_iter ( )
223
+ . enumerate ( )
224
+ . map ( |( i, k) | ( i as u8 , k) ) ,
225
+ ) ;
226
+
227
+ Ok ( ( ConsensusTime ( timestamp) , sailfish_committee, key_store) )
228
+ }
134
229
}
0 commit comments