Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 3 additions & 28 deletions internal/orchestrator/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,33 +184,8 @@ func (c *Committer) handleGap(expectedStartBlockNumber *big.Int, actualFirstBloc
}
log.Debug().Msgf("Detected %d missing blocks between blocks %s and %s", missingBlockCount, expectedStartBlockNumber.String(), actualFirstBlock.Number.String())

existingBlockFailures, err := c.storage.OrchestratorStorage.GetBlockFailures(storage.QueryFilter{BlockNumbers: missingBlockNumbers, ChainId: c.rpc.GetChainID()})
if err != nil {
return fmt.Errorf("error getting block failures while handling gap: %v", err)
}

existingBlockFailuresMap := make(map[string]*common.BlockFailure)
for _, failure := range existingBlockFailures {
blockNumberStr := failure.BlockNumber.String()
existingBlockFailuresMap[blockNumberStr] = &failure
}

blockFailures := make([]common.BlockFailure, 0)
for _, blockNumber := range missingBlockNumbers {
blockNumberStr := blockNumber.String()
if _, ok := existingBlockFailuresMap[blockNumberStr]; !ok {
blockFailures = append(blockFailures, common.BlockFailure{
BlockNumber: blockNumber,
ChainId: c.rpc.GetChainID(),
FailureTime: time.Now(),
FailureCount: 1,
FailureReason: "Gap detected for this block",
})
}
}
log.Debug().Msgf("Storing %d block failures while handling gap", len(blockFailures))
if err := c.storage.OrchestratorStorage.StoreBlockFailures(blockFailures); err != nil {
return fmt.Errorf("error storing block failures while handling gap: %v", err)
}
poller := NewBoundlessPoller(c.rpc, c.storage)
log.Debug().Msgf("Polling %d blocks while handling gap: %v", len(missingBlockNumbers), missingBlockNumbers)
poller.Poll(missingBlockNumbers)
return fmt.Errorf("first block number (%s) in commit batch does not match expected (%s)", actualFirstBlock.Number.String(), expectedStartBlockNumber.String())
}
34 changes: 12 additions & 22 deletions internal/orchestrator/committer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/stretchr/testify/mock"
config "github.com/thirdweb-dev/indexer/configs"
"github.com/thirdweb-dev/indexer/internal/common"
"github.com/thirdweb-dev/indexer/internal/rpc"
"github.com/thirdweb-dev/indexer/internal/storage"
mocks "github.com/thirdweb-dev/indexer/test/mocks"
)
Expand Down Expand Up @@ -173,36 +174,25 @@ func TestHandleGap(t *testing.T) {
}
committer := NewCommitter(mockRPC, mockStorage)

chainID := big.NewInt(1)
mockRPC.EXPECT().GetChainID().Return(chainID)

expectedStartBlockNumber := big.NewInt(100)
actualFirstBlock := common.Block{Number: big.NewInt(105)}

