Skip to content

Commit 27daf95

Browse files
committed
find all reorgs in current batch
1 parent a6a7a7b commit 27daf95

File tree

3 files changed

+123
-87
lines changed

3 files changed

+123
-87
lines changed

internal/orchestrator/reorg_handler.go

Lines changed: 86 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package orchestrator
33
import (
44
"fmt"
55
"math/big"
6+
"sort"
7+
"sync"
68
"time"
79

810
"github.com/rs/zerolog/log"
@@ -73,7 +75,7 @@ func (rh *ReorgHandler) Start() {
7375
for range ticker.C {
7476
mostRecentBlockChecked, err := rh.RunFromBlock(rh.lastCheckedBlock)
7577
if err != nil {
76-
log.Error().Err(err).Msg("Error during reorg handling")
78+
log.Error().Err(err).Msgf("Error during reorg handling: %s", err.Error())
7779
continue
7880
}
7981
if mostRecentBlockChecked == nil {
@@ -107,98 +109,129 @@ func (rh *ReorgHandler) RunFromBlock(fromBlock *big.Int) (lastCheckedBlock *big.
107109
log.Debug().Msgf("Most recent (%s) and last checked (%s) block numbers are equal, skipping reorg check", mostRecentBlockHeader.Number.String(), lastBlockHeader.Number.String())
108110
return nil, nil
109111
}
110-
reorgEndIndex := findReorgEndIndex(blockHeaders)
111-
if reorgEndIndex == -1 {
112+
113+
firstMismatchIndex, err := findIndexOfFirstHashMismatch(blockHeaders)
114+
if err != nil {
115+
return nil, fmt.Errorf("error detecting reorgs: %w", err)
116+
}
117+
if firstMismatchIndex == -1 {
118+
log.Debug().Msgf("No reorg detected, most recent block number checked: %s", mostRecentBlockHeader.Number.String())
112119
return mostRecentBlockHeader.Number, nil
113120
}
121+
114122
metrics.ReorgCounter.Inc()
115-
forkPoint, err := rh.findFirstForkedBlockNumber(blockHeaders[reorgEndIndex:])
123+
reorgedBlockNumbers := make([]*big.Int, 0)
124+
err = rh.findReorgedBlockNumbers(blockHeaders[firstMismatchIndex:], &reorgedBlockNumbers)
116125
if err != nil {
117-
return nil, fmt.Errorf("error while finding fork point: %w", err)
126+
return nil, fmt.Errorf("error finding reorged block numbers: %w", err)
127+
}
128+
129+
if len(reorgedBlockNumbers) == 0 {
130+
log.Debug().Msgf("Reorg was detected, but no reorged block numbers found, most recent block number checked: %s", mostRecentBlockHeader.Number.String())
131+
return mostRecentBlockHeader.Number, nil
118132
}
119-
reorgEndBlock := blockHeaders[reorgEndIndex].Number
120-
err = rh.handleReorg(forkPoint, reorgEndBlock)
133+
134+
err = rh.handleReorg(reorgedBlockNumbers)
121135
if err != nil {
122136
return nil, fmt.Errorf("error while handling reorg: %w", err)
123137
}
124138
return mostRecentBlockHeader.Number, nil
125139
}
126140

127-
func findReorgEndIndex(blockHeadersDescending []common.BlockHeader) (index int) {
141+
func findIndexOfFirstHashMismatch(blockHeadersDescending []common.BlockHeader) (int, error) {
128142
for i := 0; i < len(blockHeadersDescending)-1; i++ {
129143
currentBlock := blockHeadersDescending[i]
130144
previousBlockInChain := blockHeadersDescending[i+1]
131-
132145
if currentBlock.Number.Cmp(previousBlockInChain.Number) == 0 { // unmerged block
133146
continue
134147
}
148+
if currentBlock.Number.Cmp(new(big.Int).Add(previousBlockInChain.Number, big.NewInt(1))) != 0 {
149+
return -1, fmt.Errorf("block headers are not sequential - cannot proceed with detecting reorgs. Comparing blocks: %s and %s", currentBlock.Number.String(), previousBlockInChain.Number.String())
150+
}
135151
if currentBlock.ParentHash != previousBlockInChain.Hash {
136-
log.Debug().
137-
Str("currentBlockNumber", currentBlock.Number.String()).
138-
Str("currentBlockHash", currentBlock.Hash).
139-
Str("currentBlockParentHash", currentBlock.ParentHash).
140-
Str("previousBlockNumber", previousBlockInChain.Number.String()).
141-
Str("previousBlockHash", previousBlockInChain.Hash).
142-
Msg("Reorg detected: parent hash mismatch")
143-
return i + 1
152+
return i + 1, nil
144153
}
145154
}
146-
return -1
155+
return -1, nil
147156
}
148157

149-
func (rh *ReorgHandler) findFirstForkedBlockNumber(reversedBlockHeaders []common.BlockHeader) (forkPoint *big.Int, err error) {
150-
newBlocksByNumber, err := rh.getNewBlocksByNumber(reversedBlockHeaders)
158+
func (rh *ReorgHandler) findReorgedBlockNumbers(blockHeadersDescending []common.BlockHeader, reorgedBlockNumbers *[]*big.Int) error {
159+
newBlocksByNumber, err := rh.getNewBlocksByNumber(blockHeadersDescending)
151160
if err != nil {
152-
return nil, err
161+
return err
153162
}
154-
155-
for i := 0; i < len(reversedBlockHeaders); i++ {
156-
blockHeader := reversedBlockHeaders[i]
157-
block, ok := (*newBlocksByNumber)[blockHeader.Number.String()]
163+
continueCheckingForReorgs := false
164+
for i := 0; i < len(blockHeadersDescending); i++ {
165+
blockHeader := blockHeadersDescending[i]
166+
fetchedBlock, ok := (*newBlocksByNumber)[blockHeader.Number.String()]
158167
if !ok {
159-
return nil, fmt.Errorf("block not found: %s", blockHeader.Number.String())
168+
return fmt.Errorf("block not found: %s", blockHeader.Number.String())
160169
}
161-
if blockHeader.ParentHash == block.ParentHash && blockHeader.Hash == block.Hash {
162-
if i == 0 {
163-
return nil, fmt.Errorf("unable to find reorg fork point due to block %s being first in the array", blockHeader.Number.String())
170+
if blockHeader.ParentHash != fetchedBlock.ParentHash || blockHeader.Hash != fetchedBlock.Hash {
171+
*reorgedBlockNumbers = append(*reorgedBlockNumbers, blockHeader.Number)
172+
if i == len(blockHeadersDescending)-1 {
173+
continueCheckingForReorgs = true // if last block in range is reorged, we should continue checking
164174
}
165-
previousBlock := reversedBlockHeaders[i-1]
166-
return previousBlock.Number, nil
167175
}
168176
}
169-
fetchUntilBlock := reversedBlockHeaders[len(reversedBlockHeaders)-1].Number
170-
fetchFromBlock := new(big.Int).Sub(fetchUntilBlock, big.NewInt(int64(rh.blocksPerScan)))
171-
nextHeadersBatch, err := rh.storage.MainStorage.GetBlockHeadersDescending(rh.rpc.GetChainID(), fetchFromBlock, fetchUntilBlock)
172-
if err != nil {
173-
return nil, fmt.Errorf("error getting next headers batch: %w", err)
177+
if continueCheckingForReorgs {
178+
fetchUntilBlock := blockHeadersDescending[len(blockHeadersDescending)-1].Number
179+
fetchFromBlock := new(big.Int).Sub(fetchUntilBlock, big.NewInt(int64(rh.blocksPerScan)))
180+
nextHeadersBatch, err := rh.storage.MainStorage.GetBlockHeadersDescending(rh.rpc.GetChainID(), fetchFromBlock, new(big.Int).Sub(fetchUntilBlock, big.NewInt(1))) // we sub 1 to not check the last block again
181+
if err != nil {
182+
return fmt.Errorf("error getting next headers batch: %w", err)
183+
}
184+
sort.Slice(nextHeadersBatch, func(i, j int) bool {
185+
return nextHeadersBatch[i].Number.Cmp(nextHeadersBatch[j].Number) > 0
186+
})
187+
return rh.findReorgedBlockNumbers(nextHeadersBatch, reorgedBlockNumbers)
174188
}
175-
return rh.findFirstForkedBlockNumber(nextHeadersBatch)
189+
return nil
176190
}
177191

178-
func (rh *ReorgHandler) getNewBlocksByNumber(reversedBlockHeaders []common.BlockHeader) (*map[string]common.Block, error) {
179-
blockNumbers := make([]*big.Int, 0, len(reversedBlockHeaders))
180-
for _, header := range reversedBlockHeaders {
192+
func (rh *ReorgHandler) getNewBlocksByNumber(blockHeaders []common.BlockHeader) (*map[string]common.Block, error) {
193+
blockNumbers := make([]*big.Int, 0, len(blockHeaders))
194+
for _, header := range blockHeaders {
181195
blockNumbers = append(blockNumbers, header.Number)
182196
}
183-
blockResults := rh.rpc.GetBlocks(blockNumbers)
197+
blockCount := len(blockNumbers)
198+
chunks := common.BigIntSliceToChunks(blockNumbers, rh.rpc.GetBlocksPerRequest().Blocks)
199+
200+
var wg sync.WaitGroup
201+
resultsCh := make(chan []rpc.GetBlocksResult, len(chunks))
202+
203+
// TODO: move batching to rpc
204+
log.Debug().Msgf("Reorg handler fetching %d blocks in %d chunks of max %d blocks", blockCount, len(chunks), rh.rpc.GetBlocksPerRequest().Blocks)
205+
for _, chunk := range chunks {
206+
wg.Add(1)
207+
go func(chunk []*big.Int) {
208+
defer wg.Done()
209+
resultsCh <- rh.rpc.GetBlocks(chunk)
210+
if config.Cfg.RPC.Blocks.BatchDelay > 0 {
211+
time.Sleep(time.Duration(config.Cfg.RPC.Blocks.BatchDelay) * time.Millisecond)
212+
}
213+
}(chunk)
214+
}
215+
go func() {
216+
wg.Wait()
217+
close(resultsCh)
218+
}()
219+
184220
fetchedBlocksByNumber := make(map[string]common.Block)
185-
for _, blockResult := range blockResults {
186-
if blockResult.Error != nil {
187-
return nil, fmt.Errorf("error fetching block %s: %w", blockResult.BlockNumber.String(), blockResult.Error)
221+
for batchResults := range resultsCh {
222+
for _, blockResult := range batchResults {
223+
if blockResult.Error != nil {
224+
return nil, fmt.Errorf("error fetching block %s: %w", blockResult.BlockNumber.String(), blockResult.Error)
225+
}
226+
fetchedBlocksByNumber[blockResult.BlockNumber.String()] = blockResult.Data
188227
}
189-
fetchedBlocksByNumber[blockResult.BlockNumber.String()] = blockResult.Data
190228
}
191229
return &fetchedBlocksByNumber, nil
192230
}
193231

194-
func (rh *ReorgHandler) handleReorg(reorgStart *big.Int, reorgEnd *big.Int) error {
195-
log.Debug().Msgf("Handling reorg from block %s to %s", reorgStart.String(), reorgEnd.String())
196-
blockRange := make([]*big.Int, 0, new(big.Int).Sub(reorgEnd, reorgStart).Int64())
197-
for i := new(big.Int).Set(reorgStart); i.Cmp(reorgEnd) <= 0; i.Add(i, big.NewInt(1)) {
198-
blockRange = append(blockRange, new(big.Int).Set(i))
199-
}
200-
201-
results := rh.worker.Run(blockRange)
232+
func (rh *ReorgHandler) handleReorg(reorgedBlockNumbers []*big.Int) error {
233+
log.Debug().Msgf("Handling reorg for blocks %v", reorgedBlockNumbers)
234+
results := rh.worker.Run(reorgedBlockNumbers)
202235
data := make([]common.BlockData, 0, len(results))
203236
blocksToDelete := make([]*big.Int, 0, len(results))
204237
for _, result := range results {

internal/orchestrator/reorg_handler_test.go

Lines changed: 36 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,8 @@ func TestFindReorgEndIndex(t *testing.T) {
119119

120120
for _, tt := range tests {
121121
t.Run(tt.name, func(t *testing.T) {
122-
result := findReorgEndIndex(tt.reversedBlockHeaders)
122+
result, err := findIndexOfFirstHashMismatch(tt.reversedBlockHeaders)
123+
assert.NoError(t, err)
123124
assert.Equal(t, tt.expectedIndex, result)
124125
})
125126
}
@@ -146,7 +147,7 @@ func TestNewReorgHandlerWithForceFromBlock(t *testing.T) {
146147
assert.Equal(t, big.NewInt(2000), handler.lastCheckedBlock)
147148
}
148149

149-
func TestFindFirstForkedBlockNumber(t *testing.T) {
150+
func TestFindFirstReorgedBlockNumber(t *testing.T) {
150151
mockRPC := mocks.NewMockIRPCClient(t)
151152
mockMainStorage := mocks.NewMockIMainStorage(t)
152153
mockOrchestratorStorage := mocks.NewMockIOrchestratorStorage(t)
@@ -158,6 +159,7 @@ func TestFindFirstForkedBlockNumber(t *testing.T) {
158159

159160
mockRPC.EXPECT().GetChainID().Return(big.NewInt(1))
160161
mockOrchestratorStorage.EXPECT().GetLastReorgCheckedBlockNumber(big.NewInt(1)).Return(big.NewInt(3), nil)
162+
mockRPC.EXPECT().GetBlocksPerRequest().Return(rpc.BlocksPerRequestConfig{Blocks: 100})
161163
handler := NewReorgHandler(mockRPC, mockStorage)
162164

163165
reversedBlockHeaders := []common.BlockHeader{
@@ -172,13 +174,14 @@ func TestFindFirstForkedBlockNumber(t *testing.T) {
172174
{BlockNumber: big.NewInt(1), Data: common.Block{Hash: "hash1", ParentHash: "hash0"}},
173175
})
174176

175-
forkPoint, err := handler.findFirstForkedBlockNumber(reversedBlockHeaders)
177+
reorgedBlockNumbers := []*big.Int{}
178+
err := handler.findReorgedBlockNumbers(reversedBlockHeaders, &reorgedBlockNumbers)
176179

177180
assert.NoError(t, err)
178-
assert.Equal(t, big.NewInt(3), forkPoint)
181+
assert.Equal(t, []*big.Int{big.NewInt(3)}, reorgedBlockNumbers)
179182
}
180183

181-
func TestFindFirstForkedBlockNumberWithLastBlockInSlice(t *testing.T) {
184+
func TestFindAllReorgedBlockNumbersWithLastBlockInSliceAsValid(t *testing.T) {
182185
mockRPC := mocks.NewMockIRPCClient(t)
183186
mockMainStorage := mocks.NewMockIMainStorage(t)
184187
mockOrchestratorStorage := mocks.NewMockIOrchestratorStorage(t)
@@ -189,12 +192,13 @@ func TestFindFirstForkedBlockNumberWithLastBlockInSlice(t *testing.T) {
189192
}
190193

191194
mockRPC.EXPECT().GetChainID().Return(big.NewInt(1))
195+
mockRPC.EXPECT().GetBlocksPerRequest().Return(rpc.BlocksPerRequestConfig{Blocks: 100})
192196
mockOrchestratorStorage.EXPECT().GetLastReorgCheckedBlockNumber(big.NewInt(1)).Return(big.NewInt(3), nil)
193197
handler := NewReorgHandler(mockRPC, mockStorage)
194198

195199
reversedBlockHeaders := []common.BlockHeader{
196-
{Number: big.NewInt(3), Hash: "hash3a", ParentHash: "hash2a"},
197-
{Number: big.NewInt(2), Hash: "hash2a", ParentHash: "hash1"}, // <- fork starts from here
200+
{Number: big.NewInt(3), Hash: "hash3a", ParentHash: "hash2a"}, // <- fork starts from here
201+
{Number: big.NewInt(2), Hash: "hash2a", ParentHash: "hash1"},
198202
{Number: big.NewInt(1), Hash: "hash1", ParentHash: "hash0"},
199203
}
200204

@@ -204,13 +208,14 @@ func TestFindFirstForkedBlockNumberWithLastBlockInSlice(t *testing.T) {
204208
{BlockNumber: big.NewInt(1), Data: common.Block{Hash: "hash1", ParentHash: "hash0"}},
205209
})
206210

207-
forkPoint, err := handler.findFirstForkedBlockNumber(reversedBlockHeaders)
211+
reorgedBlockNumbers := []*big.Int{}
212+
err := handler.findReorgedBlockNumbers(reversedBlockHeaders, &reorgedBlockNumbers)
208213

209214
assert.NoError(t, err)
210-
assert.Equal(t, big.NewInt(2), forkPoint)
215+
assert.Equal(t, []*big.Int{big.NewInt(3), big.NewInt(2)}, reorgedBlockNumbers)
211216
}
212217

213-
func TestFindFirstForkedBlockNumberRecursively(t *testing.T) {
218+
func TestReorgedBlockNumbersRecursively(t *testing.T) {
214219
defer func() { config.Cfg = config.Config{} }()
215220
config.Cfg.ReorgHandler.BlocksPerScan = 3
216221

@@ -224,6 +229,7 @@ func TestFindFirstForkedBlockNumberRecursively(t *testing.T) {
224229
}
225230

226231
mockRPC.EXPECT().GetChainID().Return(big.NewInt(1))
232+
mockRPC.EXPECT().GetBlocksPerRequest().Return(rpc.BlocksPerRequestConfig{Blocks: 100})
227233
mockOrchestratorStorage.EXPECT().GetLastReorgCheckedBlockNumber(big.NewInt(1)).Return(big.NewInt(3), nil)
228234
handler := NewReorgHandler(mockRPC, mockStorage)
229235

@@ -233,8 +239,7 @@ func TestFindFirstForkedBlockNumberRecursively(t *testing.T) {
233239
{BlockNumber: big.NewInt(4), Data: common.Block{Hash: "hash4", ParentHash: "hash3"}},
234240
}).Once()
235241

236-
mockRPC.EXPECT().GetBlocks([]*big.Int{big.NewInt(4), big.NewInt(3), big.NewInt(2), big.NewInt(1)}).Return([]rpc.GetBlocksResult{
237-
{BlockNumber: big.NewInt(4), Data: common.Block{Hash: "hash4", ParentHash: "hash3"}},
242+
mockRPC.EXPECT().GetBlocks([]*big.Int{big.NewInt(3), big.NewInt(2), big.NewInt(1)}).Return([]rpc.GetBlocksResult{
238243
{BlockNumber: big.NewInt(3), Data: common.Block{Hash: "hash3", ParentHash: "hash2"}},
239244
{BlockNumber: big.NewInt(2), Data: common.Block{Hash: "hash2", ParentHash: "hash1"}},
240245
{BlockNumber: big.NewInt(1), Data: common.Block{Hash: "hash1", ParentHash: "hash0"}},
@@ -246,17 +251,17 @@ func TestFindFirstForkedBlockNumberRecursively(t *testing.T) {
246251
{Number: big.NewInt(4), Hash: "hash4a", ParentHash: "hash3a"},
247252
}
248253

249-
mockMainStorage.EXPECT().GetBlockHeadersDescending(big.NewInt(1), big.NewInt(1), big.NewInt(4)).Return([]common.BlockHeader{
250-
{Number: big.NewInt(4), Hash: "hash4a", ParentHash: "hash3a"},
251-
{Number: big.NewInt(3), Hash: "hash3a", ParentHash: "hash2"}, // <- fork starts from here
254+
mockMainStorage.EXPECT().GetBlockHeadersDescending(big.NewInt(1), big.NewInt(1), big.NewInt(3)).Return([]common.BlockHeader{
255+
{Number: big.NewInt(3), Hash: "hash3a", ParentHash: "hash2a"}, // <- end of reorged blocks
252256
{Number: big.NewInt(2), Hash: "hash2", ParentHash: "hash1"},
253257
{Number: big.NewInt(1), Hash: "hash1", ParentHash: "hash0"},
254258
}, nil)
255259

256-
forkPoint, err := handler.findFirstForkedBlockNumber(initialBlockHeaders)
260+
reorgedBlockNumbers := []*big.Int{}
261+
err := handler.findReorgedBlockNumbers(initialBlockHeaders, &reorgedBlockNumbers)
257262

258263
assert.NoError(t, err)
259-
assert.Equal(t, big.NewInt(3), forkPoint)
264+
assert.Equal(t, []*big.Int{big.NewInt(6), big.NewInt(5), big.NewInt(4), big.NewInt(3)}, reorgedBlockNumbers)
260265
}
261266

262267
func TestHandleReorg(t *testing.T) {
@@ -278,14 +283,11 @@ func TestHandleReorg(t *testing.T) {
278283
})
279284
mockOrchestratorStorage.EXPECT().GetLastReorgCheckedBlockNumber(big.NewInt(1)).Return(big.NewInt(3), nil)
280285

281-
reorgStart := big.NewInt(1)
282-
reorgEnd := big.NewInt(3)
283-
284286
mockMainStorage.EXPECT().DeleteBlockData(big.NewInt(1), mock.Anything).Return(nil)
285287
mockMainStorage.EXPECT().InsertBlockData(mock.Anything).Return(nil)
286288

287289
handler := NewReorgHandler(mockRPC, mockStorage)
288-
err := handler.handleReorg(reorgStart, reorgEnd)
290+
err := handler.handleReorg([]*big.Int{big.NewInt(1), big.NewInt(2), big.NewInt(3)})
289291

290292
assert.NoError(t, err)
291293
}
@@ -418,15 +420,15 @@ func TestHandleReorgWithLatestBlockReorged(t *testing.T) {
418420
{BlockNumber: big.NewInt(100), Data: common.Block{Hash: "hash100", ParentHash: "hash99"}},
419421
})
420422

421-
mockRPC.EXPECT().GetFullBlocks([]*big.Int{big.NewInt(101), big.NewInt(102), big.NewInt(103), big.NewInt(104), big.NewInt(105), big.NewInt(106), big.NewInt(107), big.NewInt(108)}).Return([]rpc.GetFullBlockResult{
422-
{BlockNumber: big.NewInt(108), Data: common.BlockData{}},
423-
{BlockNumber: big.NewInt(107), Data: common.BlockData{}},
424-
{BlockNumber: big.NewInt(106), Data: common.BlockData{}},
425-
{BlockNumber: big.NewInt(105), Data: common.BlockData{}},
426-
{BlockNumber: big.NewInt(104), Data: common.BlockData{}},
427-
{BlockNumber: big.NewInt(103), Data: common.BlockData{}},
428-
{BlockNumber: big.NewInt(102), Data: common.BlockData{}},
423+
mockRPC.EXPECT().GetFullBlocks([]*big.Int{big.NewInt(108), big.NewInt(107), big.NewInt(106), big.NewInt(105), big.NewInt(104), big.NewInt(103), big.NewInt(102), big.NewInt(101)}).Return([]rpc.GetFullBlockResult{
429424
{BlockNumber: big.NewInt(101), Data: common.BlockData{}},
425+
{BlockNumber: big.NewInt(102), Data: common.BlockData{}},
426+
{BlockNumber: big.NewInt(103), Data: common.BlockData{}},
427+
{BlockNumber: big.NewInt(104), Data: common.BlockData{}},
428+
{BlockNumber: big.NewInt(105), Data: common.BlockData{}},
429+
{BlockNumber: big.NewInt(106), Data: common.BlockData{}},
430+
{BlockNumber: big.NewInt(107), Data: common.BlockData{}},
431+
{BlockNumber: big.NewInt(108), Data: common.BlockData{}},
430432
})
431433

432434
mockMainStorage.EXPECT().DeleteBlockData(big.NewInt(1), mock.MatchedBy(func(blocks []*big.Int) bool {
@@ -484,12 +486,12 @@ func TestHandleReorgWithManyBlocks(t *testing.T) {
484486
{BlockNumber: big.NewInt(100), Data: common.Block{Hash: "hash100", ParentHash: "hash99"}},
485487
})
486488

487-
mockRPC.EXPECT().GetFullBlocks([]*big.Int{big.NewInt(103), big.NewInt(104), big.NewInt(105), big.NewInt(106), big.NewInt(107)}).Return([]rpc.GetFullBlockResult{
488-
{BlockNumber: big.NewInt(103), Data: common.BlockData{}},
489-
{BlockNumber: big.NewInt(104), Data: common.BlockData{}},
490-
{BlockNumber: big.NewInt(105), Data: common.BlockData{}},
491-
{BlockNumber: big.NewInt(106), Data: common.BlockData{}},
489+
mockRPC.EXPECT().GetFullBlocks([]*big.Int{big.NewInt(107), big.NewInt(106), big.NewInt(105), big.NewInt(104), big.NewInt(103)}).Return([]rpc.GetFullBlockResult{
492490
{BlockNumber: big.NewInt(107), Data: common.BlockData{}},
491+
{BlockNumber: big.NewInt(106), Data: common.BlockData{}},
492+
{BlockNumber: big.NewInt(105), Data: common.BlockData{}},
493+
{BlockNumber: big.NewInt(104), Data: common.BlockData{}},
494+
{BlockNumber: big.NewInt(103), Data: common.BlockData{}},
493495
})
494496

495497
mockMainStorage.EXPECT().DeleteBlockData(big.NewInt(1), mock.MatchedBy(func(blocks []*big.Int) bool {

internal/worker/worker.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ func NewWorker(rpc rpc.IRPCClient) *Worker {
2222
}
2323
}
2424

25+
// TODO: move batching to rpc
2526
func (w *Worker) Run(blockNumbers []*big.Int) []rpc.GetFullBlockResult {
2627
blockCount := len(blockNumbers)
2728
chunks := common.BigIntSliceToChunks(blockNumbers, w.rpc.GetBlocksPerRequest().Blocks)

0 commit comments

Comments
 (0)