Skip to content

Commit 7187f40

Browse files
committed
trigger polling when gap is detected
1 parent c6fc50e commit 7187f40

File tree

3 files changed

+66
-85
lines changed

3 files changed

+66
-85
lines changed

internal/orchestrator/committer.go

Lines changed: 3 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -184,33 +184,8 @@ func (c *Committer) handleGap(expectedStartBlockNumber *big.Int, actualFirstBloc
184184
}
185185
log.Debug().Msgf("Detected %d missing blocks between blocks %s and %s", missingBlockCount, expectedStartBlockNumber.String(), actualFirstBlock.Number.String())
186186

187-
existingBlockFailures, err := c.storage.OrchestratorStorage.GetBlockFailures(storage.QueryFilter{BlockNumbers: missingBlockNumbers, ChainId: c.rpc.GetChainID()})
188-
if err != nil {
189-
return fmt.Errorf("error getting block failures while handling gap: %v", err)
190-
}
191-
192-
existingBlockFailuresMap := make(map[string]*common.BlockFailure)
193-
for _, failure := range existingBlockFailures {
194-
blockNumberStr := failure.BlockNumber.String()
195-
existingBlockFailuresMap[blockNumberStr] = &failure
196-
}
197-
198-
blockFailures := make([]common.BlockFailure, 0)
199-
for _, blockNumber := range missingBlockNumbers {
200-
blockNumberStr := blockNumber.String()
201-
if _, ok := existingBlockFailuresMap[blockNumberStr]; !ok {
202-
blockFailures = append(blockFailures, common.BlockFailure{
203-
BlockNumber: blockNumber,
204-
ChainId: c.rpc.GetChainID(),
205-
FailureTime: time.Now(),
206-
FailureCount: 1,
207-
FailureReason: "Gap detected for this block",
208-
})
209-
}
210-
}
211-
log.Debug().Msgf("Storing %d block failures while handling gap", len(blockFailures))
212-
if err := c.storage.OrchestratorStorage.StoreBlockFailures(blockFailures); err != nil {
213-
return fmt.Errorf("error storing block failures while handling gap: %v", err)
214-
}
187+
poller := NewBoundlessPoller(c.rpc, c.storage)
188+
log.Debug().Msgf("Polling %d blocks while handling gap: %v", len(missingBlockNumbers), missingBlockNumbers)
189+
poller.Poll(missingBlockNumbers)
215190
return fmt.Errorf("first block number (%s) in commit batch does not match expected (%s)", actualFirstBlock.Number.String(), expectedStartBlockNumber.String())
216191
}

internal/orchestrator/committer_test.go

Lines changed: 12 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/stretchr/testify/mock"
1010
config "github.com/thirdweb-dev/indexer/configs"
1111
"github.com/thirdweb-dev/indexer/internal/common"
12+
"github.com/thirdweb-dev/indexer/internal/rpc"
1213
"github.com/thirdweb-dev/indexer/internal/storage"
1314
mocks "github.com/thirdweb-dev/indexer/test/mocks"
1415
)
@@ -173,36 +174,25 @@ func TestHandleGap(t *testing.T) {
173174
}
174175
committer := NewCommitter(mockRPC, mockStorage)
175176

176-
chainID := big.NewInt(1)
177-
mockRPC.EXPECT().GetChainID().Return(chainID)
178-
179177
expectedStartBlockNumber := big.NewInt(100)
180178
actualFirstBlock := common.Block{Number: big.NewInt(105)}
181179

