Skip to content

Commit 37e7b71

Browse files
committed
make committer detect if it is stuck and solve it
1 parent 7187f40 commit 37e7b71

File tree

2 files changed

+63
-19
lines changed

2 files changed

+63
-19
lines changed

internal/orchestrator/committer.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ func (c *Committer) getSequentialBlockDataToCommit() (*[]common.BlockData, error
109109
}
110110
if blocksData == nil || len(*blocksData) == 0 {
111111
log.Warn().Msgf("Committer didn't find the following range in staging: %v - %v", blocksToCommit[0].Int64(), blocksToCommit[len(blocksToCommit)-1].Int64())
112+
c.handleMissingStagingData(blocksToCommit)
112113
return nil, nil
113114
}
114115

@@ -189,3 +190,21 @@ func (c *Committer) handleGap(expectedStartBlockNumber *big.Int, actualFirstBloc
189190
poller.Poll(missingBlockNumbers)
190191
return fmt.Errorf("first block number (%s) in commit batch does not match expected (%s)", actualFirstBlock.Number.String(), expectedStartBlockNumber.String())
191192
}
193+
194+
func (c *Committer) handleMissingStagingData(blocksToCommit []*big.Int) {
195+
// Checks if there are any blocks in staging after the current range end
196+
lastStagedBlockNumber, err := c.storage.StagingStorage.GetLastStagedBlockNumber(c.rpc.GetChainID(), blocksToCommit[len(blocksToCommit)-1], big.NewInt(0))
197+
if err != nil {
198+
log.Error().Err(err).Msg("Error checking staged data for missing range")
199+
return
200+
}
201+
if lastStagedBlockNumber == nil || lastStagedBlockNumber.Sign() <= 0 {
202+
log.Debug().Msgf("Committer is caught up with staging. No need to poll for missing blocks.")
203+
return
204+
}
205+
log.Debug().Msgf("Detected missing blocks in staging data starting from %s.", blocksToCommit[0].String())
206+
207+
poller := NewBoundlessPoller(c.rpc, c.storage)
208+
poller.Poll(blocksToCommit)
209+
log.Debug().Msgf("Polled %d blocks due to committer detecting them as missing. Range: %s - %s", len(blocksToCommit), blocksToCommit[0].String(), blocksToCommit[len(blocksToCommit)-1].String())
210+
}

internal/orchestrator/committer_test.go

Lines changed: 44 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,6 @@ func TestGetBlockNumbersToCommit(t *testing.T) {
5050
assert.Equal(t, committer.blocksPerCommit, len(blockNumbers))
5151
assert.Equal(t, big.NewInt(101), blockNumbers[0])
5252
assert.Equal(t, big.NewInt(100+int64(committer.blocksPerCommit)), blockNumbers[len(blockNumbers)-1])
53-
54-
mockRPC.AssertExpectations(t)
55-
mockMainStorage.AssertExpectations(t)
5653
}
5754

