Skip to content

Commit f2005cc

Browse files
committed
use inserts instead of lightweight deletes to remove reorged data
1 parent 9538ef4 commit f2005cc

File tree

10 files changed

+394
-437
lines changed

10 files changed

+394
-437
lines changed

internal/common/utils.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,8 @@ var allowedFunctions = map[string]struct{}{
173173
"toStartOfDay": {},
174174
"toDate": {},
175175
"concat": {},
176+
"in": {},
177+
"IN": {},
176178
}
177179

178180
var disallowedPatterns = []string{

internal/orchestrator/reorg_handler.go

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,7 @@ func (rh *ReorgHandler) Start() {
7171
log.Debug().Msgf("Reorg handler running")
7272
go func() {
7373
for range ticker.C {
74-
// need to include lastCheckedBlock to check if next block's parent matches
75-
lookbackFrom := new(big.Int).Add(rh.lastCheckedBlock, big.NewInt(int64(rh.blocksPerScan-1)))
76-
mostRecentBlockChecked, err := rh.RunFromBlock(lookbackFrom)
74+
mostRecentBlockChecked, err := rh.RunFromBlock(rh.lastCheckedBlock)
7775
if err != nil {
7876
log.Error().Err(err).Msg("Error during reorg handling")
7977
continue
@@ -92,8 +90,10 @@ func (rh *ReorgHandler) Start() {
9290
select {}
9391
}
9492

95-
func (rh *ReorgHandler) RunFromBlock(lookbackFrom *big.Int) (lastCheckedBlock *big.Int, err error) {
96-
blockHeaders, err := rh.storage.MainStorage.LookbackBlockHeaders(rh.rpc.GetChainID(), rh.blocksPerScan, lookbackFrom)
93+
func (rh *ReorgHandler) RunFromBlock(fromBlock *big.Int) (lastCheckedBlock *big.Int, err error) {
94+
toBlock := new(big.Int).Add(fromBlock, big.NewInt(int64(rh.blocksPerScan)))
95+
log.Debug().Msgf("Checking for reorgs from block %s to %s", fromBlock.String(), toBlock.String())
96+
blockHeaders, err := rh.storage.MainStorage.GetBlockHeadersDescending(rh.rpc.GetChainID(), fromBlock, toBlock)
9797
if err != nil {
9898
return nil, fmt.Errorf("error getting recent block headers: %w", err)
9999
}
@@ -107,41 +107,40 @@ func (rh *ReorgHandler) RunFromBlock(lookbackFrom *big.Int) (lastCheckedBlock *b
107107
log.Debug().Msgf("Most recent (%s) and last checked (%s) block numbers are equal, skipping reorg check", mostRecentBlockHeader.Number.String(), lastBlockHeader.Number.String())
108108
return nil, nil
109109
}
110-
log.Debug().Msgf("Checking for reorgs from block %s to %s", mostRecentBlockHeader.Number.String(), lastBlockHeader.Number.String())
111110
reorgEndIndex := findReorgEndIndex(blockHeaders)
112111
if reorgEndIndex == -1 {
113112
return mostRecentBlockHeader.Number, nil
114113
}
115-
reorgEndBlock := blockHeaders[reorgEndIndex].Number
116114
metrics.ReorgCounter.Inc()
117115
forkPoint, err := rh.findFirstForkedBlockNumber(blockHeaders[reorgEndIndex:])
118116
if err != nil {
119117
return nil, fmt.Errorf("error while finding fork point: %w", err)
120118
}
119+
reorgEndBlock := blockHeaders[reorgEndIndex].Number
121120
err = rh.handleReorg(forkPoint, reorgEndBlock)
122121
if err != nil {
123122
return nil, fmt.Errorf("error while handling reorg: %w", err)
124123
}
125124
return mostRecentBlockHeader.Number, nil
126125
}
127126

128-
func findReorgEndIndex(reversedBlockHeaders []common.BlockHeader) (index int) {
129-
for i := 0; i < len(reversedBlockHeaders)-1; i++ {
130-
currentBlock := reversedBlockHeaders[i]
131-
previousBlock := reversedBlockHeaders[i+1]
127+
func findReorgEndIndex(blockHeadersDescending []common.BlockHeader) (index int) {
128+
for i := 0; i < len(blockHeadersDescending)-1; i++ {
129+
currentBlock := blockHeadersDescending[i]
130+
previousBlockInChain := blockHeadersDescending[i+1]
132131

133-
if currentBlock.Number.Cmp(previousBlock.Number) == 0 { // unmerged block
132+
if currentBlock.Number.Cmp(previousBlockInChain.Number) == 0 { // unmerged block
134133
continue
135134
}
136-
if currentBlock.ParentHash != previousBlock.Hash {
135+
if currentBlock.ParentHash != previousBlockInChain.Hash {
137136
log.Debug().
138137
Str("currentBlockNumber", currentBlock.Number.String()).
139138
Str("currentBlockHash", currentBlock.Hash).
140139
Str("currentBlockParentHash", currentBlock.ParentHash).
141-
Str("previousBlockNumber", previousBlock.Number.String()).
142-
Str("previousBlockHash", previousBlock.Hash).
140+
Str("previousBlockNumber", previousBlockInChain.Number.String()).
141+
Str("previousBlockHash", previousBlockInChain.Hash).
143142
Msg("Reorg detected: parent hash mismatch")
144-
return i
143+
return i + 1
145144
}
146145
}
147146
return -1
@@ -153,20 +152,23 @@ func (rh *ReorgHandler) findFirstForkedBlockNumber(reversedBlockHeaders []common
153152
return nil, err
154153
}
155154

156-
// skip first because that is the reorg end block
157-
for i := 1; i < len(reversedBlockHeaders); i++ {
155+
for i := 0; i < len(reversedBlockHeaders); i++ {
158156
blockHeader := reversedBlockHeaders[i]
159157
block, ok := (*newBlocksByNumber)[blockHeader.Number.String()]
160158
if !ok {
161159
return nil, fmt.Errorf("block not found: %s", blockHeader.Number.String())
162160
}
163161
if blockHeader.ParentHash == block.ParentHash && blockHeader.Hash == block.Hash {
162+
if i == 0 {
163+
return nil, fmt.Errorf("unable to find reorg fork point due to block %s being first in the array", blockHeader.Number.String())
164+
}
164165
previousBlock := reversedBlockHeaders[i-1]
165166
return previousBlock.Number, nil
166167
}
167168
}
168-
lookbackFrom := reversedBlockHeaders[len(reversedBlockHeaders)-1].Number
169-
nextHeadersBatch, err := rh.storage.MainStorage.LookbackBlockHeaders(rh.rpc.GetChainID(), rh.blocksPerScan, lookbackFrom)
169+
fetchUntilBlock := reversedBlockHeaders[len(reversedBlockHeaders)-1].Number
170+
fetchFromBlock := new(big.Int).Sub(fetchUntilBlock, big.NewInt(int64(rh.blocksPerScan)))
171+
nextHeadersBatch, err := rh.storage.MainStorage.GetBlockHeadersDescending(rh.rpc.GetChainID(), fetchFromBlock, fetchUntilBlock)
170172
if err != nil {
171173
return nil, fmt.Errorf("error getting next headers batch: %w", err)
172174
}

0 commit comments

Comments
 (0)