Skip to content

Commit 6e89bdc

Browse files
committed
keep track of last committed block in memory
1 parent d3f0977 commit 6e89bdc

File tree

2 files changed

+204
-14
lines changed

2 files changed

+204
-14
lines changed

internal/orchestrator/committer.go

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,12 @@ const DEFAULT_COMMITTER_TRIGGER_INTERVAL = 2000
1919
const DEFAULT_BLOCKS_PER_COMMIT = 1000
2020

2121
type Committer struct {
22-
triggerIntervalMs int
23-
blocksPerCommit int
24-
storage storage.IStorage
25-
pollFromBlock *big.Int
26-
rpc rpc.IRPCClient
22+
triggerIntervalMs int
23+
blocksPerCommit int
24+
storage storage.IStorage
25+
commitFromBlock *big.Int
26+
rpc rpc.IRPCClient
27+
lastCommittedBlock *big.Int
2728
}
2829

2930
func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage) *Committer {
@@ -36,12 +37,14 @@ func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage) *Committer {
3637
blocksPerCommit = DEFAULT_BLOCKS_PER_COMMIT
3738
}
3839

40+
commitFromBlock := big.NewInt(int64(config.Cfg.Committer.FromBlock))
3941
return &Committer{
40-
triggerIntervalMs: triggerInterval,
41-
blocksPerCommit: blocksPerCommit,
42-
storage: storage,
43-
pollFromBlock: big.NewInt(int64(config.Cfg.Committer.FromBlock)),
44-
rpc: rpc,
42+
triggerIntervalMs: triggerInterval,
43+
blocksPerCommit: blocksPerCommit,
44+
storage: storage,
45+
commitFromBlock: commitFromBlock,
46+
rpc: rpc,
47+
lastCommittedBlock: commitFromBlock,
4548
}
4649
}
4750

@@ -80,8 +83,13 @@ func (c *Committer) getBlockNumbersToCommit() ([]*big.Int, error) {
8083
}
8184

8285
if latestCommittedBlockNumber.Sign() == 0 {
83-
// If no blocks have been committed yet, start from the fromBlock specified in the config (same start for the poller)
84-
latestCommittedBlockNumber = new(big.Int).Sub(c.pollFromBlock, big.NewInt(1))
86+
// If no blocks have been committed yet, start from the fromBlock specified in the config
87+
latestCommittedBlockNumber = new(big.Int).Sub(c.commitFromBlock, big.NewInt(1))
88+
} else {
89+
if latestCommittedBlockNumber.Cmp(c.lastCommittedBlock) < 0 {
90+
log.Warn().Msgf("Max block in storage (%s) is less than last committed block in memory (%s).", latestCommittedBlockNumber.String(), c.lastCommittedBlock.String())
91+
return []*big.Int{}, nil
92+
}
8593
}
8694

8795
startBlock := new(big.Int).Add(latestCommittedBlockNumber, big.NewInt(1))
@@ -166,10 +174,18 @@ func (c *Committer) commit(blockData *[]common.BlockData) error {
166174
return fmt.Errorf("error deleting data from staging storage: %v", err)
167175
}
168176

177+
// Find highest block number from committed blocks
178+
highestBlockNumber := (*blockData)[0].Block.Number
179+
for _, block := range *blockData {
180+
if block.Block.Number.Cmp(highestBlockNumber) > 0 {
181+
highestBlockNumber = block.Block.Number
182+
}
183+
}
184+
c.lastCommittedBlock = new(big.Int).Set(highestBlockNumber)
185+
169186
// Update metrics for successful commits
170187
metrics.SuccessfulCommits.Add(float64(len(*blockData)))
171-
metrics.LastCommittedBlock.Set(float64((*blockData)[len(*blockData)-1].Block.Number.Int64()))
172-
188+
metrics.LastCommittedBlock.Set(float64(highestBlockNumber.Int64()))
173189
return nil
174190
}
175191

internal/orchestrator/committer_test.go

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,180 @@ func TestGetBlockNumbersToCommit(t *testing.T) {
5353
assert.Equal(t, big.NewInt(100+int64(committer.blocksPerCommit)), blockNumbers[len(blockNumbers)-1])
5454
}
5555

