@@ -139,6 +139,9 @@ func Commit(chainId *big.Int) error {
139139 }
140140 log .Info ().Str ("max_block_number" , maxBlockNumber .String ()).Msg ("Retrieved max block number from ClickHouse" )
141141
142+ nextCommitBlockNumber := new (big.Int ).Add (maxBlockNumber , big .NewInt (1 ))
143+ log .Info ().Str ("next_commit_block" , nextCommitBlockNumber .String ()).Msg ("Starting producer-consumer processing" )
144+
142145 files , err := listS3ParquetFiles (chainId )
143146 if err != nil {
144147 log .Error ().Err (err ).Msg ("Failed to list S3 parquet files" )
@@ -154,48 +157,44 @@ func Commit(chainId *big.Int) error {
154157 log .Info ().Int ("filtered_ranges" , len (blockRanges )).Msg ("Filtered and sorted block ranges" )
155158
156159 // Check if there are any files to process
157- if len (blockRanges ) == 0 {
158- log .Info ().
159- Str ("next_commit_block" , new (big.Int ).Add (maxBlockNumber , big .NewInt (1 )).String ()).
160- Msg ("No files to process - all blocks are up to date" )
161- return nil
162- }
163-
164- nextCommitBlockNumber := new (big.Int ).Add (maxBlockNumber , big .NewInt (1 ))
165- log .Info ().Str ("next_commit_block" , nextCommitBlockNumber .String ()).Msg ("Starting producer-consumer processing" )
166-
167- // Start the block range processor goroutine
168- processorDone := make (chan struct {})
169- go func () {
170- blockRangeProcessor (nextCommitBlockNumber )
171- close (processorDone )
172- }()
160+ if len (blockRanges ) != 0 {
161+ // Start the block range processor goroutine
162+ processorDone := make (chan struct {})
163+ go func () {
164+ blockRangeProcessor (nextCommitBlockNumber )
165+ close (processorDone )
166+ }()
167+
168+ // Download files synchronously and send to channel
169+ for i , blockRange := range blockRanges {
170+ log .Info ().
171+ Int ("processing" , i + 1 ).
172+ Int ("total" , len (blockRanges )).
173+ Str ("file" , blockRange .S3Key ).
174+ Str ("start_block" , blockRange .StartBlock .String ()).
175+ Str ("end_block" , blockRange .EndBlock .String ()).
176+ Msg ("Starting download" )
173177
174- // Download files synchronously and send to channel
175- for i , blockRange := range blockRanges {
176- log .Info ().
177- Int ("processing" , i + 1 ).
178- Int ("total" , len (blockRanges )).
179- Str ("file" , blockRange .S3Key ).
180- Str ("start_block" , blockRange .StartBlock .String ()).
181- Str ("end_block" , blockRange .EndBlock .String ()).
182- Msg ("Starting download" )
178+ if err := downloadFile (& blockRange ); err != nil {
179+ log .Panic ().Err (err ).Str ("file" , blockRange .S3Key ).Msg ("Failed to download file" )
180+ }
183181
184- if err := downloadFile ( & blockRange ); err != nil {
185- log . Panic (). Err ( err ). Str ( "file" , blockRange . S3Key ). Msg ( "Failed to download file" )
182+ log . Debug (). Str ( "file" , blockRange . S3Key ). Msg ( "Download completed, sending to channel" )
183+ downloadComplete <- & blockRange
186184 }
187185
188- log .Debug ().Str ("file" , blockRange .S3Key ).Msg ("Download completed, sending to channel" )
189- downloadComplete <- & blockRange
186+ // Close channel to signal processor that all downloads are done
187+ log .Info ().Msg ("All downloads completed, waiting for processing to finish from S3" )
188+ close (downloadComplete )
189+ <- processorDone
190+ log .Info ().Msg ("All processing completed successfully from S3" )
191+ } else {
192+ log .Info ().
193+ Str ("next_commit_block" , nextCommitBlockNumber .String ()).
194+ Msg ("No files to process - all blocks are up to date from S3" )
190195 }
191196
192- // Close channel to signal processor that all downloads are done
193- log .Info ().Msg ("All downloads completed, waiting for processing to finish from S3" )
194- close (downloadComplete )
195- <- processorDone
196- log .Info ().Msg ("All processing completed successfully from S3" )
197-
198- log .Info ().Msg ("Fetching latest blocks" )
197+ log .Info ().Msg ("Consuming latest blocks from RPC" )
199198 fetchLatest (nextCommitBlockNumber )
200199
201200 return nil
@@ -451,11 +450,6 @@ func filterAndSortBlockRanges(files []string, maxBlockNumber *big.Int) ([]BlockR
451450
452451 // Skip files where end block is less than max block number from ClickHouse
453452 if endBlock .Cmp (maxBlockNumber ) <= 0 {
454- log .Debug ().
455- Str ("file" , file ).
456- Str ("end_block" , endBlock .String ()).
457- Str ("max_block" , maxBlockNumber .String ()).
458- Msg ("Skipping file - end block is less than or equal to max block" )
459453 continue
460454 }
461455
@@ -468,9 +462,11 @@ func filterAndSortBlockRanges(files []string, maxBlockNumber *big.Int) ([]BlockR
468462 }
469463
470464 // Sort by start block number in ascending order
471- sort .Slice (blockRanges , func (i , j int ) bool {
472- return blockRanges [i ].StartBlock .Cmp (blockRanges [j ].StartBlock ) < 0
473- })
465+ if len (blockRanges ) > 0 {
466+ sort .Slice (blockRanges , func (i , j int ) bool {
467+ return blockRanges [i ].StartBlock .Cmp (blockRanges [j ].StartBlock ) < 0
468+ })
469+ }
474470
475471 return blockRanges , nil
476472}
0 commit comments