Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions internal/orchestrator/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ func (c *Committer) getSequentialBlockDataToCommit() (*[]common.BlockData, error
}
if blocksData == nil || len(*blocksData) == 0 {
log.Warn().Msgf("Committer didn't find the following range in staging: %v - %v", blocksToCommit[0].Int64(), blocksToCommit[len(blocksToCommit)-1].Int64())
c.handleMissingStagingData(blocksToCommit)
return nil, nil
}

Expand Down Expand Up @@ -189,3 +190,25 @@ func (c *Committer) handleGap(expectedStartBlockNumber *big.Int, actualFirstBloc
poller.Poll(missingBlockNumbers)
return fmt.Errorf("first block number (%s) in commit batch does not match expected (%s)", actualFirstBlock.Number.String(), expectedStartBlockNumber.String())
}

func (c *Committer) handleMissingStagingData(blocksToCommit []*big.Int) {
// Checks if there are any blocks in staging after the current range end
lastStagedBlockNumber, err := c.storage.StagingStorage.GetLastStagedBlockNumber(c.rpc.GetChainID(), blocksToCommit[len(blocksToCommit)-1], big.NewInt(0))
if err != nil {
log.Error().Err(err).Msg("Error checking staged data for missing range")
return
}
if lastStagedBlockNumber == nil || lastStagedBlockNumber.Sign() <= 0 {
log.Debug().Msgf("Committer is caught up with staging. No need to poll for missing blocks.")
return
}
log.Debug().Msgf("Detected missing blocks in staging data starting from %s.", blocksToCommit[0].String())

poller := NewBoundlessPoller(c.rpc, c.storage)
blocksToPoll := blocksToCommit
if len(blocksToCommit) > int(poller.blocksPerPoll) {
blocksToPoll = blocksToCommit[:int(poller.blocksPerPoll)]
}
poller.Poll(blocksToPoll)
log.Debug().Msgf("Polled %d blocks due to committer detecting them as missing. Range: %s - %s", len(blocksToPoll), blocksToPoll[0].String(), blocksToPoll[len(blocksToPoll)-1].String())
}
107 changes: 88 additions & 19 deletions internal/orchestrator/committer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,6 @@ func TestGetBlockNumbersToCommit(t *testing.T) {
assert.Equal(t, committer.blocksPerCommit, len(blockNumbers))
assert.Equal(t, big.NewInt(101), blockNumbers[0])
assert.Equal(t, big.NewInt(100+int64(committer.blocksPerCommit)), blockNumbers[len(blockNumbers)-1])

mockRPC.AssertExpectations(t)
mockMainStorage.AssertExpectations(t)
}

func TestGetSequentialBlockDataToCommit(t *testing.T) {
Expand Down Expand Up @@ -87,10 +84,6 @@ func TestGetSequentialBlockDataToCommit(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, result)
assert.Equal(t, 3, len(*result))

mockRPC.AssertExpectations(t)
mockMainStorage.AssertExpectations(t)
mockStagingStorage.AssertExpectations(t)
}