56+
func TestGetBlockNumbersToCommitWithoutConfiguredAndNotStored(t *testing.T) {
57+
// start from 0
58+
mockRPC := mocks.NewMockIRPCClient(t)
59+
mockMainStorage := mocks.NewMockIMainStorage(t)
60+
mockStagingStorage := mocks.NewMockIStagingStorage(t)
61+
mockStorage := storage.IStorage{
62+
MainStorage: mockMainStorage,
63+
StagingStorage: mockStagingStorage,
64+
}
65+
committer := NewCommitter(mockRPC, mockStorage)
66+
chainID := big.NewInt(1)
67+
68+
mockRPC.EXPECT().GetChainID().Return(chainID)
69+
mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(0), nil)
70+
71+
blockNumbers, err := committer.getBlockNumbersToCommit()
72+
73+
assert.NoError(t, err)
74+
assert.Equal(t, committer.blocksPerCommit, len(blockNumbers))
75+
assert.Equal(t, big.NewInt(0), blockNumbers[0])
76+
assert.Equal(t, big.NewInt(int64(committer.blocksPerCommit)-1), blockNumbers[len(blockNumbers)-1])
77+
}
78+
79+
func TestGetBlockNumbersToCommitWithConfiguredAndNotStored(t *testing.T) {
80+
// start from configured
81+
defer func() { config.Cfg = config.Config{} }()
82+
config.Cfg.Committer.FromBlock = 50
83+
84+
mockRPC := mocks.NewMockIRPCClient(t)
85+
mockMainStorage := mocks.NewMockIMainStorage(t)
86+
mockStagingStorage := mocks.NewMockIStagingStorage(t)
87+
mockStorage := storage.IStorage{
88+
MainStorage: mockMainStorage,
89+
StagingStorage: mockStagingStorage,
90+
}
91+
committer := NewCommitter(mockRPC, mockStorage)
92+
chainID := big.NewInt(1)
93+
94+
mockRPC.EXPECT().GetChainID().Return(chainID)
95+
mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(0), nil)
96+
97+
blockNumbers, err := committer.getBlockNumbersToCommit()
98+
99+
assert.NoError(t, err)
100+
assert.Equal(t, committer.blocksPerCommit, len(blockNumbers))
101+
assert.Equal(t, big.NewInt(50), blockNumbers[0])
102+
assert.Equal(t, big.NewInt(50+int64(committer.blocksPerCommit)-1), blockNumbers[len(blockNumbers)-1])
103+
}
104+
105+
func TestGetBlockNumbersToCommitWithConfiguredAndStored(t *testing.T) {
106+
// start from stored + 1
107+
defer func() { config.Cfg = config.Config{} }()
108+
config.Cfg.Committer.FromBlock = 50
109+
110+
mockRPC := mocks.NewMockIRPCClient(t)
111+
mockMainStorage := mocks.NewMockIMainStorage(t)
112+
mockStagingStorage := mocks.NewMockIStagingStorage(t)
113+
mockStorage := storage.IStorage{
114+
MainStorage: mockMainStorage,
115+
StagingStorage: mockStagingStorage,
116+
}
117+
committer := NewCommitter(mockRPC, mockStorage)
118+
chainID := big.NewInt(1)
119+
120+
mockRPC.EXPECT().GetChainID().Return(chainID)
121+
mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(2000), nil)
122+
123+
blockNumbers, err := committer.getBlockNumbersToCommit()
124+
125+
assert.NoError(t, err)
126+
assert.Equal(t, committer.blocksPerCommit, len(blockNumbers))
127+
assert.Equal(t, big.NewInt(2001), blockNumbers[0])
128+
assert.Equal(t, big.NewInt(2000+int64(committer.blocksPerCommit)), blockNumbers[len(blockNumbers)-1])
129+
}
130+
131+
func TestGetBlockNumbersToCommitWithoutConfiguredAndStored(t *testing.T) {
132+
// start from stored + 1
133+
mockRPC := mocks.NewMockIRPCClient(t)
134+
mockMainStorage := mocks.NewMockIMainStorage(t)
135+
mockStagingStorage := mocks.NewMockIStagingStorage(t)
136+
mockStorage := storage.IStorage{
137+
MainStorage: mockMainStorage,
138+
StagingStorage: mockStagingStorage,
139+
}
140+
committer := NewCommitter(mockRPC, mockStorage)
141+
chainID := big.NewInt(1)
142+
143+
mockRPC.EXPECT().GetChainID().Return(chainID)
144+
mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(2000), nil)
145+
146+
blockNumbers, err := committer.getBlockNumbersToCommit()
147+
148+
assert.NoError(t, err)
149+
assert.Equal(t, committer.blocksPerCommit, len(blockNumbers))
150+
assert.Equal(t, big.NewInt(2001), blockNumbers[0])
151+
assert.Equal(t, big.NewInt(2000+int64(committer.blocksPerCommit)), blockNumbers[len(blockNumbers)-1])
152+
}
153+
154+
func TestGetBlockNumbersToCommitWithStoredHigherThanInMemory(t *testing.T) {
155+
// start from stored + 1
156+
defer func() { config.Cfg = config.Config{} }()
157+
config.Cfg.Committer.FromBlock = 100
158+
159+
mockRPC := mocks.NewMockIRPCClient(t)
160+
mockMainStorage := mocks.NewMockIMainStorage(t)
161+
mockStagingStorage := mocks.NewMockIStagingStorage(t)
162+
mockStorage := storage.IStorage{
163+
MainStorage: mockMainStorage,
164+
StagingStorage: mockStagingStorage,
165+
}
166+
committer := NewCommitter(mockRPC, mockStorage)
167+
chainID := big.NewInt(1)
168+
169+
mockRPC.EXPECT().GetChainID().Return(chainID)
170+
mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(2000), nil)
171+
172+
blockNumbers, err := committer.getBlockNumbersToCommit()
173+
174+
assert.NoError(t, err)
175+
assert.Equal(t, committer.blocksPerCommit, len(blockNumbers))
176+
assert.Equal(t, big.NewInt(2001), blockNumbers[0])
177+
assert.Equal(t, big.NewInt(2000+int64(committer.blocksPerCommit)), blockNumbers[len(blockNumbers)-1])
178+
}
179+
180+
func TestGetBlockNumbersToCommitWithStoredLowerThanInMemory(t *testing.T) {
181+
// return empty array
182+
defer func() { config.Cfg = config.Config{} }()
183+
config.Cfg.Committer.FromBlock = 100
184+
185+
mockRPC := mocks.NewMockIRPCClient(t)
186+
mockMainStorage := mocks.NewMockIMainStorage(t)
187+
mockStagingStorage := mocks.NewMockIStagingStorage(t)
188+
mockStorage := storage.IStorage{
189+
MainStorage: mockMainStorage,
190+
StagingStorage: mockStagingStorage,
191+
}
192+
committer := NewCommitter(mockRPC, mockStorage)
193+
chainID := big.NewInt(1)
194+
195+
mockRPC.EXPECT().GetChainID().Return(chainID)
196+
mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(99), nil)
197+
198+
blockNumbers, err := committer.getBlockNumbersToCommit()
199+
200+
assert.NoError(t, err)
201+
assert.Equal(t, 0, len(blockNumbers))
202+
}
203+
204+
func TestGetBlockNumbersToCommitWithStoredEqualThanInMemory(t *testing.T) {
205+
// start from stored + 1
206+
defer func() { config.Cfg = config.Config{} }()
207+
config.Cfg.Committer.FromBlock = 2000
208+
209+
mockRPC := mocks.NewMockIRPCClient(t)
210+
mockMainStorage := mocks.NewMockIMainStorage(t)
211+
mockStagingStorage := mocks.NewMockIStagingStorage(t)
212+
mockStorage := storage.IStorage{
213+
MainStorage: mockMainStorage,
214+
StagingStorage: mockStagingStorage,
215+
}
216+
committer := NewCommitter(mockRPC, mockStorage)
217+
chainID := big.NewInt(1)
218+
219+
mockRPC.EXPECT().GetChainID().Return(chainID)
220+
mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(2000), nil)
221+
222+
blockNumbers, err := committer.getBlockNumbersToCommit()
223+
224+
assert.NoError(t, err)
225+
assert.Equal(t, committer.blocksPerCommit, len(blockNumbers))
226+
assert.Equal(t, big.NewInt(2001), blockNumbers[0])
227+
assert.Equal(t, big.NewInt(2000+int64(committer.blocksPerCommit)), blockNumbers[len(blockNumbers)-1])
228+
}
229+
56230
func TestGetSequentialBlockDataToCommit(t *testing.T) {
57231
defer func() { config.Cfg = config.Config{} }()
58232
config.Cfg.Committer.BlocksPerCommit = 3

0 commit comments

Comments
 (0)