Skip to content

Commit 5bd1700

Browse files
authored
Fix poller creating gaps if pollUntilBlock is configured (#85)
2 parents c54223e + 349328a commit 5bd1700

File tree

1 file changed

+33
-15
lines changed

1 file changed

+33
-15
lines changed

internal/orchestrator/poller.go

Lines changed: 33 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -92,17 +92,17 @@ func (p *Poller) Start() {
9292
endBlock := blockNumbers[len(blockNumbers)-1]
9393
if endBlock != nil {
9494
p.lastPolledBlock = endBlock
95-
if p.reachedPollLimit() {
96-
log.Debug().Msg("Reached poll limit, exiting poller")
97-
ticker.Stop()
98-
return
99-
}
10095
}
10196
log.Debug().Msgf("Polling %d blocks starting from %s to %s", len(blockNumbers), blockNumbers[0], endBlock)
10297

10398
worker := worker.NewWorker(p.rpc)
10499
results := worker.Run(blockNumbers)
105100
p.handleWorkerResults(results)
101+
if p.reachedPollLimit(endBlock) {
102+
log.Debug().Msg("Reached poll limit, exiting poller")
103+
ticker.Stop()
104+
return
105+
}
106106
}
107107
}()
108108
}
@@ -115,28 +115,26 @@ func (p *Poller) Start() {
115115
select {}
116116
}
117117

118-
func (p *Poller) reachedPollLimit() bool {
119-
return p.pollUntilBlock != nil && p.pollUntilBlock.Sign() > 0 && p.lastPolledBlock.Cmp(p.pollUntilBlock) >= 0
118+
func (p *Poller) reachedPollLimit(blockNumber *big.Int) bool {
119+
return p.pollUntilBlock.Sign() > 0 && blockNumber.Cmp(p.pollUntilBlock) >= 0
120120
}
121121

122122
func (p *Poller) getBlockRange() ([]*big.Int, error) {
123-
latestBlockUint64, err := p.rpc.EthClient.BlockNumber(context.Background())
123+
latestBlock, err := p.getLatestBlockNumber()
124124
if err != nil {
125-
return nil, fmt.Errorf("failed to get latest block number: %v", err)
125+
return nil, err
126126
}
127-
latestBlock := new(big.Int).SetUint64(latestBlockUint64)
128-
129127
log.Debug().Msgf("Last polled block: %s", p.lastPolledBlock.String())
130128

131129
startBlock := new(big.Int).Add(p.lastPolledBlock, big.NewInt(1))
132-
endBlock := new(big.Int).Add(startBlock, big.NewInt(p.blocksPerPoll-1))
133-
134130
if startBlock.Cmp(latestBlock) > 0 {
135131
log.Debug().Msgf("Start block %s is greater than latest block %s, skipping", startBlock, latestBlock)
136132
return nil, nil
137133
}
138-
if endBlock.Cmp(latestBlock) > 0 {
139-
endBlock = latestBlock
134+
endBlock := p.getEndBlockForRange(startBlock, latestBlock)
135+
if startBlock.Cmp(endBlock) > 0 {
136+
log.Debug().Msgf("Invalid range: start block %s is greater than end block %s, skipping", startBlock, endBlock)
137+
return nil, nil
140138
}
141139

142140
blockCount := new(big.Int).Sub(endBlock, startBlock).Int64() + 1
@@ -148,6 +146,26 @@ func (p *Poller) getBlockRange() ([]*big.Int, error) {
148146
return blockNumbers, nil
149147
}
150148

149+
func (p *Poller) getLatestBlockNumber() (*big.Int, error) {
150+
latestBlockUint64, err := p.rpc.EthClient.BlockNumber(context.Background())
151+
if err != nil {
152+
return nil, fmt.Errorf("failed to get latest block number: %v", err)
153+
}
154+
return new(big.Int).SetUint64(latestBlockUint64), nil
155+
}
156+
157+
func (p *Poller) getEndBlockForRange(startBlock *big.Int, latestBlock *big.Int) *big.Int {
158+
endBlock := new(big.Int).Add(startBlock, big.NewInt(p.blocksPerPoll-1))
159+
if endBlock.Cmp(latestBlock) > 0 {
160+
endBlock = latestBlock
161+
}
162+
if p.reachedPollLimit(endBlock) {
163+
log.Debug().Msgf("End block %s is greater than poll until block %s, setting to poll until block", endBlock, p.pollUntilBlock)
164+
endBlock = p.pollUntilBlock
165+
}
166+
return endBlock
167+
}
168+
151169
func (p *Poller) handleWorkerResults(results []worker.WorkerResult) {
152170
var successfulResults []worker.WorkerResult
153171
var failedResults []worker.WorkerResult

0 commit comments

Comments
 (0)