func TestGetSequentialBlockDataToCommitWithDuplicateBlocks(t *testing.T) {
Expand Down Expand Up @@ -130,10 +123,6 @@ func TestGetSequentialBlockDataToCommitWithDuplicateBlocks(t *testing.T) {
assert.Equal(t, big.NewInt(101), (*result)[0].Block.Number)
assert.Equal(t, big.NewInt(102), (*result)[1].Block.Number)
assert.Equal(t, big.NewInt(103), (*result)[2].Block.Number)

mockRPC.AssertExpectations(t)
mockMainStorage.AssertExpectations(t)
mockStagingStorage.AssertExpectations(t)
}

func TestCommit(t *testing.T) {
Expand All @@ -157,9 +146,6 @@ func TestCommit(t *testing.T) {
err := committer.commit(&blockData)

assert.NoError(t, err)

mockMainStorage.AssertExpectations(t)
mockStagingStorage.AssertExpectations(t)
}

func TestHandleGap(t *testing.T) {
Expand Down Expand Up @@ -206,7 +192,6 @@ func TestStartCommitter(t *testing.T) {
}

committer := NewCommitter(mockRPC, mockStorage)
committer.storage = mockStorage
committer.triggerIntervalMs = 100 // Set a short interval for testing

chainID := big.NewInt(1)
Expand All @@ -226,9 +211,93 @@ func TestStartCommitter(t *testing.T) {

// Wait for a short time to allow the committer to run
time.Sleep(200 * time.Millisecond)
}

func TestHandleMissingStagingData(t *testing.T) {
defer func() { config.Cfg = config.Config{} }()
config.Cfg.Committer.BlocksPerCommit = 5

mockRPC := mocks.NewMockIRPCClient(t)
mockMainStorage := mocks.NewMockIMainStorage(t)
mockStagingStorage := mocks.NewMockIStagingStorage(t)

mockStorage := storage.IStorage{
MainStorage: mockMainStorage,
StagingStorage: mockStagingStorage,
}

committer := NewCommitter(mockRPC, mockStorage)

chainID := big.NewInt(1)
mockRPC.EXPECT().GetChainID().Return(chainID)
mockRPC.EXPECT().GetBlocksPerRequest().Return(rpc.BlocksPerRequestConfig{
Blocks: 100,
})
mockRPC.EXPECT().GetFullBlocks([]*big.Int{big.NewInt(0), big.NewInt(1), big.NewInt(2), big.NewInt(3), big.NewInt(4)}).Return([]rpc.GetFullBlockResult{
{BlockNumber: big.NewInt(0), Data: common.BlockData{Block: common.Block{Number: big.NewInt(0)}}},
{BlockNumber: big.NewInt(1), Data: common.BlockData{Block: common.Block{Number: big.NewInt(1)}}},
{BlockNumber: big.NewInt(2), Data: common.BlockData{Block: common.Block{Number: big.NewInt(2)}}},
{BlockNumber: big.NewInt(3), Data: common.BlockData{Block: common.Block{Number: big.NewInt(3)}}},
{BlockNumber: big.NewInt(4), Data: common.BlockData{Block: common.Block{Number: big.NewInt(4)}}},
})
mockStagingStorage.EXPECT().InsertStagingData(mock.Anything).Return(nil)

mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(0), nil)
expectedEndBlock := big.NewInt(4)
mockStagingStorage.EXPECT().GetLastStagedBlockNumber(chainID, expectedEndBlock, big.NewInt(0)).Return(big.NewInt(20), nil)

blockData := []common.BlockData{}
mockStagingStorage.EXPECT().GetStagingData(storage.QueryFilter{
ChainId: chainID,
BlockNumbers: []*big.Int{big.NewInt(0), big.NewInt(1), big.NewInt(2), big.NewInt(3), big.NewInt(4)},
}).Return(&blockData, nil)

result, err := committer.getSequentialBlockDataToCommit()

// Assert that the expected methods were called
mockRPC.AssertExpectations(t)
mockMainStorage.AssertExpectations(t)
mockStagingStorage.AssertExpectations(t)
assert.NoError(t, err)
assert.Nil(t, result)
}

func TestHandleMissingStagingDataIsPolledWithCorrectBatchSize(t *testing.T) {
defer func() { config.Cfg = config.Config{} }()
config.Cfg.Committer.BlocksPerCommit = 5
config.Cfg.Poller.BlocksPerPoll = 3

mockRPC := mocks.NewMockIRPCClient(t)
mockMainStorage := mocks.NewMockIMainStorage(t)
mockStagingStorage := mocks.NewMockIStagingStorage(t)

mockStorage := storage.IStorage{
MainStorage: mockMainStorage,
StagingStorage: mockStagingStorage,
}

committer := NewCommitter(mockRPC, mockStorage)

chainID := big.NewInt(1)
mockRPC.EXPECT().GetChainID().Return(chainID)
mockRPC.EXPECT().GetBlocksPerRequest().Return(rpc.BlocksPerRequestConfig{
Blocks: 3,
})
mockRPC.EXPECT().GetFullBlocks([]*big.Int{big.NewInt(0), big.NewInt(1), big.NewInt(2)}).Return([]rpc.GetFullBlockResult{
{BlockNumber: big.NewInt(0), Data: common.BlockData{Block: common.Block{Number: big.NewInt(0)}}},
{BlockNumber: big.NewInt(1), Data: common.BlockData{Block: common.Block{Number: big.NewInt(1)}}},
{BlockNumber: big.NewInt(2), Data: common.BlockData{Block: common.Block{Number: big.NewInt(2)}}},
})
mockStagingStorage.EXPECT().InsertStagingData(mock.Anything).Return(nil)

mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(0), nil)
expectedEndBlock := big.NewInt(4)
mockStagingStorage.EXPECT().GetLastStagedBlockNumber(chainID, expectedEndBlock, big.NewInt(0)).Return(big.NewInt(20), nil)

blockData := []common.BlockData{}
mockStagingStorage.EXPECT().GetStagingData(storage.QueryFilter{
ChainId: chainID,
BlockNumbers: []*big.Int{big.NewInt(0), big.NewInt(1), big.NewInt(2), big.NewInt(3), big.NewInt(4)},
}).Return(&blockData, nil)

result, err := committer.getSequentialBlockDataToCommit()

assert.NoError(t, err)
assert.Nil(t, result)
}
1 change: 0 additions & 1 deletion internal/storage/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,6 @@ func (c *ClickHouseConnector) GetMaxBlockNumber(chainId *big.Int) (maxBlockNumbe
}
return nil, err
}
zLog.Debug().Msgf("Max block number in main storage is: %s", maxBlockNumber.String())
return maxBlockNumber, nil
}

Expand Down
Loading