
Commit 31df1e0

Poll missing blocks in gap handler (#113)
### TL;DR

Changed block gap handling to poll missing blocks instead of marking them as failures.

### What changed?

- Replaced the block failure recording mechanism in `handleGap` with a new polling approach
- Created a new `BoundlessPoller` that can poll arbitrary block ranges
- Refactored the `Poller` to extract common functionality into `BoundlessPoller`
- Updated tests to reflect the new polling behavior

### How to test?

1. Run the indexer with a gap in the block sequence
2. Verify that missing blocks are now polled and processed
3. Confirm that blocks are properly indexed instead of being marked as failures
4. Run unit tests to verify the new gap handling behavior

### Why make this change?

The previous implementation marked missing blocks as failures without attempting to retrieve them. The new approach actively fetches and processes missing blocks, improving indexing speed.
2 parents c6fc50e + 7187f40 commit 31df1e0
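
For orientation, here is a minimal, self-contained sketch of the new gap-handling flow. Everything in it is a stand-in for illustration: `blockPoller` and `logPoller` are hypothetical substitutes for the real `BoundlessPoller`, and the real `handleGap` is a `Committer` method wired to RPC and storage:

```go
package main

import (
	"fmt"
	"math/big"
)

// blockPoller stands in for BoundlessPoller: anything that can fetch and
// process an explicit list of block numbers.
type blockPoller interface {
	Poll(blockNumbers []*big.Int) *big.Int
}

// handleGap mirrors the new control flow: enumerate the missing block
// numbers, hand them to the poller, and still return an error so the
// caller aborts (and later retries) the current commit batch.
func handleGap(expectedStart, actualFirst *big.Int, poller blockPoller) error {
	missing := []*big.Int{}
	for b := new(big.Int).Set(expectedStart); b.Cmp(actualFirst) < 0; b.Add(b, big.NewInt(1)) {
		missing = append(missing, new(big.Int).Set(b))
	}
	poller.Poll(missing)
	return fmt.Errorf("first block number (%s) in commit batch does not match expected (%s)",
		actualFirst.String(), expectedStart.String())
}

// logPoller is a trivial stub so the sketch runs on its own.
type logPoller struct{}

func (logPoller) Poll(blockNumbers []*big.Int) *big.Int {
	fmt.Printf("polling %d blocks: %v\n", len(blockNumbers), blockNumbers)
	if len(blockNumbers) == 0 {
		return nil
	}
	return blockNumbers[len(blockNumbers)-1]
}

func main() {
	// A gap between expected block 100 and actual first block 105 polls
	// blocks 100..104 and still surfaces the mismatch as an error.
	fmt.Println(handleGap(big.NewInt(100), big.NewInt(105), logPoller{}))
}
```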

File tree

3 files changed (+66, -85 lines)


internal/orchestrator/committer.go

Lines changed: 3 additions & 28 deletions
```diff
@@ -184,33 +184,8 @@ func (c *Committer) handleGap(expectedStartBlockNumber *big.Int, actualFirstBloc
 	}
 	log.Debug().Msgf("Detected %d missing blocks between blocks %s and %s", missingBlockCount, expectedStartBlockNumber.String(), actualFirstBlock.Number.String())
 
-	existingBlockFailures, err := c.storage.OrchestratorStorage.GetBlockFailures(storage.QueryFilter{BlockNumbers: missingBlockNumbers, ChainId: c.rpc.GetChainID()})
-	if err != nil {
-		return fmt.Errorf("error getting block failures while handling gap: %v", err)
-	}
-
-	existingBlockFailuresMap := make(map[string]*common.BlockFailure)
-	for _, failure := range existingBlockFailures {
-		blockNumberStr := failure.BlockNumber.String()
-		existingBlockFailuresMap[blockNumberStr] = &failure
-	}
-
-	blockFailures := make([]common.BlockFailure, 0)
-	for _, blockNumber := range missingBlockNumbers {
-		blockNumberStr := blockNumber.String()
-		if _, ok := existingBlockFailuresMap[blockNumberStr]; !ok {
-			blockFailures = append(blockFailures, common.BlockFailure{
-				BlockNumber:   blockNumber,
-				ChainId:       c.rpc.GetChainID(),
-				FailureTime:   time.Now(),
-				FailureCount:  1,
-				FailureReason: "Gap detected for this block",
-			})
-		}
-	}
-	log.Debug().Msgf("Storing %d block failures while handling gap", len(blockFailures))
-	if err := c.storage.OrchestratorStorage.StoreBlockFailures(blockFailures); err != nil {
-		return fmt.Errorf("error storing block failures while handling gap: %v", err)
-	}
+	poller := NewBoundlessPoller(c.rpc, c.storage)
+	log.Debug().Msgf("Polling %d blocks while handling gap: %v", len(missingBlockNumbers), missingBlockNumbers)
+	poller.Poll(missingBlockNumbers)
 	return fmt.Errorf("first block number (%s) in commit batch does not match expected (%s)", actualFirstBlock.Number.String(), expectedStartBlockNumber.String())
 }
```

internal/orchestrator/committer_test.go

Lines changed: 12 additions & 22 deletions
```diff
@@ -9,6 +9,7 @@ import (
 	"github.com/stretchr/testify/mock"
 	config "github.com/thirdweb-dev/indexer/configs"
 	"github.com/thirdweb-dev/indexer/internal/common"
+	"github.com/thirdweb-dev/indexer/internal/rpc"
 	"github.com/thirdweb-dev/indexer/internal/storage"
 	mocks "github.com/thirdweb-dev/indexer/test/mocks"
 )
@@ -173,36 +174,25 @@ func TestHandleGap(t *testing.T) {
 	}
 	committer := NewCommitter(mockRPC, mockStorage)
 
-	chainID := big.NewInt(1)
-	mockRPC.EXPECT().GetChainID().Return(chainID)
-
 	expectedStartBlockNumber := big.NewInt(100)
 	actualFirstBlock := common.Block{Number: big.NewInt(105)}
 
-	mockOrchestratorStorage.EXPECT().GetBlockFailures(storage.QueryFilter{
-		ChainId:      chainID,
-		BlockNumbers: []*big.Int{big.NewInt(100), big.NewInt(101), big.NewInt(102), big.NewInt(103), big.NewInt(104)},
-	}).Return([]common.BlockFailure{}, nil)
-	mockOrchestratorStorage.On("StoreBlockFailures", mock.MatchedBy(func(failures []common.BlockFailure) bool {
-		return len(failures) == 5 && failures[0].ChainId == chainID && failures[0].BlockNumber.Cmp(big.NewInt(100)) == 0 &&
-			failures[0].FailureCount == 1 && failures[0].FailureReason == "Gap detected for this block" &&
-			failures[1].ChainId == chainID && failures[1].BlockNumber.Cmp(big.NewInt(101)) == 0 &&
-			failures[1].FailureCount == 1 && failures[1].FailureReason == "Gap detected for this block" &&
-			failures[2].ChainId == chainID && failures[2].BlockNumber.Cmp(big.NewInt(102)) == 0 &&
-			failures[2].FailureCount == 1 && failures[2].FailureReason == "Gap detected for this block" &&
-			failures[3].ChainId == chainID && failures[3].BlockNumber.Cmp(big.NewInt(103)) == 0 &&
-			failures[3].FailureCount == 1 && failures[3].FailureReason == "Gap detected for this block" &&
-			failures[4].ChainId == chainID && failures[4].BlockNumber.Cmp(big.NewInt(104)) == 0 &&
-			failures[4].FailureCount == 1 && failures[4].FailureReason == "Gap detected for this block"
-	})).Return(nil)
+	mockRPC.EXPECT().GetBlocksPerRequest().Return(rpc.BlocksPerRequestConfig{
+		Blocks: 5,
+	})
+	mockRPC.EXPECT().GetFullBlocks([]*big.Int{big.NewInt(100), big.NewInt(101), big.NewInt(102), big.NewInt(103), big.NewInt(104)}).Return([]rpc.GetFullBlockResult{
+		{BlockNumber: big.NewInt(100), Data: common.BlockData{Block: common.Block{Number: big.NewInt(100)}}},
+		{BlockNumber: big.NewInt(101), Data: common.BlockData{Block: common.Block{Number: big.NewInt(101)}}},
+		{BlockNumber: big.NewInt(102), Data: common.BlockData{Block: common.Block{Number: big.NewInt(102)}}},
+		{BlockNumber: big.NewInt(103), Data: common.BlockData{Block: common.Block{Number: big.NewInt(103)}}},
+		{BlockNumber: big.NewInt(104), Data: common.BlockData{Block: common.Block{Number: big.NewInt(104)}}},
+	})
+	mockStagingStorage.EXPECT().InsertStagingData(mock.Anything).Return(nil)
 
 	err := committer.handleGap(expectedStartBlockNumber, actualFirstBlock)
 
 	assert.Error(t, err)
 	assert.Contains(t, err.Error(), "first block number (105) in commit batch does not match expected (100)")
-
-	mockRPC.AssertExpectations(t)
-	mockOrchestratorStorage.AssertExpectations(t)
 }
 
 func TestStartCommitter(t *testing.T) {
```
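
The five `GetFullBlockResult` fixtures above follow a regular pattern, so a test could also build them with a small helper. A possible sketch, assuming it sits in the same test file (the helper name `fullBlockResultsForRange` is hypothetical; the actual test spells out the literals):

```go
// fullBlockResultsForRange builds one successful result per block number in
// [from, to], matching the shape of the literal fixtures above. It relies on
// the rpc, common, and math/big imports already present in the test file.
func fullBlockResultsForRange(from, to int64) []rpc.GetFullBlockResult {
	results := make([]rpc.GetFullBlockResult, 0, to-from+1)
	for n := from; n <= to; n++ {
		results = append(results, rpc.GetFullBlockResult{
			BlockNumber: big.NewInt(n),
			Data:        common.BlockData{Block: common.Block{Number: big.NewInt(n)}},
		})
	}
	return results
}
```

With such a helper, the expectation could end with `.Return(fullBlockResultsForRange(100, 104))` instead of the five literal entries.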

internal/orchestrator/poller.go

Lines changed: 51 additions & 35 deletions
```diff
@@ -24,6 +24,7 @@ type Poller struct {
 	triggerIntervalMs int64
 	storage           storage.IStorage
 	lastPolledBlock   *big.Int
+	pollFromBlock     *big.Int
 	pollUntilBlock    *big.Int
 	parallelPollers   int
 }
@@ -33,7 +34,7 @@ type BlockNumberWithError struct {
 	Error error
 }
 
-func NewPoller(rpc rpc.IRPCClient, storage storage.IStorage) *Poller {
+func NewBoundlessPoller(rpc rpc.IRPCClient, storage storage.IStorage) *Poller {
 	blocksPerPoll := config.Cfg.Poller.BlocksPerPoll
 	if blocksPerPoll == 0 {
 		blocksPerPoll = DEFAULT_BLOCKS_PER_POLL
@@ -42,6 +43,17 @@ func NewPoller(rpc rpc.IRPCClient, storage storage.IStorage) *Poller {
 	if triggerInterval == 0 {
 		triggerInterval = DEFAULT_TRIGGER_INTERVAL
 	}
+	return &Poller{
+		rpc:               rpc,
+		triggerIntervalMs: int64(triggerInterval),
+		blocksPerPoll:     int64(blocksPerPoll),
+		storage:           storage,
+		parallelPollers:   config.Cfg.Poller.ParallelPollers,
+	}
+}
+
+func NewPoller(rpc rpc.IRPCClient, storage storage.IStorage) *Poller {
+	poller := NewBoundlessPoller(rpc, storage)
 	untilBlock := big.NewInt(int64(config.Cfg.Poller.UntilBlock))
 	pollFromBlock := big.NewInt(int64(config.Cfg.Poller.FromBlock))
 	lastPolledBlock := new(big.Int).Sub(pollFromBlock, big.NewInt(1)) // needs to include the first block
@@ -56,15 +68,10 @@ func NewPoller(rpc rpc.IRPCClient, storage storage.IStorage) *Poller {
 			log.Debug().Msgf("Last polled block found in staging: %s", lastPolledBlock.String())
 		}
 	}
-	return &Poller{
-		rpc:               rpc,
-		triggerIntervalMs: int64(triggerInterval),
-		blocksPerPoll:     int64(blocksPerPoll),
-		storage:           storage,
-		lastPolledBlock:   lastPolledBlock,
-		pollUntilBlock:    untilBlock,
-		parallelPollers:   config.Cfg.Poller.ParallelPollers,
-	}
+	poller.lastPolledBlock = lastPolledBlock
+	poller.pollFromBlock = pollFromBlock
+	poller.pollUntilBlock = untilBlock
+	return poller
 }
 
 func (p *Poller) Start() {
@@ -78,30 +85,16 @@ func (p *Poller) Start() {
 		go func() {
 			for range tasks {
 				blockRangeMutex.Lock()
-				blockNumbers, err := p.getBlockRange()
+				blockNumbers, err := p.getNextBlockRange()
 				blockRangeMutex.Unlock()
 
 				if err != nil {
 					log.Error().Err(err).Msg("Error getting block range")
 					continue
 				}
-				if len(blockNumbers) < 1 {
-					log.Debug().Msg("No blocks to poll, skipping")
-					continue
-				}
-				endBlock := blockNumbers[len(blockNumbers)-1]
-				if endBlock != nil {
-					p.lastPolledBlock = endBlock
-				}
-				log.Debug().Msgf("Polling %d blocks starting from %s to %s", len(blockNumbers), blockNumbers[0], endBlock)
 
-				endBlockNumberFloat, _ := endBlock.Float64()
-				metrics.PollerLastTriggeredBlock.Set(endBlockNumberFloat)
-
-				worker := worker.NewWorker(p.rpc)
-				results := worker.Run(blockNumbers)
-				p.handleWorkerResults(results)
-				if p.reachedPollLimit(endBlock) {
+				lastPolledBlock := p.Poll(blockNumbers)
+				if p.reachedPollLimit(lastPolledBlock) {
 					log.Debug().Msg("Reached poll limit, exiting poller")
 					ticker.Stop()
 					return
@@ -118,11 +111,31 @@ func (p *Poller) Start() {
 	select {}
 }
 
+func (p *Poller) Poll(blockNumbers []*big.Int) (lastPolledBlock *big.Int) {
+	if len(blockNumbers) < 1 {
+		log.Debug().Msg("No blocks to poll, skipping")
+		return
+	}
+	endBlock := blockNumbers[len(blockNumbers)-1]
+	if endBlock != nil {
+		p.lastPolledBlock = endBlock
+	}
+	log.Debug().Msgf("Polling %d blocks starting from %s to %s", len(blockNumbers), blockNumbers[0], endBlock)
+
+	endBlockNumberFloat, _ := endBlock.Float64()
+	metrics.PollerLastTriggeredBlock.Set(endBlockNumberFloat)
+
+	worker := worker.NewWorker(p.rpc)
+	results := worker.Run(blockNumbers)
+	p.handleWorkerResults(results)
+	return endBlock
+}
+
 func (p *Poller) reachedPollLimit(blockNumber *big.Int) bool {
 	return p.pollUntilBlock.Sign() > 0 && blockNumber.Cmp(p.pollUntilBlock) >= 0
 }
 
-func (p *Poller) getBlockRange() ([]*big.Int, error) {
+func (p *Poller) getNextBlockRange() ([]*big.Int, error) {
 	latestBlock, err := p.rpc.GetLatestBlockNumber()
 	if err != nil {
 		return nil, err
@@ -140,13 +153,7 @@ func (p *Poller) getNextBlockRange() ([]*big.Int, error) {
 		return nil, nil
 	}
 
-	blockCount := new(big.Int).Sub(endBlock, startBlock).Int64() + 1
-	blockNumbers := make([]*big.Int, blockCount)
-	for i := int64(0); i < blockCount; i++ {
-		blockNumbers[i] = new(big.Int).Add(startBlock, big.NewInt(i))
-	}
-
-	return blockNumbers, nil
+	return p.createBlockNumbersForRange(startBlock, endBlock), nil
 }
 
 func (p *Poller) getEndBlockForRange(startBlock *big.Int, latestBlock *big.Int) *big.Int {
@@ -161,6 +168,15 @@ func (p *Poller) getEndBlockForRange(startBlock *big.Int, latestBlock *big.Int)
 	return endBlock
 }
 
+func (p *Poller) createBlockNumbersForRange(startBlock *big.Int, endBlock *big.Int) []*big.Int {
+	blockCount := new(big.Int).Sub(endBlock, startBlock).Int64() + 1
+	blockNumbers := make([]*big.Int, blockCount)
+	for i := int64(0); i < blockCount; i++ {
+		blockNumbers[i] = new(big.Int).Add(startBlock, big.NewInt(i))
+	}
+	return blockNumbers
+}
+
 func (p *Poller) handleWorkerResults(results []rpc.GetFullBlockResult) {
 	var successfulResults []rpc.GetFullBlockResult
 	var failedResults []rpc.GetFullBlockResult
```
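
After the refactor, `Poller` has two entry points: `Start` drives the self-scheduling loop via `getNextBlockRange`, while `Poll` accepts any explicit slice of block numbers, which is what the gap handler relies on. A sketch of how the pieces compose inside the package (`pollRange` is a hypothetical wrapper, not part of this commit):

```go
// pollRange expands an inclusive block range into explicit block numbers and
// polls them immediately, bypassing the Start loop and its from/until bounds.
func pollRange(p *Poller, from, to *big.Int) *big.Int {
	blockNumbers := p.createBlockNumbersForRange(from, to)
	// Poll fetches the blocks via a worker, stages the results, and returns
	// the last polled block (nil if the slice was empty).
	return p.Poll(blockNumbers)
}
```

Note that `NewBoundlessPoller` leaves `pollFromBlock` and `pollUntilBlock` unset; only `NewPoller` applies those bounds, which is what lets the gap handler poll arbitrary ranges.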
