Skip to content

Commit 82449ed

Browse files
committed
make committer detect if it is stuck and solve it
1 parent 7187f40 commit 82449ed

File tree

3 files changed

+111
-20
lines changed

3 files changed

+111
-20
lines changed

internal/orchestrator/committer.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ func (c *Committer) getSequentialBlockDataToCommit() (*[]common.BlockData, error
109109
}
110110
if blocksData == nil || len(*blocksData) == 0 {
111111
log.Warn().Msgf("Committer didn't find the following range in staging: %v - %v", blocksToCommit[0].Int64(), blocksToCommit[len(blocksToCommit)-1].Int64())
112+
c.handleMissingStagingData(blocksToCommit)
112113
return nil, nil
113114
}
114115

@@ -189,3 +190,25 @@ func (c *Committer) handleGap(expectedStartBlockNumber *big.Int, actualFirstBloc
189190
poller.Poll(missingBlockNumbers)
190191
return fmt.Errorf("first block number (%s) in commit batch does not match expected (%s)", actualFirstBlock.Number.String(), expectedStartBlockNumber.String())
191192
}
193+
194+
func (c *Committer) handleMissingStagingData(blocksToCommit []*big.Int) {
195+
// Checks if there are any blocks in staging after the current range end
196+
lastStagedBlockNumber, err := c.storage.StagingStorage.GetLastStagedBlockNumber(c.rpc.GetChainID(), blocksToCommit[len(blocksToCommit)-1], big.NewInt(0))
197+
if err != nil {
198+
log.Error().Err(err).Msg("Error checking staged data for missing range")
199+
return
200+
}
201+
if lastStagedBlockNumber == nil || lastStagedBlockNumber.Sign() <= 0 {
202+
log.Debug().Msgf("Committer is caught up with staging. No need to poll for missing blocks.")
203+
return
204+
}
205+
log.Debug().Msgf("Detected missing blocks in staging data starting from %s.", blocksToCommit[0].String())
206+
207+
poller := NewBoundlessPoller(c.rpc, c.storage)
208+
blocksToPoll := blocksToCommit
209+
if len(blocksToCommit) > int(poller.blocksPerPoll) {
210+
blocksToPoll = blocksToCommit[:int(poller.blocksPerPoll)]
211+
}
212+
poller.Poll(blocksToPoll)
213+
log.Debug().Msgf("Polled %d blocks due to committer detecting them as missing. Range: %s - %s", len(blocksToPoll), blocksToPoll[0].String(), blocksToPoll[len(blocksToPoll)-1].String())
214+
}

internal/orchestrator/committer_test.go

Lines changed: 88 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,6 @@ func TestGetBlockNumbersToCommit(t *testing.T) {
5050
assert.Equal(t, committer.blocksPerCommit, len(blockNumbers))
5151
assert.Equal(t, big.NewInt(101), blockNumbers[0])
5252
assert.Equal(t, big.NewInt(100+int64(committer.blocksPerCommit)), blockNumbers[len(blockNumbers)-1])
53-
54-
mockRPC.AssertExpectations(t)
55-
mockMainStorage.AssertExpectations(t)
5653
}
5754

