@@ -73,7 +73,7 @@ use katana_provider_api::stage::StageCheckpointProvider;
7373use katana_provider_api:: ProviderError ;
7474use katana_stage:: { Stage , StageExecutionInput , StageExecutionOutput } ;
7575use tokio:: sync:: watch;
76- use tracing:: { debug, error, info, info_span, trace, Instrument } ;
76+ use tracing:: { debug, error, info, info_span, trace, Instrument , Span } ;
7777
7878/// The result of a pipeline execution.
7979pub type PipelineResult < T > = Result < T , Error > ;
@@ -254,13 +254,6 @@ impl<P: StageCheckpointProvider> Pipeline<P> {
254254 continue ;
255255 }
256256 }
257-
258- debug ! ( target: "pipeline" , "Waiting for new command." ) ;
259-
260- // Wait for the next command
261- if self . command_rx . changed ( ) . await . is_err ( ) {
262- break ;
263- }
264257 }
265258
266259 info ! ( target: "pipeline" , "Pipeline finished." ) ;
@@ -317,27 +310,29 @@ impl<P: StageCheckpointProvider> Pipeline<P> {
317310 let from = if checkpoint == 0 {
318311 checkpoint
319312 } else {
320- // plus 1 because the checkpoint is inclusive
313+ // plus 1 because the checkpoint is the last block processed, so we need to start from the next block
321314 checkpoint + 1
322315 } ;
323316
324317 let input = StageExecutionInput :: new ( from, to) ;
325- let span = info_span ! ( target: "pipeline" , "execute" , stage = %id, %from, %to) ;
318+ let span = info_span ! ( target: "pipeline" , "stage.execute" , stage = %id, %from, %to) ;
319+ let _guard = span. enter ( ) ;
326320
327- info ! ( target: "pipeline" , %id, %from, %to, "Executing stage." ) ;
321+ info ! ( target: "pipeline" , stage = %id, %from, %to, "[{}/{}] Executing stage." , i + 1 , last_stage_idx + 1 ) ;
328322
329323 let StageExecutionOutput { last_block_processed } =
330324 stage
331325 . execute ( & input)
332- . instrument ( span )
326+ . instrument ( Span :: current ( ) )
333327 . await
334328 . map_err ( |error| Error :: StageExecution { id, error } ) ?;
335329
330+ info ! ( target: "pipeline" , stage = %id, %from, %to, "Stage execution completed." ) ;
331+
336332
337333 self . provider . set_checkpoint ( id, last_block_processed) ?;
338334 last_block_processed_list. push ( last_block_processed) ;
339-
340- info ! ( target: "pipeline" , %id, %from, %to, "Stage execution completed." ) ;
335+ info ! ( target: "pipeline" , stage = %id, checkpoint = %to, "New checkpoint set." ) ;
341336 }
342337
343338 Ok ( last_block_processed_list. into_iter ( ) . min ( ) . unwrap_or ( to) )
0 commit comments