From 6e89bdca558622bb3c8f98019c5bbb531ad7d928 Mon Sep 17 00:00:00 2001 From: iuwqyir Date: Wed, 19 Feb 2025 15:42:14 +0200 Subject: [PATCH] keep track of last committed block in memory --- internal/orchestrator/committer.go | 44 ++++-- internal/orchestrator/committer_test.go | 174 ++++++++++++++++++++++++ 2 files changed, 204 insertions(+), 14 deletions(-) diff --git a/internal/orchestrator/committer.go b/internal/orchestrator/committer.go index 14183ae..72700c0 100644 --- a/internal/orchestrator/committer.go +++ b/internal/orchestrator/committer.go @@ -19,11 +19,12 @@ const DEFAULT_COMMITTER_TRIGGER_INTERVAL = 2000 const DEFAULT_BLOCKS_PER_COMMIT = 1000 type Committer struct { - triggerIntervalMs int - blocksPerCommit int - storage storage.IStorage - pollFromBlock *big.Int - rpc rpc.IRPCClient + triggerIntervalMs int + blocksPerCommit int + storage storage.IStorage + commitFromBlock *big.Int + rpc rpc.IRPCClient + lastCommittedBlock *big.Int } func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage) *Committer { @@ -36,12 +37,14 @@ func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage) *Committer { blocksPerCommit = DEFAULT_BLOCKS_PER_COMMIT } + commitFromBlock := big.NewInt(int64(config.Cfg.Committer.FromBlock)) return &Committer{ - triggerIntervalMs: triggerInterval, - blocksPerCommit: blocksPerCommit, - storage: storage, - pollFromBlock: big.NewInt(int64(config.Cfg.Committer.FromBlock)), - rpc: rpc, + triggerIntervalMs: triggerInterval, + blocksPerCommit: blocksPerCommit, + storage: storage, + commitFromBlock: commitFromBlock, + rpc: rpc, + lastCommittedBlock: commitFromBlock, } } @@ -80,8 +83,13 @@ func (c *Committer) getBlockNumbersToCommit() ([]*big.Int, error) { } if latestCommittedBlockNumber.Sign() == 0 { - // If no blocks have been committed yet, start from the fromBlock specified in the config (same start for the poller) - latestCommittedBlockNumber = new(big.Int).Sub(c.pollFromBlock, big.NewInt(1)) + // If no blocks have been committed yet, start from the fromBlock specified in the config + latestCommittedBlockNumber = new(big.Int).Sub(c.commitFromBlock, big.NewInt(1)) + } else { + if latestCommittedBlockNumber.Cmp(c.lastCommittedBlock) < 0 { + log.Warn().Msgf("Max block in storage (%s) is less than last committed block in memory (%s).", latestCommittedBlockNumber.String(), c.lastCommittedBlock.String()) + return []*big.Int{}, nil + } } startBlock := new(big.Int).Add(latestCommittedBlockNumber, big.NewInt(1)) @@ -166,10 +174,18 @@ func (c *Committer) commit(blockData *[]common.BlockData) error { return fmt.Errorf("error deleting data from staging storage: %v", err) } + // Find highest block number from committed blocks + highestBlockNumber := (*blockData)[0].Block.Number + for _, block := range *blockData { + if block.Block.Number.Cmp(highestBlockNumber) > 0 { + highestBlockNumber = block.Block.Number + } + } + c.lastCommittedBlock = new(big.Int).Set(highestBlockNumber) + // Update metrics for successful commits metrics.SuccessfulCommits.Add(float64(len(*blockData))) - metrics.LastCommittedBlock.Set(float64((*blockData)[len(*blockData)-1].Block.Number.Int64())) - + metrics.LastCommittedBlock.Set(float64(highestBlockNumber.Int64())) return nil } diff --git a/internal/orchestrator/committer_test.go b/internal/orchestrator/committer_test.go index 6a79061..4a2352d 100644 --- a/internal/orchestrator/committer_test.go +++ b/internal/orchestrator/committer_test.go @@ -53,6 +53,180 @@ func TestGetBlockNumbersToCommit(t *testing.T) { assert.Equal(t, big.NewInt(100+int64(committer.blocksPerCommit)), blockNumbers[len(blockNumbers)-1]) } +func TestGetBlockNumbersToCommitWithoutConfiguredAndNotStored(t *testing.T) { + // start from 0 + mockRPC := mocks.NewMockIRPCClient(t) + mockMainStorage := mocks.NewMockIMainStorage(t) + mockStagingStorage := mocks.NewMockIStagingStorage(t) + mockStorage := storage.IStorage{ + MainStorage: mockMainStorage, + StagingStorage: mockStagingStorage, + } + committer := NewCommitter(mockRPC, mockStorage) + chainID := big.NewInt(1) + + mockRPC.EXPECT().GetChainID().Return(chainID) + mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(0), nil) + + blockNumbers, err := committer.getBlockNumbersToCommit() + + assert.NoError(t, err) + assert.Equal(t, committer.blocksPerCommit, len(blockNumbers)) + assert.Equal(t, big.NewInt(0), blockNumbers[0]) + assert.Equal(t, big.NewInt(int64(committer.blocksPerCommit)-1), blockNumbers[len(blockNumbers)-1]) +} + +func TestGetBlockNumbersToCommitWithConfiguredAndNotStored(t *testing.T) { + // start from configured + defer func() { config.Cfg = config.Config{} }() + config.Cfg.Committer.FromBlock = 50 + + mockRPC := mocks.NewMockIRPCClient(t) + mockMainStorage := mocks.NewMockIMainStorage(t) + mockStagingStorage := mocks.NewMockIStagingStorage(t) + mockStorage := storage.IStorage{ + MainStorage: mockMainStorage, + StagingStorage: mockStagingStorage, + } + committer := NewCommitter(mockRPC, mockStorage) + chainID := big.NewInt(1) + + mockRPC.EXPECT().GetChainID().Return(chainID) + mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(0), nil) + + blockNumbers, err := committer.getBlockNumbersToCommit() + + assert.NoError(t, err) + assert.Equal(t, committer.blocksPerCommit, len(blockNumbers)) + assert.Equal(t, big.NewInt(50), blockNumbers[0]) + assert.Equal(t, big.NewInt(50+int64(committer.blocksPerCommit)-1), blockNumbers[len(blockNumbers)-1]) +} + +func TestGetBlockNumbersToCommitWithConfiguredAndStored(t *testing.T) { + // start from stored + 1 + defer func() { config.Cfg = config.Config{} }() + config.Cfg.Committer.FromBlock = 50 + + mockRPC := mocks.NewMockIRPCClient(t) + mockMainStorage := mocks.NewMockIMainStorage(t) + mockStagingStorage := mocks.NewMockIStagingStorage(t) + mockStorage := storage.IStorage{ + MainStorage: mockMainStorage, + StagingStorage: mockStagingStorage, + } + committer := NewCommitter(mockRPC, mockStorage) + chainID := big.NewInt(1) + + mockRPC.EXPECT().GetChainID().Return(chainID) + mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(2000), nil) + + blockNumbers, err := committer.getBlockNumbersToCommit() + + assert.NoError(t, err) + assert.Equal(t, committer.blocksPerCommit, len(blockNumbers)) + assert.Equal(t, big.NewInt(2001), blockNumbers[0]) + assert.Equal(t, big.NewInt(2000+int64(committer.blocksPerCommit)), blockNumbers[len(blockNumbers)-1]) +} + +func TestGetBlockNumbersToCommitWithoutConfiguredAndStored(t *testing.T) { + // start from stored + 1 + mockRPC := mocks.NewMockIRPCClient(t) + mockMainStorage := mocks.NewMockIMainStorage(t) + mockStagingStorage := mocks.NewMockIStagingStorage(t) + mockStorage := storage.IStorage{ + MainStorage: mockMainStorage, + StagingStorage: mockStagingStorage, + } + committer := NewCommitter(mockRPC, mockStorage) + chainID := big.NewInt(1) + + mockRPC.EXPECT().GetChainID().Return(chainID) + mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(2000), nil) + + blockNumbers, err := committer.getBlockNumbersToCommit() + + assert.NoError(t, err) + assert.Equal(t, committer.blocksPerCommit, len(blockNumbers)) + assert.Equal(t, big.NewInt(2001), blockNumbers[0]) + assert.Equal(t, big.NewInt(2000+int64(committer.blocksPerCommit)), blockNumbers[len(blockNumbers)-1]) +} + +func TestGetBlockNumbersToCommitWithStoredHigherThanInMemory(t *testing.T) { + // start from stored + 1 + defer func() { config.Cfg = config.Config{} }() + config.Cfg.Committer.FromBlock = 100 + + mockRPC := mocks.NewMockIRPCClient(t) + mockMainStorage := mocks.NewMockIMainStorage(t) + mockStagingStorage := mocks.NewMockIStagingStorage(t) + mockStorage := storage.IStorage{ + MainStorage: mockMainStorage, + StagingStorage: mockStagingStorage, + } + committer := NewCommitter(mockRPC, mockStorage) + chainID := big.NewInt(1) + + mockRPC.EXPECT().GetChainID().Return(chainID) + mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(2000), nil) + + blockNumbers, err := committer.getBlockNumbersToCommit() + + assert.NoError(t, err) + assert.Equal(t, committer.blocksPerCommit, len(blockNumbers)) + assert.Equal(t, big.NewInt(2001), blockNumbers[0]) + assert.Equal(t, big.NewInt(2000+int64(committer.blocksPerCommit)), blockNumbers[len(blockNumbers)-1]) +} + +func TestGetBlockNumbersToCommitWithStoredLowerThanInMemory(t *testing.T) { + // return empty array + defer func() { config.Cfg = config.Config{} }() + config.Cfg.Committer.FromBlock = 100 + + mockRPC := mocks.NewMockIRPCClient(t) + mockMainStorage := mocks.NewMockIMainStorage(t) + mockStagingStorage := mocks.NewMockIStagingStorage(t) + mockStorage := storage.IStorage{ + MainStorage: mockMainStorage, + StagingStorage: mockStagingStorage, + } + committer := NewCommitter(mockRPC, mockStorage) + chainID := big.NewInt(1) + + mockRPC.EXPECT().GetChainID().Return(chainID) + mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(99), nil) + + blockNumbers, err := committer.getBlockNumbersToCommit() + + assert.NoError(t, err) + assert.Equal(t, 0, len(blockNumbers)) +} + +func TestGetBlockNumbersToCommitWithStoredEqualThanInMemory(t *testing.T) { + // start from stored + 1 + defer func() { config.Cfg = config.Config{} }() + config.Cfg.Committer.FromBlock = 2000 + + mockRPC := mocks.NewMockIRPCClient(t) + mockMainStorage := mocks.NewMockIMainStorage(t) + mockStagingStorage := mocks.NewMockIStagingStorage(t) + mockStorage := storage.IStorage{ + MainStorage: mockMainStorage, + StagingStorage: mockStagingStorage, + } + committer := NewCommitter(mockRPC, mockStorage) + chainID := big.NewInt(1) + + mockRPC.EXPECT().GetChainID().Return(chainID) + mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(2000), nil) + + blockNumbers, err := committer.getBlockNumbersToCommit() + + assert.NoError(t, err) + assert.Equal(t, committer.blocksPerCommit, len(blockNumbers)) + assert.Equal(t, big.NewInt(2001), blockNumbers[0]) + assert.Equal(t, big.NewInt(2000+int64(committer.blocksPerCommit)), blockNumbers[len(blockNumbers)-1]) +} + func TestGetSequentialBlockDataToCommit(t *testing.T) { defer func() { config.Cfg = config.Config{} }() config.Cfg.Committer.BlocksPerCommit = 3