@@ -105,7 +105,7 @@ func (c *Committer) Start(ctx context.Context) {
105105 }
106106}
107107
108- func (c * Committer ) getBlockNumbersToCommit () ([]* big.Int , error ) {
108+ func (c * Committer ) getBlockNumbersToCommit (ctx context. Context ) ([]* big.Int , error ) {
109109 startTime := time .Now ()
110110 defer func () {
111111 log .Debug ().Str ("metric" , "get_block_numbers_to_commit_duration" ).Msgf ("getBlockNumbersToCommit duration: %f" , time .Since (startTime ).Seconds ())
@@ -129,7 +129,10 @@ func (c *Committer) getBlockNumbersToCommit() ([]*big.Int, error) {
129129 }
130130
131131 startBlock := new (big.Int ).Add (latestCommittedBlockNumber , big .NewInt (1 ))
132- endBlock := new (big.Int ).Add (latestCommittedBlockNumber , big .NewInt (int64 (c .blocksPerCommit )))
132+ endBlock , err := c .getBlockToCommitUntil (ctx , latestCommittedBlockNumber )
133+ if err != nil {
134+ return nil , fmt .Errorf ("error getting block to commit until: %v" , err )
135+ }
133136
134137 blockCount := new (big.Int ).Sub (endBlock , startBlock ).Int64 () + 1
135138 blockNumbers := make ([]* big.Int , blockCount )
@@ -140,26 +143,61 @@ func (c *Committer) getBlockNumbersToCommit() ([]*big.Int, error) {
140143 return blockNumbers , nil
141144}
142145
146+ func (c * Committer ) getBlockToCommitUntil (ctx context.Context , latestCommittedBlockNumber * big.Int ) (* big.Int , error ) {
147+ untilBlock := new (big.Int ).Add (latestCommittedBlockNumber , big .NewInt (int64 (c .blocksPerCommit )))
148+ if c .workMode == WorkModeBackfill {
149+ return untilBlock , nil
150+ } else {
151+ // get latest block from RPC and if that's less than until block, return that
152+ latestBlock , err := c .rpc .GetLatestBlockNumber (ctx )
153+ if err != nil {
154+ return nil , fmt .Errorf ("error getting latest block from RPC: %v" , err )
155+ }
156+ if latestBlock .Cmp (untilBlock ) < 0 {
157+ log .Debug ().Msgf ("Committing until latest block: %s" , latestBlock .String ())
158+ return latestBlock , nil
159+ }
160+ return untilBlock , nil
161+ }
162+ }
163+
164+ func (c * Committer ) fetchBlockDataToCommit (ctx context.Context , blockNumbers []* big.Int ) ([]common.BlockData , error ) {
165+ if c .workMode == WorkModeBackfill {
166+ startTime := time .Now ()
167+ blocksData , err := c .storage .StagingStorage .GetStagingData (storage.QueryFilter {BlockNumbers : blockNumbers , ChainId : c .rpc .GetChainID ()})
168+ log .Debug ().Str ("metric" , "get_staging_data_duration" ).Msgf ("StagingStorage.GetStagingData duration: %f" , time .Since (startTime ).Seconds ())
169+ metrics .GetStagingDataDuration .Observe (time .Since (startTime ).Seconds ())
170+
171+ if err != nil {
172+ return nil , fmt .Errorf ("error fetching blocks to commit: %v" , err )
173+ }
174+ if len (blocksData ) == 0 {
175+ log .Warn ().Msgf ("Committer didn't find the following range in staging: %v - %v" , blockNumbers [0 ].Int64 (), blockNumbers [len (blockNumbers )- 1 ].Int64 ())
176+ c .handleMissingStagingData (ctx , blockNumbers )
177+ return nil , nil
178+ }
179+ return blocksData , nil
180+ } else {
181+ poller := NewBoundlessPoller (c .rpc , c .storage )
182+ blocksData , _ := poller .PollWithoutSaving (ctx , blockNumbers )
183+ return blocksData , nil
184+ }
185+ }
186+
143187func (c * Committer ) getSequentialBlockDataToCommit (ctx context.Context ) ([]common.BlockData , error ) {
144- blocksToCommit , err := c .getBlockNumbersToCommit ()
188+ blocksToCommit , err := c .getBlockNumbersToCommit (ctx )
145189 if err != nil {
146190 return nil , fmt .Errorf ("error determining blocks to commit: %v" , err )
147191 }
148192 if len (blocksToCommit ) == 0 {
149193 return nil , nil
150194 }
151195
152- startTime := time .Now ()
153- blocksData , err := c .storage .StagingStorage .GetStagingData (storage.QueryFilter {BlockNumbers : blocksToCommit , ChainId : c .rpc .GetChainID ()})
154- log .Debug ().Str ("metric" , "get_staging_data_duration" ).Msgf ("StagingStorage.GetStagingData duration: %f" , time .Since (startTime ).Seconds ())
155- metrics .GetStagingDataDuration .Observe (time .Since (startTime ).Seconds ())
156-
196+ blocksData , err := c .fetchBlockDataToCommit (ctx , blocksToCommit )
157197 if err != nil {
158- return nil , fmt . Errorf ( "error fetching blocks to commit: %v" , err )
198+ return nil , err
159199 }
160200 if len (blocksData ) == 0 {
161- log .Warn ().Msgf ("Committer didn't find the following range in staging: %v - %v" , blocksToCommit [0 ].Int64 (), blocksToCommit [len (blocksToCommit )- 1 ].Int64 ())
162- c .handleMissingStagingData (ctx , blocksToCommit )
163201 return nil , nil
164202 }
165203
@@ -250,6 +288,11 @@ func (c *Committer) handleGap(ctx context.Context, expectedStartBlockNumber *big
250288 // record the first missed block number in prometheus
251289 metrics .MissedBlockNumbers .Set (float64 (expectedStartBlockNumber .Int64 ()))
252290
291+ if c .workMode == WorkModeLive {
292+ log .Debug ().Msgf ("Skipping gap handling in live mode. Expected block %s, actual first block %s" , expectedStartBlockNumber .String (), actualFirstBlock .Number .String ())
293+ return nil
294+ }
295+
253296 poller := NewBoundlessPoller (c .rpc , c .storage )
254297
255298 missingBlockCount := new (big.Int ).Sub (actualFirstBlock .Number , expectedStartBlockNumber ).Int64 ()
0 commit comments