182-
mockOrchestratorStorage.EXPECT().GetBlockFailures(storage.QueryFilter{
183-
ChainId: chainID,
184-
BlockNumbers: []*big.Int{big.NewInt(100), big.NewInt(101), big.NewInt(102), big.NewInt(103), big.NewInt(104)},
185-
}).Return([]common.BlockFailure{}, nil)
186-
mockOrchestratorStorage.On("StoreBlockFailures", mock.MatchedBy(func(failures []common.BlockFailure) bool {
187-
return len(failures) == 5 && failures[0].ChainId == chainID && failures[0].BlockNumber.Cmp(big.NewInt(100)) == 0 &&
188-
failures[0].FailureCount == 1 && failures[0].FailureReason == "Gap detected for this block" &&
189-
failures[1].ChainId == chainID && failures[1].BlockNumber.Cmp(big.NewInt(101)) == 0 &&
190-
failures[1].FailureCount == 1 && failures[1].FailureReason == "Gap detected for this block" &&
191-
failures[2].ChainId == chainID && failures[2].BlockNumber.Cmp(big.NewInt(102)) == 0 &&
192-
failures[2].FailureCount == 1 && failures[2].FailureReason == "Gap detected for this block" &&
193-
failures[3].ChainId == chainID && failures[3].BlockNumber.Cmp(big.NewInt(103)) == 0 &&
194-
failures[3].FailureCount == 1 && failures[3].FailureReason == "Gap detected for this block" &&
195-
failures[4].ChainId == chainID && failures[4].BlockNumber.Cmp(big.NewInt(104)) == 0 &&
196-
failures[4].FailureCount == 1 && failures[4].FailureReason == "Gap detected for this block"
197-
})).Return(nil)
180+
mockRPC.EXPECT().GetBlocksPerRequest().Return(rpc.BlocksPerRequestConfig{
181+
Blocks: 5,
182+
})
183+
mockRPC.EXPECT().GetFullBlocks([]*big.Int{big.NewInt(100), big.NewInt(101), big.NewInt(102), big.NewInt(103), big.NewInt(104)}).Return([]rpc.GetFullBlockResult{
184+
{BlockNumber: big.NewInt(100), Data: common.BlockData{Block: common.Block{Number: big.NewInt(100)}}},
185+
{BlockNumber: big.NewInt(101), Data: common.BlockData{Block: common.Block{Number: big.NewInt(101)}}},
186+
{BlockNumber: big.NewInt(102), Data: common.BlockData{Block: common.Block{Number: big.NewInt(102)}}},
187+
{BlockNumber: big.NewInt(103), Data: common.BlockData{Block: common.Block{Number: big.NewInt(103)}}},
188+
{BlockNumber: big.NewInt(104), Data: common.BlockData{Block: common.Block{Number: big.NewInt(104)}}},
189+
})
190+
mockStagingStorage.EXPECT().InsertStagingData(mock.Anything).Return(nil)
198191

199192
err := committer.handleGap(expectedStartBlockNumber, actualFirstBlock)
200193

201194
assert.Error(t, err)
202195
assert.Contains(t, err.Error(), "first block number (105) in commit batch does not match expected (100)")
203-
204-
mockRPC.AssertExpectations(t)
205-
mockOrchestratorStorage.AssertExpectations(t)
206196
}
207197

