Skip to content

Commit 8f86e1f

Browse files
committed
find all reorgs in current batch
1 parent a6a7a7b commit 8f86e1f

File tree

3 files changed

+272
-87
lines changed

3 files changed

+272
-87
lines changed

internal/orchestrator/reorg_handler.go

Lines changed: 86 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package orchestrator
33
import (
44
"fmt"
55
"math/big"
6+
"sort"
7+
"sync"
68
"time"
79

810
"github.com/rs/zerolog/log"
@@ -73,7 +75,7 @@ func (rh *ReorgHandler) Start() {
7375
for range ticker.C {
7476
mostRecentBlockChecked, err := rh.RunFromBlock(rh.lastCheckedBlock)
7577
if err != nil {
76-
log.Error().Err(err).Msg("Error during reorg handling")
78+
log.Error().Err(err).Msgf("Error during reorg handling: %s", err.Error())
7779
continue
7880
}
7981
if mostRecentBlockChecked == nil {
@@ -107,98 +109,129 @@ func (rh *ReorgHandler) RunFromBlock(fromBlock *big.Int) (lastCheckedBlock *big.
107109
log.Debug().Msgf("Most recent (%s) and last checked (%s) block numbers are equal, skipping reorg check", mostRecentBlockHeader.Number.String(), lastBlockHeader.Number.String())
108110
return nil, nil
109111
}
110-
reorgEndIndex := findReorgEndIndex(blockHeaders)
111-
if reorgEndIndex == -1 {
112+
113+
firstMismatchIndex, err := findIndexOfFirstHashMismatch(blockHeaders)
114+
if err != nil {
115+
return nil, fmt.Errorf("error detecting reorgs: %w", err)
116+
}
117+
if firstMismatchIndex == -1 {
118+
log.Debug().Msgf("No reorg detected, most recent block number checked: %s", mostRecentBlockHeader.Number.String())
112119
return mostRecentBlockHeader.Number, nil
113120
}
121+
114122
metrics.ReorgCounter.Inc()
115-
forkPoint, err := rh.findFirstForkedBlockNumber(blockHeaders[reorgEndIndex:])
123+
reorgedBlockNumbers := make([]*big.Int, 0)
124+
err = rh.findReorgedBlockNumbers(blockHeaders[firstMismatchIndex:], &reorgedBlockNumbers)
116125
if err != nil {
117-
return nil, fmt.Errorf("error while finding fork point: %w", err)
126+
return nil, fmt.Errorf("error finding reorged block numbers: %w", err)
127+
}
128+
129+
if len(reorgedBlockNumbers) == 0 {
130+
log.Debug().Msgf("Reorg was detected, but no reorged block numbers found, most recent block number checked: %s", mostRecentBlockHeader.Number.String())
131+
return mostRecentBlockHeader.Number, nil
118132
}
119-
reorgEndBlock := blockHeaders[reorgEndIndex].Number
120-
err = rh.handleReorg(forkPoint, reorgEndBlock)
133+
134+
err = rh.handleReorg(reorgedBlockNumbers)
121135
if err != nil {
122136
return nil, fmt.Errorf("error while handling reorg: %w", err)
123137
}
124138
return mostRecentBlockHeader.Number, nil
125139
}
126140

127-
func findReorgEndIndex(blockHeadersDescending []common.BlockHeader) (index int) {
141+
func findIndexOfFirstHashMismatch(blockHeadersDescending []common.BlockHeader) (int, error) {
128142
for i := 0; i < len(blockHeadersDescending)-1; i++ {
129143
currentBlock := blockHeadersDescending[i]
130144
previousBlockInChain := blockHeadersDescending[i+1]
131-
132145
if currentBlock.Number.Cmp(previousBlockInChain.Number) == 0 { // unmerged block
133146
continue
134147
}
148+
if currentBlock.Number.Cmp(new(big.Int).Add(previousBlockInChain.Number, big.NewInt(1))) != 0 {
149+
return -1, fmt.Errorf("block headers are not sequential - cannot proceed with detecting reorgs. Comparing blocks: %s and %s", currentBlock.Number.String(), previousBlockInChain.Number.String())
150+
}
135151
if currentBlock.ParentHash != previousBlockInChain.Hash {
136-
log.Debug().
137-
Str("currentBlockNumber", currentBlock.Number.String()).
138-
Str("currentBlockHash", currentBlock.Hash).
139-
Str("currentBlockParentHash", currentBlock.ParentHash).
140-
Str("previousBlockNumber", previousBlockInChain.Number.String()).
141-
Str("previousBlockHash", previousBlockInChain.Hash).
142-
Msg("Reorg detected: parent hash mismatch")
143-
return i + 1
152+
return i + 1, nil
144153
}
145154
}
146-
return -1
155+
return -1, nil
147156
}
148157

149-
func (rh *ReorgHandler) findFirstForkedBlockNumber(reversedBlockHeaders []common.BlockHeader) (forkPoint *big.Int, err error) {
150-
newBlocksByNumber, err := rh.getNewBlocksByNumber(reversedBlockHeaders)
158+
func (rh *ReorgHandler) findReorgedBlockNumbers(blockHeadersDescending []common.BlockHeader, reorgedBlockNumbers *[]*big.Int) error {
159+
newBlocksByNumber, err := rh.getNewBlocksByNumber(blockHeadersDescending)
151160
if err != nil {
152-
return nil, err
161+
return err
153162
}
154-
155-
for i := 0; i < len(reversedBlockHeaders); i++ {
156-
blockHeader := reversedBlockHeaders[i]
157-
block, ok := (*newBlocksByNumber)[blockHeader.Number.String()]
163+
continueCheckingForReorgs := false
164+
for i := 0; i < len(blockHeadersDescending); i++ {
165+
blockHeader := blockHeadersDescending[i]
166+
fetchedBlock, ok := (*newBlocksByNumber)[blockHeader.Number.String()]
158167
if !ok {
159-
return nil, fmt.Errorf("block not found: %s", blockHeader.Number.String())
168+
return fmt.Errorf("block not found: %s", blockHeader.Number.String())
160169
}
161-
if blockHeader.ParentHash == block.ParentHash && blockHeader.Hash == block.Hash {
162-
if i == 0 {
163-
return nil, fmt.Errorf("unable to find reorg fork point due to block %s being first in the array", blockHeader.Number.String())
170+
if blockHeader.ParentHash != fetchedBlock.ParentHash || blockHeader.Hash != fetchedBlock.Hash {
171+
*reorgedBlockNumbers = append(*reorgedBlockNumbers, blockHeader.Number)
172+
if i == len(blockHeadersDescending)-1 {
173+
continueCheckingForReorgs = true // if last block in range is reorged, we should continue checking
164174
}
165-
previousBlock := reversedBlockHeaders[i-1]
166-
return previousBlock.Number, nil
167175
}
168176
}
169-
fetchUntilBlock := reversedBlockHeaders[len(reversedBlockHeaders)-1].Number
170-
fetchFromBlock := new(big.Int).Sub(fetchUntilBlock, big.NewInt(int64(rh.blocksPerScan)))
171-
nextHeadersBatch, err := rh.storage.MainStorage.GetBlockHeadersDescending(rh.rpc.GetChainID(), fetchFromBlock, fetchUntilBlock)
172-
if err != nil {
173-
return nil, fmt.Errorf("error getting next headers batch: %w", err)
177+
if continueCheckingForReorgs {
178+
fetchUntilBlock := blockHeadersDescending[len(blockHeadersDescending)-1].Number
179+
fetchFromBlock := new(big.Int).Sub(fetchUntilBlock, big.NewInt(int64(rh.blocksPerScan)))
180+
nextHeadersBatch, err := rh.storage.MainStorage.GetBlockHeadersDescending(rh.rpc.GetChainID(), fetchFromBlock, new(big.Int).Sub(fetchUntilBlock, big.NewInt(1))) // we sub 1 to not check the last block again
181+
if err != nil {
182+
return fmt.Errorf("error getting next headers batch: %w", err)
183+
}
184+
sort.Slice(nextHeadersBatch, func(i, j int) bool {
185+
return nextHeadersBatch[i].Number.Cmp(nextHeadersBatch[j].Number) > 0
186+
})
187+
return rh.findReorgedBlockNumbers(nextHeadersBatch, reorgedBlockNumbers)
174188
}
175-
return rh.findFirstForkedBlockNumber(nextHeadersBatch)
189+
return nil
176190
}
177191

178-
func (rh *ReorgHandler) getNewBlocksByNumber(reversedBlockHeaders []common.BlockHeader) (*map[string]common.Block, error) {
179-
blockNumbers := make([]*big.Int, 0, len(reversedBlockHeaders))
180-
for _, header := range reversedBlockHeaders {
192+
func (rh *ReorgHandler) getNewBlocksByNumber(blockHeaders []common.BlockHeader) (*map[string]common.Block, error) {
193+
blockNumbers := make([]*big.Int, 0, len(blockHeaders))
194+
for _, header := range blockHeaders {
181195
blockNumbers = append(blockNumbers, header.Number)
182196
}
183-
blockResults := rh.rpc.GetBlocks(blockNumbers)
197+
blockCount := len(blockNumbers)
198+
chunks := common.BigIntSliceToChunks(blockNumbers, rh.rpc.GetBlocksPerRequest().Blocks)
199+
200+
var wg sync.WaitGroup
201+
resultsCh := make(chan []rpc.GetBlocksResult, len(chunks))
202+
203+
// TODO: move batching to rpc
204+
log.Debug().Msgf("Reorg handler fetching %d blocks in %d chunks of max %d blocks", blockCount, len(chunks), rh.rpc.GetBlocksPerRequest().Blocks)
205+
for _, chunk := range chunks {
206+
wg.Add(1)
207+
go func(chunk []*big.Int) {
208+
defer wg.Done()
209+
resultsCh <- rh.rpc.GetBlocks(chunk)
210+
if config.Cfg.RPC.Blocks.BatchDelay > 0 {
211+
time.Sleep(time.Duration(config.Cfg.RPC.Blocks.BatchDelay) * time.Millisecond)
212+
}
213+
}(chunk)
214+
}
215+
go func() {
216+
wg.Wait()
217+
close(resultsCh)
218+
}()
219+
184220
fetchedBlocksByNumber := make(map[string]common.Block)
185-
for _, blockResult := range blockResults {
186-
if blockResult.Error != nil {
187-
return nil, fmt.Errorf("error fetching block %s: %w", blockResult.BlockNumber.String(), blockResult.Error)
221+
for batchResults := range resultsCh {
222+
for _, blockResult := range batchResults {
223+
if blockResult.Error != nil {
224+
return nil, fmt.Errorf("error fetching block %s: %w", blockResult.BlockNumber.String(), blockResult.Error)
225+
}
226+
fetchedBlocksByNumber[blockResult.BlockNumber.String()] = blockResult.Data
188227
}
189-
fetchedBlocksByNumber[blockResult.BlockNumber.String()] = blockResult.Data
190228
}
191229
return &fetchedBlocksByNumber, nil
192230
}
193231

194-
func (rh *ReorgHandler) handleReorg(reorgStart *big.Int, reorgEnd *big.Int) error {
195-
log.Debug().Msgf("Handling reorg from block %s to %s", reorgStart.String(), reorgEnd.String())
196-
blockRange := make([]*big.Int, 0, new(big.Int).Sub(reorgEnd, reorgStart).Int64())
197-
for i := new(big.Int).Set(reorgStart); i.Cmp(reorgEnd) <= 0; i.Add(i, big.NewInt(1)) {
198-
blockRange = append(blockRange, new(big.Int).Set(i))
199-
}
200-
201-
results := rh.worker.Run(blockRange)
232+
func (rh *ReorgHandler) handleReorg(reorgedBlockNumbers []*big.Int) error {
233+
log.Debug().Msgf("Handling reorg for blocks %v", reorgedBlockNumbers)
234+
results := rh.worker.Run(reorgedBlockNumbers)
202235
data := make([]common.BlockData, 0, len(results))
203236
blocksToDelete := make([]*big.Int, 0, len(results))
204237
for _, result := range results {

0 commit comments

Comments
 (0)