5855
func TestGetSequentialBlockDataToCommit(t *testing.T) {
@@ -87,10 +84,6 @@ func TestGetSequentialBlockDataToCommit(t *testing.T) {
8784
assert.NoError(t, err)
8885
assert.NotNil(t, result)
8986
assert.Equal(t, 3, len(*result))
90-
91-
mockRPC.AssertExpectations(t)
92-
mockMainStorage.AssertExpectations(t)
93-
mockStagingStorage.AssertExpectations(t)
9487
}
9588

9689
func TestGetSequentialBlockDataToCommitWithDuplicateBlocks(t *testing.T) {
@@ -130,10 +123,6 @@ func TestGetSequentialBlockDataToCommitWithDuplicateBlocks(t *testing.T) {
130123
assert.Equal(t, big.NewInt(101), (*result)[0].Block.Number)
131124
assert.Equal(t, big.NewInt(102), (*result)[1].Block.Number)
132125
assert.Equal(t, big.NewInt(103), (*result)[2].Block.Number)
133-
134-
mockRPC.AssertExpectations(t)
135-
mockMainStorage.AssertExpectations(t)
136-
mockStagingStorage.AssertExpectations(t)
137126
}
138127

139128
func TestCommit(t *testing.T) {
@@ -157,9 +146,6 @@ func TestCommit(t *testing.T) {
157146
err := committer.commit(&blockData)
158147

159148
assert.NoError(t, err)
160-
161-
mockMainStorage.AssertExpectations(t)
162-
mockStagingStorage.AssertExpectations(t)
163149
}
164150

165151
func TestHandleGap(t *testing.T) {
@@ -206,7 +192,6 @@ func TestStartCommitter(t *testing.T) {
206192
}
207193

208194
committer := NewCommitter(mockRPC, mockStorage)
209-
committer.storage = mockStorage
210195
committer.triggerIntervalMs = 100 // Set a short interval for testing
211196

212197
chainID := big.NewInt(1)
@@ -226,9 +211,93 @@ func TestStartCommitter(t *testing.T) {
226211

227212
// Wait for a short time to allow the committer to run
228213
time.Sleep(200 * time.Millisecond)
214+
}
215+
216+
func TestHandleMissingStagingData(t *testing.T) {
217+
defer func() { config.Cfg = config.Config{} }()
218+
config.Cfg.Committer.BlocksPerCommit = 5
219+
220+
mockRPC := mocks.NewMockIRPCClient(t)
221+
mockMainStorage := mocks.NewMockIMainStorage(t)
222+
mockStagingStorage := mocks.NewMockIStagingStorage(t)
223+
224+
mockStorage := storage.IStorage{
225+
MainStorage: mockMainStorage,
226+
StagingStorage: mockStagingStorage,
227+
}
228+
229+
committer := NewCommitter(mockRPC, mockStorage)
230+
231+
chainID := big.NewInt(1)
232+
mockRPC.EXPECT().GetChainID().Return(chainID)
233+
mockRPC.EXPECT().GetBlocksPerRequest().Return(rpc.BlocksPerRequestConfig{
234+
Blocks: 100,
235+
})
236+
mockRPC.EXPECT().GetFullBlocks([]*big.Int{big.NewInt(0), big.NewInt(1), big.NewInt(2), big.NewInt(3), big.NewInt(4)}).Return([]rpc.GetFullBlockResult{
237+
{BlockNumber: big.NewInt(0), Data: common.BlockData{Block: common.Block{Number: big.NewInt(0)}}},
238+
{BlockNumber: big.NewInt(1), Data: common.BlockData{Block: common.Block{Number: big.NewInt(1)}}},
239+
{BlockNumber: big.NewInt(2), Data: common.BlockData{Block: common.Block{Number: big.NewInt(2)}}},
240+
{BlockNumber: big.NewInt(3), Data: common.BlockData{Block: common.Block{Number: big.NewInt(3)}}},
241+
{BlockNumber: big.NewInt(4), Data: common.BlockData{Block: common.Block{Number: big.NewInt(4)}}},
242+
})
243+
mockStagingStorage.EXPECT().InsertStagingData(mock.Anything).Return(nil)
244+
245+
mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(0), nil)
246+
expectedEndBlock := big.NewInt(4)
247+
mockStagingStorage.EXPECT().GetLastStagedBlockNumber(chainID, expectedEndBlock, big.NewInt(0)).Return(big.NewInt(20), nil)
248+
249+
blockData := []common.BlockData{}
250+
mockStagingStorage.EXPECT().GetStagingData(storage.QueryFilter{
251+
ChainId: chainID,
252+
BlockNumbers: []*big.Int{big.NewInt(0), big.NewInt(1), big.NewInt(2), big.NewInt(3), big.NewInt(4)},
253+
}).Return(&blockData, nil)
254+
255+
result, err := committer.getSequentialBlockDataToCommit()
229256

230-
// Assert that the expected methods were called
231-
mockRPC.AssertExpectations(t)
232-
mockMainStorage.AssertExpectations(t)
233-
mockStagingStorage.AssertExpectations(t)
257+
assert.NoError(t, err)
258+
assert.Nil(t, result)
259+
}
260+
261+
func TestHandleMissingStagingDataIsPolledWithCorrectBatchSize(t *testing.T) {
262+
defer func() { config.Cfg = config.Config{} }()
263+
config.Cfg.Committer.BlocksPerCommit = 5
264+
config.Cfg.Poller.BlocksPerPoll = 3
265+
266+
mockRPC := mocks.NewMockIRPCClient(t)
267+
mockMainStorage := mocks.NewMockIMainStorage(t)
268+
mockStagingStorage := mocks.NewMockIStagingStorage(t)
269+
270+
mockStorage := storage.IStorage{
271+
MainStorage: mockMainStorage,
272+
StagingStorage: mockStagingStorage,
273+
}
274+
275+
committer := NewCommitter(mockRPC, mockStorage)
276+
277+
chainID := big.NewInt(1)
278+
mockRPC.EXPECT().GetChainID().Return(chainID)
279+
mockRPC.EXPECT().GetBlocksPerRequest().Return(rpc.BlocksPerRequestConfig{
280+
Blocks: 3,
281+
})
282+
mockRPC.EXPECT().GetFullBlocks([]*big.Int{big.NewInt(0), big.NewInt(1), big.NewInt(2)}).Return([]rpc.GetFullBlockResult{
283+
{BlockNumber: big.NewInt(0), Data: common.BlockData{Block: common.Block{Number: big.NewInt(0)}}},
284+
{BlockNumber: big.NewInt(1), Data: common.BlockData{Block: common.Block{Number: big.NewInt(1)}}},
285+
{BlockNumber: big.NewInt(2), Data: common.BlockData{Block: common.Block{Number: big.NewInt(2)}}},
286+
})
287+
mockStagingStorage.EXPECT().InsertStagingData(mock.Anything).Return(nil)
288+
289+
mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(0), nil)
290+
expectedEndBlock := big.NewInt(4)
291+
mockStagingStorage.EXPECT().GetLastStagedBlockNumber(chainID, expectedEndBlock, big.NewInt(0)).Return(big.NewInt(20), nil)
292+
293+
blockData := []common.BlockData{}
294+
mockStagingStorage.EXPECT().GetStagingData(storage.QueryFilter{
295+
ChainId: chainID,
296+
BlockNumbers: []*big.Int{big.NewInt(0), big.NewInt(1), big.NewInt(2), big.NewInt(3), big.NewInt(4)},
297+
}).Return(&blockData, nil)
298+
299+
result, err := committer.getSequentialBlockDataToCommit()
300+
301+
assert.NoError(t, err)
302+
assert.Nil(t, result)
234303
}

internal/storage/clickhouse.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -563,7 +563,6 @@ func (c *ClickHouseConnector) GetMaxBlockNumber(chainId *big.Int) (maxBlockNumbe
563563
}
564564
return nil, err
565565
}
566-
zLog.Debug().Msgf("Max block number in main storage is: %s", maxBlockNumber.String())
567566
return maxBlockNumber, nil
568567
}
569568

0 commit comments

Comments
 (0)