11use std:: sync:: Arc ;
22use std:: time:: Duration ;
33
4+ use anyhow:: { anyhow, Result } ;
45use katana_gateway_client:: Client ;
56use katana_gateway_types:: { ConfirmedTransaction , ErrorCode , PreConfirmedBlock , StateDiff } ;
67use katana_pipeline:: PipelineBlockSubscription ;
@@ -45,7 +46,13 @@ impl PreconfStateFactory {
4546 shared_preconf_block : shared_preconf_block. clone ( ) ,
4647 } ;
4748
48- tokio:: spawn ( async move { worker. run ( ) . await } ) ;
49+ tokio:: spawn ( async move {
50+ loop {
51+ if let Err ( error) = worker. run ( ) . await {
52+ error ! ( %error, "PreconfBlockWatcher returned with an error." ) ;
53+ }
54+ }
55+ } ) ;
4956
5057 Self { gateway_client, latest_synced_block, shared_preconf_block, storage_provider }
5158 }
@@ -75,12 +82,12 @@ impl PreconfStateFactory {
7582 . map ( |preconf_data| preconf_data. preconf_state_updates . clone ( ) )
7683 }
7784
78- pub fn block ( & self ) -> Option < PreConfirmedBlock > {
85+ pub fn block ( & self ) -> Option < ( BlockNumber , PreConfirmedBlock ) > {
7986 self . shared_preconf_block
8087 . inner
8188 . lock ( )
8289 . as_ref ( )
83- . map ( |preconf_data| preconf_data. preconf_block . clone ( ) )
90+ . map ( |preconf_data| ( preconf_data. preconf_block_id , preconf_data . preconf_block . clone ( ) ) )
8491 }
8592
8693 pub fn transactions ( & self ) -> Option < Vec < ConfirmedTransaction > > {
@@ -104,8 +111,9 @@ struct PreconfBlockData {
104111 preconf_state_updates : StateUpdates ,
105112}
106113
107- const DEFAULT_INTERVAL : Duration = Duration :: from_millis ( 500 ) ;
114+ const DEFAULT_INTERVAL : Duration = Duration :: from_millis ( 1000 ) ;
108115
116+ #[ derive( Debug ) ]
109117struct PreconfBlockWatcher {
110118 interval : Duration ,
111119 gateway_client : Client ,
@@ -120,7 +128,7 @@ struct PreconfBlockWatcher {
120128}
121129
122130impl PreconfBlockWatcher {
123- async fn run ( & mut self ) {
131+ async fn run ( & mut self ) -> Result < ( ) > {
124132 let mut current_preconf_block_num =
125133 self . latest_synced_block . block ( ) . map ( |b| b + 1 ) . unwrap_or ( 0 ) ;
126134
@@ -160,32 +168,28 @@ impl PreconfBlockWatcher {
160168 // chain's tip, in which case we just skip to the next
161169 // iteration.
162170 Err ( katana_gateway_client:: Error :: Sequencer ( error) )
163- if error. code == ErrorCode :: BlockNotFound =>
164- {
165- continue
166- }
171+ if error. code == ErrorCode :: BlockNotFound => { }
167172
168- Err ( err) => panic ! ( "{err}" ) ,
173+ Err ( err) => return Err ( anyhow ! ( err) ) ,
174+ }
175+ } else {
176+ if let Err ( err) = self . latest_synced_block . changed ( ) . await {
177+ error ! ( error = ?err, "Error receiving latest block number." ) ;
178+ break ;
169179 }
170- }
171-
172- tokio:: select! {
173- biased;
174180
175- res = self . latest_synced_block. changed( ) => {
176- if let Err ( err) = res {
177- error!( error = ?err, "Error receiving latest block number." ) ;
178- break ;
179- }
181+ // reset preconf state
182+ * self . shared_preconf_block . inner . lock ( ) = None ;
180183
181- let latest_synced_block_num = self . latest_synced_block. block( ) . unwrap( ) ;
182- current_preconf_block_num = latest_synced_block_num + 1 ;
183- }
184+ let latest_synced_block_num = self . latest_synced_block . block ( ) . unwrap_or ( 0 ) ;
185+ current_preconf_block_num = latest_synced_block_num + 1 ;
184186
185- _ = tokio:: time:: sleep( self . interval) => {
186- current_preconf_block_num += 1 ;
187- }
187+ continue ;
188188 }
189+
190+ tokio:: time:: sleep ( self . interval ) . await
189191 }
192+
193+ Ok ( ( ) )
190194 }
191195}
0 commit comments