Skip to content

Commit 90abac4

Browse files
committed
fix committing when duplicated blocks in staging
1 parent 18df7d3 commit 90abac4

File tree

3 files changed

+249
-1
lines changed

3 files changed

+249
-1
lines changed

internal/orchestrator/committer.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,10 @@ func (c *Committer) getSequentialBlockDataToCommit() (*[]common.BlockData, error
126126
expectedBlockNumber := new(big.Int).Add((*blocksData)[0].Block.Number, big.NewInt(1))
127127

128128
for i := 1; i < len(*blocksData); i++ {
129+
if (*blocksData)[i].Block.Number.Cmp((*blocksData)[i-1].Block.Number) == 0 {
130+
// Duplicate block, skip -- might happen if block has been polled multiple times
131+
continue
132+
}
129133
if (*blocksData)[i].Block.Number.Cmp(expectedBlockNumber) != 0 {
130134
// Note: Gap detected, stop here
131135
log.Warn().Msgf("Gap detected at block %s, committing until %s", expectedBlockNumber.String(), (*blocksData)[i-1].Block.Number.String())
Lines changed: 244 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,244 @@
1+
package orchestrator
2+
3+
import (
4+
"math/big"
5+
"testing"
6+
"time"
7+
8+
"github.com/stretchr/testify/assert"
9+
"github.com/stretchr/testify/mock"
10+
config "github.com/thirdweb-dev/indexer/configs"
11+
"github.com/thirdweb-dev/indexer/internal/common"
12+
"github.com/thirdweb-dev/indexer/internal/storage"
13+
mocks "github.com/thirdweb-dev/indexer/test/mocks"
14+
)
15+
16+
func TestNewCommitter(t *testing.T) {
17+
mockRPC := mocks.NewMockIRPCClient(t)
18+
mockMainStorage := mocks.NewMockIMainStorage(t)
19+
mockStagingStorage := mocks.NewMockIStagingStorage(t)
20+
21+
mockStorage := storage.IStorage{
22+
MainStorage: mockMainStorage,
23+
StagingStorage: mockStagingStorage,
24+
}
25+
committer := NewCommitter(mockRPC, mockStorage)
26+
27+
assert.NotNil(t, committer)
28+
assert.Equal(t, DEFAULT_COMMITTER_TRIGGER_INTERVAL, committer.triggerIntervalMs)
29+
assert.Equal(t, DEFAULT_BLOCKS_PER_COMMIT, committer.blocksPerCommit)
30+
}
31+
32+
func TestGetBlockNumbersToCommit(t *testing.T) {
33+
mockRPC := mocks.NewMockIRPCClient(t)
34+
mockMainStorage := mocks.NewMockIMainStorage(t)
35+
mockStagingStorage := mocks.NewMockIStagingStorage(t)
36+
mockStorage := storage.IStorage{
37+
MainStorage: mockMainStorage,
38+
StagingStorage: mockStagingStorage,
39+
}
40+
committer := NewCommitter(mockRPC, mockStorage)
41+
chainID := big.NewInt(1)
42+
43+
mockRPC.EXPECT().GetChainID().Return(chainID)
44+
mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(100), nil)
45+
46+
blockNumbers, err := committer.getBlockNumbersToCommit()
47+
48+
assert.NoError(t, err)
49+
assert.Equal(t, committer.blocksPerCommit, len(blockNumbers))
50+
assert.Equal(t, big.NewInt(101), blockNumbers[0])
51+
assert.Equal(t, big.NewInt(100+int64(committer.blocksPerCommit)), blockNumbers[len(blockNumbers)-1])
52+
53+
mockRPC.AssertExpectations(t)
54+
mockMainStorage.AssertExpectations(t)
55+
}
56+
57+
func TestGetSequentialBlockDataToCommit(t *testing.T) {
58+
defer func() { config.Cfg = config.Config{} }()
59+
config.Cfg.Committer.BlocksPerCommit = 3
60+
61+
mockRPC := mocks.NewMockIRPCClient(t)
62+
mockMainStorage := mocks.NewMockIMainStorage(t)
63+
mockStagingStorage := mocks.NewMockIStagingStorage(t)
64+
mockStorage := storage.IStorage{
65+
MainStorage: mockMainStorage,
66+
StagingStorage: mockStagingStorage,
67+
}
68+
committer := NewCommitter(mockRPC, mockStorage)
69+
chainID := big.NewInt(1)
70+
71+
mockRPC.EXPECT().GetChainID().Return(chainID)
72+
mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(100), nil)
73+
74+
blockData := []common.BlockData{
75+
{Block: common.Block{Number: big.NewInt(101)}},
76+
{Block: common.Block{Number: big.NewInt(102)}},
77+
{Block: common.Block{Number: big.NewInt(103)}},
78+
}
79+
mockStagingStorage.EXPECT().GetStagingData(storage.QueryFilter{
80+
ChainId: chainID,
81+
BlockNumbers: []*big.Int{big.NewInt(101), big.NewInt(102), big.NewInt(103)},
82+
}).Return(&blockData, nil)
83+
84+
result, err := committer.getSequentialBlockDataToCommit()
85+
86+
assert.NoError(t, err)
87+
assert.NotNil(t, result)
88+
assert.Equal(t, 3, len(*result))
89+
90+
mockRPC.AssertExpectations(t)
91+
mockMainStorage.AssertExpectations(t)
92+
mockStagingStorage.AssertExpectations(t)
93+
}
94+
95+
func TestGetSequentialBlockDataToCommitWithDuplicateBlocks(t *testing.T) {
96+
defer func() { config.Cfg = config.Config{} }()
97+
config.Cfg.Committer.BlocksPerCommit = 3
98+
99+
mockRPC := mocks.NewMockIRPCClient(t)
100+
mockMainStorage := mocks.NewMockIMainStorage(t)
101+
mockStagingStorage := mocks.NewMockIStagingStorage(t)
102+
mockStorage := storage.IStorage{
103+
MainStorage: mockMainStorage,
104+
StagingStorage: mockStagingStorage,
105+
}
106+
committer := NewCommitter(mockRPC, mockStorage)
107+
chainID := big.NewInt(1)
108+
109+
mockRPC.EXPECT().GetChainID().Return(chainID)
110+
mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(100), nil)
111+
112+
blockData := []common.BlockData{
113+
{Block: common.Block{Number: big.NewInt(101)}},
114+
{Block: common.Block{Number: big.NewInt(102)}},
115+
{Block: common.Block{Number: big.NewInt(102)}},
116+
{Block: common.Block{Number: big.NewInt(103)}},
117+
{Block: common.Block{Number: big.NewInt(103)}},
118+
}
119+
mockStagingStorage.EXPECT().GetStagingData(storage.QueryFilter{
120+
ChainId: chainID,
121+
BlockNumbers: []*big.Int{big.NewInt(101), big.NewInt(102), big.NewInt(103)},
122+
}).Return(&blockData, nil)
123+
124+
result, err := committer.getSequentialBlockDataToCommit()
125+
126+
assert.NoError(t, err)
127+
assert.NotNil(t, result)
128+
assert.Equal(t, 3, len(*result))
129+
assert.Equal(t, big.NewInt(101), (*result)[0].Block.Number)
130+
assert.Equal(t, big.NewInt(102), (*result)[1].Block.Number)
131+
assert.Equal(t, big.NewInt(103), (*result)[2].Block.Number)
132+
133+
mockRPC.AssertExpectations(t)
134+
mockMainStorage.AssertExpectations(t)
135+
mockStagingStorage.AssertExpectations(t)
136+
}
137+
138+
func TestCommit(t *testing.T) {
139+
mockRPC := mocks.NewMockIRPCClient(t)
140+
mockMainStorage := mocks.NewMockIMainStorage(t)
141+
mockStagingStorage := mocks.NewMockIStagingStorage(t)
142+
mockStorage := storage.IStorage{
143+
MainStorage: mockMainStorage,
144+
StagingStorage: mockStagingStorage,
145+
}
146+
committer := NewCommitter(mockRPC, mockStorage)
147+
148+
blockData := []common.BlockData{
149+
{Block: common.Block{Number: big.NewInt(101)}},
150+
{Block: common.Block{Number: big.NewInt(102)}},
151+
}
152+
153+
mockMainStorage.EXPECT().InsertBlockData(&blockData).Return(nil)
154+
mockStagingStorage.EXPECT().DeleteStagingData(&blockData).Return(nil)
155+
156+
err := committer.commit(&blockData)
157+
158+
assert.NoError(t, err)
159+
160+
mockMainStorage.AssertExpectations(t)
161+
mockStagingStorage.AssertExpectations(t)
162+
}
163+
164+
func TestHandleGap(t *testing.T) {
165+
mockRPC := mocks.NewMockIRPCClient(t)
166+
mockMainStorage := mocks.NewMockIMainStorage(t)
167+
mockStagingStorage := mocks.NewMockIStagingStorage(t)
168+
mockOrchestratorStorage := mocks.NewMockIOrchestratorStorage(t)
169+
mockStorage := storage.IStorage{
170+
MainStorage: mockMainStorage,
171+
StagingStorage: mockStagingStorage,
172+
OrchestratorStorage: mockOrchestratorStorage,
173+
}
174+
committer := NewCommitter(mockRPC, mockStorage)
175+
176+
chainID := big.NewInt(1)
177+
mockRPC.EXPECT().GetChainID().Return(chainID)
178+
179+
expectedStartBlockNumber := big.NewInt(100)
180+
actualFirstBlock := common.Block{Number: big.NewInt(105)}
181+
182+
mockOrchestratorStorage.EXPECT().GetBlockFailures(storage.QueryFilter{
183+
ChainId: chainID,
184+
BlockNumbers: []*big.Int{big.NewInt(100), big.NewInt(101), big.NewInt(102), big.NewInt(103), big.NewInt(104)},
185+
}).Return([]common.BlockFailure{}, nil)
186+
mockOrchestratorStorage.On("StoreBlockFailures", mock.MatchedBy(func(failures []common.BlockFailure) bool {
187+
return len(failures) == 5 && failures[0].ChainId == chainID && failures[0].BlockNumber.Cmp(big.NewInt(100)) == 0 &&
188+
failures[0].FailureCount == 1 && failures[0].FailureReason == "Gap detected for this block" &&
189+
failures[1].ChainId == chainID && failures[1].BlockNumber.Cmp(big.NewInt(101)) == 0 &&
190+
failures[1].FailureCount == 1 && failures[1].FailureReason == "Gap detected for this block" &&
191+
failures[2].ChainId == chainID && failures[2].BlockNumber.Cmp(big.NewInt(102)) == 0 &&
192+
failures[2].FailureCount == 1 && failures[2].FailureReason == "Gap detected for this block" &&
193+
failures[3].ChainId == chainID && failures[3].BlockNumber.Cmp(big.NewInt(103)) == 0 &&
194+
failures[3].FailureCount == 1 && failures[3].FailureReason == "Gap detected for this block" &&
195+
failures[4].ChainId == chainID && failures[4].BlockNumber.Cmp(big.NewInt(104)) == 0 &&
196+
failures[4].FailureCount == 1 && failures[4].FailureReason == "Gap detected for this block"
197+
})).Return(nil)
198+
199+
err := committer.handleGap(expectedStartBlockNumber, actualFirstBlock)
200+
201+
assert.Error(t, err)
202+
assert.Contains(t, err.Error(), "first block number (105) in commit batch does not match expected (100)")
203+
204+
mockRPC.AssertExpectations(t)
205+
mockOrchestratorStorage.AssertExpectations(t)
206+
}
207+
208+
func TestStartCommitter(t *testing.T) {
209+
mockRPC := mocks.NewMockIRPCClient(t)
210+
mockMainStorage := mocks.NewMockIMainStorage(t)
211+
mockStagingStorage := mocks.NewMockIStagingStorage(t)
212+
213+
mockStorage := storage.IStorage{
214+
MainStorage: mockMainStorage,
215+
StagingStorage: mockStagingStorage,
216+
}
217+
218+
committer := NewCommitter(mockRPC, mockStorage)
219+
committer.storage = mockStorage
220+
committer.triggerIntervalMs = 100 // Set a short interval for testing
221+
222+
chainID := big.NewInt(1)
223+
mockRPC.EXPECT().GetChainID().Return(chainID)
224+
mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(100), nil)
225+
226+
blockData := []common.BlockData{
227+
{Block: common.Block{Number: big.NewInt(101)}},
228+
{Block: common.Block{Number: big.NewInt(102)}},
229+
}
230+
mockStagingStorage.On("GetStagingData", mock.Anything).Return(&blockData, nil)
231+
mockMainStorage.On("InsertBlockData", &blockData).Return(nil)
232+
mockStagingStorage.On("DeleteStagingData", &blockData).Return(nil)
233+
234+
// Start the committer in a goroutine
235+
go committer.Start()
236+
237+
// Wait for a short time to allow the committer to run
238+
time.Sleep(200 * time.Millisecond)
239+
240+
// Assert that the expected methods were called
241+
mockRPC.AssertExpectations(t)
242+
mockMainStorage.AssertExpectations(t)
243+
mockStagingStorage.AssertExpectations(t)
244+
}

internal/orchestrator/reorg_handler_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,7 @@ func TestHandleReorg(t *testing.T) {
287287
assert.NoError(t, err)
288288
}
289289

290-
func TestStart(t *testing.T) {
290+
func TestStartReorgHandler(t *testing.T) {
291291
mockRPC := mocks.NewMockIRPCClient(t)
292292
mockMainStorage := mocks.NewMockIMainStorage(t)
293293
mockOrchestratorStorage := mocks.NewMockIOrchestratorStorage(t)

0 commit comments

Comments
 (0)