Skip to content

Commit 3776405

Browse files
committed
find all reorgs in current batch
1 parent a6a7a7b commit 3776405

File tree

2 files changed

+83
-53
lines changed

2 files changed

+83
-53
lines changed

internal/orchestrator/reorg_handler.go

Lines changed: 82 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package orchestrator
33
import (
44
"fmt"
55
"math/big"
6+
"sync"
67
"time"
78

89
"github.com/rs/zerolog/log"
@@ -73,7 +74,7 @@ func (rh *ReorgHandler) Start() {
7374
for range ticker.C {
7475
mostRecentBlockChecked, err := rh.RunFromBlock(rh.lastCheckedBlock)
7576
if err != nil {
76-
log.Error().Err(err).Msg("Error during reorg handling")
77+
log.Error().Err(err).Msgf("Error during reorg handling: %s", err.Error())
7778
continue
7879
}
7980
if mostRecentBlockChecked == nil {
@@ -107,98 +108,126 @@ func (rh *ReorgHandler) RunFromBlock(fromBlock *big.Int) (lastCheckedBlock *big.
107108
log.Debug().Msgf("Most recent (%s) and last checked (%s) block numbers are equal, skipping reorg check", mostRecentBlockHeader.Number.String(), lastBlockHeader.Number.String())
108109
return nil, nil
109110
}
110-
reorgEndIndex := findReorgEndIndex(blockHeaders)
111-
if reorgEndIndex == -1 {
111+
112+
firstMismatchIndex, err := findIndexOfFirstHashMismatch(blockHeaders)
113+
if err != nil {
114+
return nil, fmt.Errorf("error detecting reorgs: %w", err)
115+
}
116+
if firstMismatchIndex == -1 {
117+
log.Debug().Msgf("No reorg detected, most recent block number checked: %s", mostRecentBlockHeader.Number.String())
112118
return mostRecentBlockHeader.Number, nil
113119
}
120+
114121
metrics.ReorgCounter.Inc()
115-
forkPoint, err := rh.findFirstForkedBlockNumber(blockHeaders[reorgEndIndex:])
122+
reorgedBlockNumbers := make([]*big.Int, 0)
123+
err = rh.findReorgedBlockNumbers(blockHeaders[firstMismatchIndex:], &reorgedBlockNumbers)
116124
if err != nil {
117-
return nil, fmt.Errorf("error while finding fork point: %w", err)
125+
return nil, fmt.Errorf("error finding reorged block numbers: %w", err)
126+
}
127+
128+
if len(reorgedBlockNumbers) == 0 {
129+
log.Debug().Msgf("Reorg was detected, but no reorged block numbers found, most recent block number checked: %s", mostRecentBlockHeader.Number.String())
130+
return mostRecentBlockHeader.Number, nil
118131
}
119-
reorgEndBlock := blockHeaders[reorgEndIndex].Number
120-
err = rh.handleReorg(forkPoint, reorgEndBlock)
132+
133+
err = rh.handleReorg(reorgedBlockNumbers)
121134
if err != nil {
122135
return nil, fmt.Errorf("error while handling reorg: %w", err)
123136
}
124137
return mostRecentBlockHeader.Number, nil
125138
}
126139

127-
func findReorgEndIndex(blockHeadersDescending []common.BlockHeader) (index int) {
140+
func findIndexOfFirstHashMismatch(blockHeadersDescending []common.BlockHeader) (int, error) {
128141
for i := 0; i < len(blockHeadersDescending)-1; i++ {
129142
currentBlock := blockHeadersDescending[i]
130143
previousBlockInChain := blockHeadersDescending[i+1]
131-
132144
if currentBlock.Number.Cmp(previousBlockInChain.Number) == 0 { // unmerged block
133145
continue
134146
}
147+
if currentBlock.Number.Cmp(new(big.Int).Add(previousBlockInChain.Number, big.NewInt(1))) != 0 {
148+
return -1, fmt.Errorf("block headers are not sequential - cannot proceed with detecting reorgs. Comparing blocks: %s and %s", currentBlock.Number.String(), previousBlockInChain.Number.String())
149+
}
135150
if currentBlock.ParentHash != previousBlockInChain.Hash {
136-
log.Debug().
137-
Str("currentBlockNumber", currentBlock.Number.String()).
138-
Str("currentBlockHash", currentBlock.Hash).
139-
Str("currentBlockParentHash", currentBlock.ParentHash).
140-
Str("previousBlockNumber", previousBlockInChain.Number.String()).
141-
Str("previousBlockHash", previousBlockInChain.Hash).
142-
Msg("Reorg detected: parent hash mismatch")
143-
return i + 1
151+
return i + 1, nil
144152
}
145153
}
146-
return -1
154+
return -1, nil
147155
}
148156

149-
func (rh *ReorgHandler) findFirstForkedBlockNumber(reversedBlockHeaders []common.BlockHeader) (forkPoint *big.Int, err error) {
150-
newBlocksByNumber, err := rh.getNewBlocksByNumber(reversedBlockHeaders)
157+
func (rh *ReorgHandler) findReorgedBlockNumbers(blockHeadersDescending []common.BlockHeader, reorgedBlockNumbers *[]*big.Int) error {
158+
newBlocksByNumber, err := rh.getNewBlocksByNumber(blockHeadersDescending)
151159
if err != nil {
152-
return nil, err
160+
return err
153161
}
154-
155-
for i := 0; i < len(reversedBlockHeaders); i++ {
156-
blockHeader := reversedBlockHeaders[i]
157-
block, ok := (*newBlocksByNumber)[blockHeader.Number.String()]
162+
continueCheckingForReorgs := false
163+
for i := 0; i < len(blockHeadersDescending); i++ {
164+
blockHeader := blockHeadersDescending[i]
165+
fetchedBlock, ok := (*newBlocksByNumber)[blockHeader.Number.String()]
158166
if !ok {
159-
return nil, fmt.Errorf("block not found: %s", blockHeader.Number.String())
167+
return fmt.Errorf("block not found: %s", blockHeader.Number.String())
160168
}
161-
if blockHeader.ParentHash == block.ParentHash && blockHeader.Hash == block.Hash {
162-
if i == 0 {
163-
return nil, fmt.Errorf("unable to find reorg fork point due to block %s being first in the array", blockHeader.Number.String())
169+
if blockHeader.ParentHash != fetchedBlock.ParentHash || blockHeader.Hash != fetchedBlock.Hash {
170+
*reorgedBlockNumbers = append(*reorgedBlockNumbers, blockHeader.Number)
171+
if i == len(blockHeadersDescending)-1 {
172+
continueCheckingForReorgs = true // if last block in range is reorged, we should continue checking
164173
}
165-
previousBlock := reversedBlockHeaders[i-1]
166-
return previousBlock.Number, nil
167174
}
168175
}
169-
fetchUntilBlock := reversedBlockHeaders[len(reversedBlockHeaders)-1].Number
170-
fetchFromBlock := new(big.Int).Sub(fetchUntilBlock, big.NewInt(int64(rh.blocksPerScan)))
171-
nextHeadersBatch, err := rh.storage.MainStorage.GetBlockHeadersDescending(rh.rpc.GetChainID(), fetchFromBlock, fetchUntilBlock)
172-
if err != nil {
173-
return nil, fmt.Errorf("error getting next headers batch: %w", err)
176+
if continueCheckingForReorgs {
177+
fetchUntilBlock := blockHeadersDescending[len(blockHeadersDescending)-1].Number
178+
fetchFromBlock := new(big.Int).Sub(fetchUntilBlock, big.NewInt(int64(rh.blocksPerScan)))
179+
nextHeadersBatch, err := rh.storage.MainStorage.GetBlockHeadersDescending(rh.rpc.GetChainID(), fetchFromBlock, fetchUntilBlock)
180+
if err != nil {
181+
return fmt.Errorf("error getting next headers batch: %w", err)
182+
}
183+
return rh.findReorgedBlockNumbers(nextHeadersBatch, reorgedBlockNumbers)
174184
}
175-
return rh.findFirstForkedBlockNumber(nextHeadersBatch)
185+
return nil
176186
}
177187

178-
func (rh *ReorgHandler) getNewBlocksByNumber(reversedBlockHeaders []common.BlockHeader) (*map[string]common.Block, error) {
179-
blockNumbers := make([]*big.Int, 0, len(reversedBlockHeaders))
180-
for _, header := range reversedBlockHeaders {
188+
func (rh *ReorgHandler) getNewBlocksByNumber(blockHeaders []common.BlockHeader) (*map[string]common.Block, error) {
189+
blockNumbers := make([]*big.Int, 0, len(blockHeaders))
190+
for _, header := range blockHeaders {
181191
blockNumbers = append(blockNumbers, header.Number)
182192
}
183-
blockResults := rh.rpc.GetBlocks(blockNumbers)
193+
blockCount := len(blockNumbers)
194+
chunks := common.BigIntSliceToChunks(blockNumbers, rh.rpc.GetBlocksPerRequest().Blocks)
195+
196+
var wg sync.WaitGroup
197+
resultsCh := make(chan []rpc.GetBlocksResult, len(chunks))
198+
199+
// TODO: move batching to rpc
200+
log.Debug().Msgf("Reorg handler fetching %d blocks in %d chunks of max %d blocks", blockCount, len(chunks), rh.rpc.GetBlocksPerRequest().Blocks)
201+
for _, chunk := range chunks {
202+
wg.Add(1)
203+
go func(chunk []*big.Int) {
204+
defer wg.Done()
205+
resultsCh <- rh.rpc.GetBlocks(chunk)
206+
if config.Cfg.RPC.Blocks.BatchDelay > 0 {
207+
time.Sleep(time.Duration(config.Cfg.RPC.Blocks.BatchDelay) * time.Millisecond)
208+
}
209+
}(chunk)
210+
}
211+
go func() {
212+
wg.Wait()
213+
close(resultsCh)
214+
}()
215+
184216
fetchedBlocksByNumber := make(map[string]common.Block)
185-
for _, blockResult := range blockResults {
186-
if blockResult.Error != nil {
187-
return nil, fmt.Errorf("error fetching block %s: %w", blockResult.BlockNumber.String(), blockResult.Error)
217+
for batchResults := range resultsCh {
218+
for _, blockResult := range batchResults {
219+
if blockResult.Error != nil {
220+
return nil, fmt.Errorf("error fetching block %s: %w", blockResult.BlockNumber.String(), blockResult.Error)
221+
}
222+
fetchedBlocksByNumber[blockResult.BlockNumber.String()] = blockResult.Data
188223
}
189-
fetchedBlocksByNumber[blockResult.BlockNumber.String()] = blockResult.Data
190224
}
191225
return &fetchedBlocksByNumber, nil
192226
}
193227

194-
func (rh *ReorgHandler) handleReorg(reorgStart *big.Int, reorgEnd *big.Int) error {
195-
log.Debug().Msgf("Handling reorg from block %s to %s", reorgStart.String(), reorgEnd.String())
196-
blockRange := make([]*big.Int, 0, new(big.Int).Sub(reorgEnd, reorgStart).Int64())
197-
for i := new(big.Int).Set(reorgStart); i.Cmp(reorgEnd) <= 0; i.Add(i, big.NewInt(1)) {
198-
blockRange = append(blockRange, new(big.Int).Set(i))
199-
}
200-
201-
results := rh.worker.Run(blockRange)
228+
func (rh *ReorgHandler) handleReorg(reorgedBlockNumbers []*big.Int) error {
229+
log.Debug().Msgf("Handling reorg for blocks %v", reorgedBlockNumbers)
230+
results := rh.worker.Run(reorgedBlockNumbers)
202231
data := make([]common.BlockData, 0, len(results))
203232
blocksToDelete := make([]*big.Int, 0, len(results))
204233
for _, result := range results {

internal/worker/worker.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ func NewWorker(rpc rpc.IRPCClient) *Worker {
2222
}
2323
}
2424

25+
// TODO: move batching to rpc
2526
func (w *Worker) Run(blockNumbers []*big.Int) []rpc.GetFullBlockResult {
2627
blockCount := len(blockNumbers)
2728
chunks := common.BigIntSliceToChunks(blockNumbers, w.rpc.GetBlocksPerRequest().Blocks)

0 commit comments

Comments
 (0)