5855
func TestGetSequentialBlockDataToCommit(t *testing.T) {
@@ -87,10 +84,6 @@ func TestGetSequentialBlockDataToCommit(t *testing.T) {
8784
assert.NoError(t, err)
8885
assert.NotNil(t, result)
8986
assert.Equal(t, 3, len(*result))
90-
91-
mockRPC.AssertExpectations(t)
92-
mockMainStorage.AssertExpectations(t)
93-
mockStagingStorage.AssertExpectations(t)
9487
}
9588

9689
func TestGetSequentialBlockDataToCommitWithDuplicateBlocks(t *testing.T) {
@@ -130,10 +123,6 @@ func TestGetSequentialBlockDataToCommitWithDuplicateBlocks(t *testing.T) {
130123
assert.Equal(t, big.NewInt(101), (*result)[0].Block.Number)
131124
assert.Equal(t, big.NewInt(102), (*result)[1].Block.Number)
132125
assert.Equal(t, big.NewInt(103), (*result)[2].Block.Number)
133-
134-
mockRPC.AssertExpectations(t)
135-
mockMainStorage.AssertExpectations(t)
136-
mockStagingStorage.AssertExpectations(t)
137126
}
138127

139128
func TestCommit(t *testing.T) {
@@ -157,9 +146,6 @@ func TestCommit(t *testing.T) {
157146
err := committer.commit(&blockData)
158147

159148
assert.NoError(t, err)
160-
161-
mockMainStorage.AssertExpectations(t)
162-
mockStagingStorage.AssertExpectations(t)
163149
}
164150

165151
func TestHandleGap(t *testing.T) {
@@ -206,7 +192,6 @@ func TestStartCommitter(t *testing.T) {
206192
}
207193

208194
committer := NewCommitter(mockRPC, mockStorage)
209-
committer.storage = mockStorage
210195
committer.triggerIntervalMs = 100 // Set a short interval for testing
211196

212197
chainID := big.NewInt(1)
@@ -226,9 +211,49 @@ func TestStartCommitter(t *testing.T) {
226211

227212
// Wait for a short time to allow the committer to run
228213
time.Sleep(200 * time.Millisecond)
214+
}
229215

230-
// Assert that the expected methods were called
231-
mockRPC.AssertExpectations(t)
232-
mockMainStorage.AssertExpectations(t)
233-
mockStagingStorage.AssertExpectations(t)
216+
func TestHandleMissingStagingData(t *testing.T) {
217+
defer func() { config.Cfg = config.Config{} }()
218+
config.Cfg.Committer.BlocksPerCommit = 5
219+
220+
mockRPC := mocks.NewMockIRPCClient(t)
221+
mockMainStorage := mocks.NewMockIMainStorage(t)
222+
mockStagingStorage := mocks.NewMockIStagingStorage(t)
223+
224+
mockStorage := storage.IStorage{
225+
MainStorage: mockMainStorage,
226+
StagingStorage: mockStagingStorage,
227+
}
228+
229+
committer := NewCommitter(mockRPC, mockStorage)
230+
231+
chainID := big.NewInt(1)
232+
mockRPC.EXPECT().GetChainID().Return(chainID)
233+
mockRPC.EXPECT().GetBlocksPerRequest().Return(rpc.BlocksPerRequestConfig{
234+
Blocks: 100,
235+
})
236+
mockRPC.EXPECT().GetFullBlocks([]*big.Int{big.NewInt(0), big.NewInt(1), big.NewInt(2), big.NewInt(3), big.NewInt(4)}).Return([]rpc.GetFullBlockResult{
237+
{BlockNumber: big.NewInt(0), Data: common.BlockData{Block: common.Block{Number: big.NewInt(0)}}},
238+
{BlockNumber: big.NewInt(1), Data: common.BlockData{Block: common.Block{Number: big.NewInt(1)}}},
239+
{BlockNumber: big.NewInt(2), Data: common.BlockData{Block: common.Block{Number: big.NewInt(2)}}},
240+
{BlockNumber: big.NewInt(3), Data: common.BlockData{Block: common.Block{Number: big.NewInt(3)}}},
241+
{BlockNumber: big.NewInt(4), Data: common.BlockData{Block: common.Block{Number: big.NewInt(4)}}},
242+
})
243+
mockStagingStorage.EXPECT().InsertStagingData(mock.Anything).Return(nil)
244+
245+
mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(0), nil)
246+
expectedEndBlock := big.NewInt(4)
247+
mockStagingStorage.EXPECT().GetLastStagedBlockNumber(chainID, expectedEndBlock, big.NewInt(0)).Return(big.NewInt(20), nil)
248+
249+
blockData := []common.BlockData{}
250+
mockStagingStorage.EXPECT().GetStagingData(storage.QueryFilter{
251+
ChainId: chainID,
252+
BlockNumbers: []*big.Int{big.NewInt(0), big.NewInt(1), big.NewInt(2), big.NewInt(3), big.NewInt(4)},
253+
}).Return(&blockData, nil)
254+
255+
result, err := committer.getSequentialBlockDataToCommit()
256+
257+
assert.NoError(t, err)
258+
assert.Nil(t, result)
234259
}

0 commit comments

Comments
 (0)