208198
func TestStartCommitter(t *testing.T) {

internal/orchestrator/poller.go

Lines changed: 51 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ type Poller struct {
2424
triggerIntervalMs int64
2525
storage storage.IStorage
2626
lastPolledBlock *big.Int
27+
pollFromBlock *big.Int
2728
pollUntilBlock *big.Int
2829
parallelPollers int
2930
}
@@ -33,7 +34,7 @@ type BlockNumberWithError struct {
3334
Error error
3435
}
3536

36-
func NewPoller(rpc rpc.IRPCClient, storage storage.IStorage) *Poller {
37+
func NewBoundlessPoller(rpc rpc.IRPCClient, storage storage.IStorage) *Poller {
3738
blocksPerPoll := config.Cfg.Poller.BlocksPerPoll
3839
if blocksPerPoll == 0 {
3940
blocksPerPoll = DEFAULT_BLOCKS_PER_POLL
@@ -42,6 +43,17 @@ func NewPoller(rpc rpc.IRPCClient, storage storage.IStorage) *Poller {
4243
if triggerInterval == 0 {
4344
triggerInterval = DEFAULT_TRIGGER_INTERVAL
4445
}
46+
return &Poller{
47+
rpc: rpc,
48+
triggerIntervalMs: int64(triggerInterval),
49+
blocksPerPoll: int64(blocksPerPoll),
50+
storage: storage,
51+
parallelPollers: config.Cfg.Poller.ParallelPollers,
52+
}
53+
}
54+
55+
func NewPoller(rpc rpc.IRPCClient, storage storage.IStorage) *Poller {
56+
poller := NewBoundlessPoller(rpc, storage)
4557
untilBlock := big.NewInt(int64(config.Cfg.Poller.UntilBlock))
4658
pollFromBlock := big.NewInt(int64(config.Cfg.Poller.FromBlock))
4759
lastPolledBlock := new(big.Int).Sub(pollFromBlock, big.NewInt(1)) // needs to include the first block
@@ -56,15 +68,10 @@ func NewPoller(rpc rpc.IRPCClient, storage storage.IStorage) *Poller {
5668
log.Debug().Msgf("Last polled block found in staging: %s", lastPolledBlock.String())
5769
}
5870
}
59-
return &Poller{
60-
rpc: rpc,
61-
triggerIntervalMs: int64(triggerInterval),
62-
blocksPerPoll: int64(blocksPerPoll),
63-
storage: storage,
64-
lastPolledBlock: lastPolledBlock,
65-
pollUntilBlock: untilBlock,
66-
parallelPollers: config.Cfg.Poller.ParallelPollers,
67-
}
71+
poller.lastPolledBlock = lastPolledBlock
72+
poller.pollFromBlock = pollFromBlock
73+
poller.pollUntilBlock = untilBlock
74+
return poller
6875
}
6976

7077
func (p *Poller) Start() {
@@ -78,30 +85,16 @@ func (p *Poller) Start() {
7885
go func() {
7986
for range tasks {
8087
blockRangeMutex.Lock()
81-
blockNumbers, err := p.getBlockRange()
88+
blockNumbers, err := p.getNextBlockRange()
8289
blockRangeMutex.Unlock()
8390

8491
if err != nil {
8592
log.Error().Err(err).Msg("Error getting block range")
8693
continue
8794
}
88-
if len(blockNumbers) < 1 {
89-
log.Debug().Msg("No blocks to poll, skipping")
90-
continue
91-
}
92-
endBlock := blockNumbers[len(blockNumbers)-1]
93-
if endBlock != nil {
94-
p.lastPolledBlock = endBlock
95-
}
96-
log.Debug().Msgf("Polling %d blocks starting from %s to %s", len(blockNumbers), blockNumbers[0], endBlock)
9795

98-
endBlockNumberFloat, _ := endBlock.Float64()
99-
metrics.PollerLastTriggeredBlock.Set(endBlockNumberFloat)
100-
101-
worker := worker.NewWorker(p.rpc)
102-
results := worker.Run(blockNumbers)
103-
p.handleWorkerResults(results)
104-
if p.reachedPollLimit(endBlock) {
96+
lastPolledBlock := p.Poll(blockNumbers)
97+
if p.reachedPollLimit(lastPolledBlock) {
10598
log.Debug().Msg("Reached poll limit, exiting poller")
10699
ticker.Stop()
107100
return
@@ -118,11 +111,31 @@ func (p *Poller) Start() {
118111
select {}
119112
}
120113

114+
func (p *Poller) Poll(blockNumbers []*big.Int) (lastPolledBlock *big.Int) {
115+
if len(blockNumbers) < 1 {
116+
log.Debug().Msg("No blocks to poll, skipping")
117+
return
118+
}
119+
endBlock := blockNumbers[len(blockNumbers)-1]
120+
if endBlock != nil {
121+
p.lastPolledBlock = endBlock
122+
}
123+
log.Debug().Msgf("Polling %d blocks starting from %s to %s", len(blockNumbers), blockNumbers[0], endBlock)
124+
125+
endBlockNumberFloat, _ := endBlock.Float64()
126+
metrics.PollerLastTriggeredBlock.Set(endBlockNumberFloat)
127+
128+
worker := worker.NewWorker(p.rpc)
129+
results := worker.Run(blockNumbers)
130+
p.handleWorkerResults(results)
131+
return endBlock
132+
}
133+
121134
func (p *Poller) reachedPollLimit(blockNumber *big.Int) bool {
122135
return p.pollUntilBlock.Sign() > 0 && blockNumber.Cmp(p.pollUntilBlock) >= 0
123136
}
124137

125-
func (p *Poller) getBlockRange() ([]*big.Int, error) {
138+
func (p *Poller) getNextBlockRange() ([]*big.Int, error) {
126139
latestBlock, err := p.rpc.GetLatestBlockNumber()
127140
if err != nil {
128141
return nil, err
@@ -140,13 +153,7 @@ func (p *Poller) getBlockRange() ([]*big.Int, error) {
140153
return nil, nil
141154
}
142155

143-
blockCount := new(big.Int).Sub(endBlock, startBlock).Int64() + 1
144-
blockNumbers := make([]*big.Int, blockCount)
145-
for i := int64(0); i < blockCount; i++ {
146-
blockNumbers[i] = new(big.Int).Add(startBlock, big.NewInt(i))
147-
}
148-
149-
return blockNumbers, nil
156+
return p.createBlockNumbersForRange(startBlock, endBlock), nil
150157
}
151158

152159
func (p *Poller) getEndBlockForRange(startBlock *big.Int, latestBlock *big.Int) *big.Int {
@@ -161,6 +168,15 @@ func (p *Poller) getEndBlockForRange(startBlock *big.Int, latestBlock *big.Int)
161168
return endBlock
162169
}
163170

171+
func (p *Poller) createBlockNumbersForRange(startBlock *big.Int, endBlock *big.Int) []*big.Int {
172+
blockCount := new(big.Int).Sub(endBlock, startBlock).Int64() + 1
173+
blockNumbers := make([]*big.Int, blockCount)
174+
for i := int64(0); i < blockCount; i++ {
175+
blockNumbers[i] = new(big.Int).Add(startBlock, big.NewInt(i))
176+
}
177+
return blockNumbers
178+
}
179+
164180
func (p *Poller) handleWorkerResults(results []rpc.GetFullBlockResult) {
165181
var successfulResults []rpc.GetFullBlockResult
166182
var failedResults []rpc.GetFullBlockResult

0 commit comments

Comments
 (0)