mockOrchestratorStorage.EXPECT().GetBlockFailures(storage.QueryFilter{
ChainId: chainID,
BlockNumbers: []*big.Int{big.NewInt(100), big.NewInt(101), big.NewInt(102), big.NewInt(103), big.NewInt(104)},
}).Return([]common.BlockFailure{}, nil)
mockOrchestratorStorage.On("StoreBlockFailures", mock.MatchedBy(func(failures []common.BlockFailure) bool {
return len(failures) == 5 && failures[0].ChainId == chainID && failures[0].BlockNumber.Cmp(big.NewInt(100)) == 0 &&
failures[0].FailureCount == 1 && failures[0].FailureReason == "Gap detected for this block" &&
failures[1].ChainId == chainID && failures[1].BlockNumber.Cmp(big.NewInt(101)) == 0 &&
failures[1].FailureCount == 1 && failures[1].FailureReason == "Gap detected for this block" &&
failures[2].ChainId == chainID && failures[2].BlockNumber.Cmp(big.NewInt(102)) == 0 &&
failures[2].FailureCount == 1 && failures[2].FailureReason == "Gap detected for this block" &&
failures[3].ChainId == chainID && failures[3].BlockNumber.Cmp(big.NewInt(103)) == 0 &&
failures[3].FailureCount == 1 && failures[3].FailureReason == "Gap detected for this block" &&
failures[4].ChainId == chainID && failures[4].BlockNumber.Cmp(big.NewInt(104)) == 0 &&
failures[4].FailureCount == 1 && failures[4].FailureReason == "Gap detected for this block"
})).Return(nil)
mockRPC.EXPECT().GetBlocksPerRequest().Return(rpc.BlocksPerRequestConfig{
Blocks: 5,
})
mockRPC.EXPECT().GetFullBlocks([]*big.Int{big.NewInt(100), big.NewInt(101), big.NewInt(102), big.NewInt(103), big.NewInt(104)}).Return([]rpc.GetFullBlockResult{
{BlockNumber: big.NewInt(100), Data: common.BlockData{Block: common.Block{Number: big.NewInt(100)}}},
{BlockNumber: big.NewInt(101), Data: common.BlockData{Block: common.Block{Number: big.NewInt(101)}}},
{BlockNumber: big.NewInt(102), Data: common.BlockData{Block: common.Block{Number: big.NewInt(102)}}},
{BlockNumber: big.NewInt(103), Data: common.BlockData{Block: common.Block{Number: big.NewInt(103)}}},
{BlockNumber: big.NewInt(104), Data: common.BlockData{Block: common.Block{Number: big.NewInt(104)}}},
})
mockStagingStorage.EXPECT().InsertStagingData(mock.Anything).Return(nil)

err := committer.handleGap(expectedStartBlockNumber, actualFirstBlock)

assert.Error(t, err)
assert.Contains(t, err.Error(), "first block number (105) in commit batch does not match expected (100)")

mockRPC.AssertExpectations(t)
mockOrchestratorStorage.AssertExpectations(t)
}

