Skip to content

Commit 621d0c1

Browse files
committed
poller fixes
1 parent 7042a03 commit 621d0c1

File tree

2 files changed

+17
-6
lines changed

2 files changed

+17
-6
lines changed

internal/orchestrator/poller.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ func (p *Poller) Start() {
9898
return
9999
}
100100
}
101-
log.Debug().Msgf("Polling blocks %s to %s", blockNumbers[0], endBlock)
101+
log.Debug().Msgf("Polling %d blocks starting from %s to %s", len(blockNumbers), blockNumbers[0], endBlock)
102102

103103
worker := worker.NewWorker(p.rpc)
104104
results := worker.Run(blockNumbers)
@@ -131,11 +131,15 @@ func (p *Poller) getBlockRange() ([]*big.Int, error) {
131131
startBlock := new(big.Int).Add(p.lastPolledBlock, big.NewInt(1))
132132
endBlock := new(big.Int).Add(startBlock, big.NewInt(p.blocksPerPoll-1))
133133

134+
if startBlock.Cmp(latestBlock) > 0 {
135+
log.Debug().Msgf("Start block %s is greater than latest block %s, skipping", startBlock, latestBlock)
136+
return nil, nil
137+
}
134138
if endBlock.Cmp(latestBlock) > 0 {
135139
endBlock = latestBlock
136140
}
137141

138-
blockCount := endBlock.Sub(endBlock, startBlock).Int64() + 1
142+
blockCount := new(big.Int).Sub(endBlock, startBlock).Int64() + 1
139143
blockNumbers := make([]*big.Int, blockCount)
140144
for i := int64(0); i < blockCount; i++ {
141145
blockNumbers[i] = new(big.Int).Add(startBlock, big.NewInt(i))

internal/worker/serializer.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package worker
22

33
import (
44
"encoding/json"
5+
"fmt"
56
"math/big"
67
"strconv"
78

@@ -10,7 +11,7 @@ import (
1011
)
1112

1213
func SerializeWorkerResults(chainId *big.Int, blocks []BatchFetchResult[RawBlock], logs []BatchFetchResult[RawLogs], traces []BatchFetchResult[RawTraces]) []WorkerResult {
13-
results := make([]WorkerResult, len(blocks))
14+
results := make([]WorkerResult, 0, len(blocks))
1415

1516
rawLogsMap := make(map[string]BatchFetchResult[RawLogs])
1617
for _, rawLogs := range logs {
@@ -22,14 +23,20 @@ func SerializeWorkerResults(chainId *big.Int, blocks []BatchFetchResult[RawBlock
2223
rawTracesMap[rawTraces.BlockNumber.String()] = rawTraces
2324
}
2425

25-
for i, rawBlock := range blocks {
26+
for _, rawBlock := range blocks {
2627
result := WorkerResult{
2728
BlockNumber: rawBlock.BlockNumber,
2829
}
30+
if rawBlock.Result == nil {
31+
log.Warn().Msgf("Received a nil block result for block %s.", rawBlock.BlockNumber.String())
32+
result.Error = fmt.Errorf("received a nil block result from RPC")
33+
results = append(results, result)
34+
continue
35+
}
2936

3037
if rawBlock.Error != nil {
3138
result.Error = rawBlock.Error
32-
results[i] = result
39+
results = append(results, result)
3340
continue
3441
}
3542

@@ -55,7 +62,7 @@ func SerializeWorkerResults(chainId *big.Int, blocks []BatchFetchResult[RawBlock
5562
}
5663
}
5764

58-
results[i] = result
65+
results = append(results, result)
5966
}
6067

6168
return results

0 commit comments

Comments
 (0)