Skip to content

Commit 9fc708c

Browse files
authored
Improve gap detection and handling in committer (#80)
### TL;DR Improved gap handling in the Committer to handle cases where the service crashes before managing to store a block failure for the block ### What changed? - Extracted gap handling logic into a new `handleGap` function. - Enhanced gap detection to store block failures for missing blocks. - Updated logging to provide more detailed information about detected gaps. - Modified the error message in case of a gap to include both expected and actual block numbers. ### How to test? 1. Simulate a scenario where there's a gap in block numbers. 2. Verify that the `handleGap` function is called and processes the gap correctly. 3. Check that block failures are stored for missing blocks. 4. Confirm that metrics are updated (GapCounter and MissedBlockNumbers). 5. Validate that the log messages provide accurate information about the detected gap. ### Why make this change? This change improves the robustness of the orchestrator by: 1. Providing better visibility into gaps in block data. 2. Ensuring that all missing blocks are properly recorded as failures. 3. Facilitating easier debugging and monitoring of gap occurrences. 4. Enhancing the system's ability to recover from and track data inconsistencies.
2 parents de8ab92 + 884586f commit 9fc708c

File tree

1 file changed

+47
-7
lines changed

1 file changed

+47
-7
lines changed

internal/orchestrator/committer.go

Lines changed: 47 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -118,12 +118,7 @@ func (c *Committer) getSequentialBlockDataToCommit() ([]common.BlockData, error)
118118
})
119119

120120
if blocksData[0].Block.Number.Cmp(blocksToCommit[0]) != 0 {
121-
// Note: we are missing block(s) in the beginning of the batch in staging, The Failure Recoverer will handle this
122-
// increment the a gap counter in prometheus
123-
metrics.GapCounter.Inc()
124-
// record the first missed block number in prometheus
125-
metrics.MissedBlockNumbers.Set(float64(blocksData[0].Block.Number.Int64()))
126-
return nil, fmt.Errorf("first block number (%s) in commit batch does not match expected (%s)", blocksData[0].Block.Number.String(), blocksToCommit[0].String())
121+
return nil, c.handleGap(blocksToCommit[0], blocksData[0].Block)
127122
}
128123

129124
var sequentialBlockData []common.BlockData
@@ -133,7 +128,7 @@ func (c *Committer) getSequentialBlockDataToCommit() ([]common.BlockData, error)
133128
for i := 1; i < len(blocksData); i++ {
134129
if blocksData[i].Block.Number.Cmp(expectedBlockNumber) != 0 {
135130
// Note: Gap detected, stop here
136-
log.Warn().Msgf("Gap detected at block %s, stopping commit", expectedBlockNumber.String())
131+
log.Warn().Msgf("Gap detected at block %s, committing until %s", expectedBlockNumber.String(), blocksData[i-1].Block.Number.String())
137132
// increment the a gap counter in prometheus
138133
metrics.GapCounter.Inc()
139134
// record the first missed block number in prometheus
@@ -234,3 +229,48 @@ func (c *Committer) saveDataToMainStorage(blockData []common.BlockData) error {
234229

235230
return nil
236231
}
232+
233+
func (c *Committer) handleGap(expectedStartBlockNumber *big.Int, actualFirstBlock common.Block) error {
234+
// increment the a gap counter in prometheus
235+
metrics.GapCounter.Inc()
236+
// record the first missed block number in prometheus
237+
metrics.MissedBlockNumbers.Set(float64(expectedStartBlockNumber.Int64()))
238+
239+
missingBlockCount := new(big.Int).Sub(actualFirstBlock.Number, expectedStartBlockNumber).Int64()
240+
missingBlockNumbers := make([]*big.Int, missingBlockCount)
241+
for i := int64(0); i < missingBlockCount; i++ {
242+
missingBlockNumber := new(big.Int).Add(expectedStartBlockNumber, big.NewInt(i))
243+
missingBlockNumbers[i] = missingBlockNumber
244+
}
245+
log.Debug().Msgf("Detected %d missing blocks between blocks %s and %s", missingBlockCount, expectedStartBlockNumber.String(), actualFirstBlock.Number.String())
246+
247+
existingBlockFailures, err := c.storage.OrchestratorStorage.GetBlockFailures(storage.QueryFilter{BlockNumbers: missingBlockNumbers, ChainId: c.rpc.ChainID})
248+
if err != nil {
249+
return fmt.Errorf("error getting block failures while handling gap: %v", err)
250+
}
251+
252+
existingBlockFailuresMap := make(map[string]*common.BlockFailure)
253+
for _, failure := range existingBlockFailures {
254+
blockNumberStr := failure.BlockNumber.String()
255+
existingBlockFailuresMap[blockNumberStr] = &failure
256+
}
257+
258+
blockFailures := make([]common.BlockFailure, 0)
259+
for _, blockNumber := range missingBlockNumbers {
260+
blockNumberStr := blockNumber.String()
261+
if _, ok := existingBlockFailuresMap[blockNumberStr]; !ok {
262+
blockFailures = append(blockFailures, common.BlockFailure{
263+
BlockNumber: blockNumber,
264+
ChainId: c.rpc.ChainID,
265+
FailureTime: time.Now(),
266+
FailureCount: 1,
267+
FailureReason: "Gap detected for this block",
268+
})
269+
}
270+
}
271+
log.Debug().Msgf("Storing %d block failures while handling gap", len(blockFailures))
272+
if err := c.storage.OrchestratorStorage.StoreBlockFailures(blockFailures); err != nil {
273+
return fmt.Errorf("error storing block failures while handling gap: %v", err)
274+
}
275+
return fmt.Errorf("first block number (%s) in commit batch does not match expected (%s)", actualFirstBlock.Number.String(), expectedStartBlockNumber.String())
276+
}

0 commit comments

Comments
 (0)