func TestStartCommitter(t *testing.T) {
Expand Down
86 changes: 51 additions & 35 deletions internal/orchestrator/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type Poller struct {
triggerIntervalMs int64
storage storage.IStorage
lastPolledBlock *big.Int
pollFromBlock *big.Int
pollUntilBlock *big.Int
parallelPollers int
}
Expand All @@ -33,7 +34,7 @@ type BlockNumberWithError struct {
Error error
}

func NewPoller(rpc rpc.IRPCClient, storage storage.IStorage) *Poller {
func NewBoundlessPoller(rpc rpc.IRPCClient, storage storage.IStorage) *Poller {
blocksPerPoll := config.Cfg.Poller.BlocksPerPoll
if blocksPerPoll == 0 {
blocksPerPoll = DEFAULT_BLOCKS_PER_POLL
Expand All @@ -42,6 +43,17 @@ func NewPoller(rpc rpc.IRPCClient, storage storage.IStorage) *Poller {
if triggerInterval == 0 {
triggerInterval = DEFAULT_TRIGGER_INTERVAL
}
return &Poller{
rpc: rpc,
triggerIntervalMs: int64(triggerInterval),
blocksPerPoll: int64(blocksPerPoll),
storage: storage,
parallelPollers: config.Cfg.Poller.ParallelPollers,
}
}

func NewPoller(rpc rpc.IRPCClient, storage storage.IStorage) *Poller {
poller := NewBoundlessPoller(rpc, storage)
untilBlock := big.NewInt(int64(config.Cfg.Poller.UntilBlock))
pollFromBlock := big.NewInt(int64(config.Cfg.Poller.FromBlock))
lastPolledBlock := new(big.Int).Sub(pollFromBlock, big.NewInt(1)) // needs to include the first block
Expand All @@ -56,15 +68,10 @@ func NewPoller(rpc rpc.IRPCClient, storage storage.IStorage) *Poller {
log.Debug().Msgf("Last polled block found in staging: %s", lastPolledBlock.String())
}
}
return &Poller{
rpc: rpc,
triggerIntervalMs: int64(triggerInterval),
blocksPerPoll: int64(blocksPerPoll),
storage: storage,
lastPolledBlock: lastPolledBlock,
pollUntilBlock: untilBlock,
parallelPollers: config.Cfg.Poller.ParallelPollers,
}
poller.lastPolledBlock = lastPolledBlock
poller.pollFromBlock = pollFromBlock
poller.pollUntilBlock = untilBlock
return poller
}

func (p *Poller) Start() {
Expand All @@ -78,30 +85,16 @@ func (p *Poller) Start() {
go func() {
for range tasks {
blockRangeMutex.Lock()
blockNumbers, err := p.getBlockRange()
blockNumbers, err := p.getNextBlockRange()
blockRangeMutex.Unlock()

if err != nil {
log.Error().Err(err).Msg("Error getting block range")
continue
}
if len(blockNumbers) < 1 {
log.Debug().Msg("No blocks to poll, skipping")
continue
}
endBlock := blockNumbers[len(blockNumbers)-1]
if endBlock != nil {
p.lastPolledBlock = endBlock
}
log.Debug().Msgf("Polling %d blocks starting from %s to %s", len(blockNumbers), blockNumbers[0], endBlock)

endBlockNumberFloat, _ := endBlock.Float64()
metrics.PollerLastTriggeredBlock.Set(endBlockNumberFloat)

worker := worker.NewWorker(p.rpc)
results := worker.Run(blockNumbers)
p.handleWorkerResults(results)
if p.reachedPollLimit(endBlock) {
lastPolledBlock := p.Poll(blockNumbers)
if p.reachedPollLimit(lastPolledBlock) {
log.Debug().Msg("Reached poll limit, exiting poller")
ticker.Stop()
return
Expand All @@ -118,11 +111,31 @@ func (p *Poller) Start() {
select {}
}

func (p *Poller) Poll(blockNumbers []*big.Int) (lastPolledBlock *big.Int) {
if len(blockNumbers) < 1 {
log.Debug().Msg("No blocks to poll, skipping")
return
}
endBlock := blockNumbers[len(blockNumbers)-1]
if endBlock != nil {
p.lastPolledBlock = endBlock
}
log.Debug().Msgf("Polling %d blocks starting from %s to %s", len(blockNumbers), blockNumbers[0], endBlock)

endBlockNumberFloat, _ := endBlock.Float64()
metrics.PollerLastTriggeredBlock.Set(endBlockNumberFloat)

worker := worker.NewWorker(p.rpc)
results := worker.Run(blockNumbers)
p.handleWorkerResults(results)
return endBlock
}

func (p *Poller) reachedPollLimit(blockNumber *big.Int) bool {
return p.pollUntilBlock.Sign() > 0 && blockNumber.Cmp(p.pollUntilBlock) >= 0
}

func (p *Poller) getBlockRange() ([]*big.Int, error) {
func (p *Poller) getNextBlockRange() ([]*big.Int, error) {
latestBlock, err := p.rpc.GetLatestBlockNumber()
if err != nil {
return nil, err
Expand All @@ -140,13 +153,7 @@ func (p *Poller) getBlockRange() ([]*big.Int, error) {
return nil, nil
}

blockCount := new(big.Int).Sub(endBlock, startBlock).Int64() + 1
blockNumbers := make([]*big.Int, blockCount)
for i := int64(0); i < blockCount; i++ {
blockNumbers[i] = new(big.Int).Add(startBlock, big.NewInt(i))
}

return blockNumbers, nil
return p.createBlockNumbersForRange(startBlock, endBlock), nil
}

func (p *Poller) getEndBlockForRange(startBlock *big.Int, latestBlock *big.Int) *big.Int {
Expand All @@ -161,6 +168,15 @@ func (p *Poller) getEndBlockForRange(startBlock *big.Int, latestBlock *big.Int)
return endBlock
}

func (p *Poller) createBlockNumbersForRange(startBlock *big.Int, endBlock *big.Int) []*big.Int {
blockCount := new(big.Int).Sub(endBlock, startBlock).Int64() + 1
blockNumbers := make([]*big.Int, blockCount)
for i := int64(0); i < blockCount; i++ {
blockNumbers[i] = new(big.Int).Add(startBlock, big.NewInt(i))
}
return blockNumbers
}

func (p *Poller) handleWorkerResults(results []rpc.GetFullBlockResult) {
var successfulResults []rpc.GetFullBlockResult
var failedResults []rpc.GetFullBlockResult
Expand Down
Loading