Skip to content

Commit 93e3d08

Browse files
committed
minor optimizations + refactoring
1 parent 501e265 commit 93e3d08

File tree

7 files changed

+214
-11
lines changed

7 files changed

+214
-11
lines changed

internal/orchestrator/committer.go

Lines changed: 39 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -72,11 +72,6 @@ func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage, opts ...Committe
7272
opt(committer)
7373
}
7474

75-
// Clean up any stranded blocks in staging
76-
if err := committer.cleanupStrandedBlocks(); err != nil {
77-
log.Error().Err(err).Msg("Failed to clean up stranded blocks during initialization")
78-
}
79-
8075
return committer
8176
}
8277

@@ -93,7 +88,7 @@ func (c *Committer) cleanupStrandedBlocks() error {
9388
}
9489

9590
// Get block numbers from PostgreSQL that are less than latest committed block
96-
psqlBlockNumbers, err := c.storage.StagingStorage.(*storage.PostgresConnector).GetBlockNumbersLessThan(c.rpc.GetChainID(), latestCommittedBlockNumber)
91+
psqlBlockNumbers, err := c.storage.StagingStorage.GetBlockNumbersLessThan(c.rpc.GetChainID(), latestCommittedBlockNumber)
9792
if err != nil {
9893
return fmt.Errorf("error getting block numbers from PostgreSQL: %v", err)
9994
}
@@ -109,15 +104,48 @@ func (c *Committer) cleanupStrandedBlocks() error {
109104
Str("max_block", psqlBlockNumbers[len(psqlBlockNumbers)-1].String()).
110105
Msg("Found stranded blocks in staging")
111106

107+
// Process blocks in batches of c.blocksPerCommit, but max 1000 to avoid ClickHouse query limits
108+
batchSize := c.blocksPerCommit
109+
if batchSize > 1000 {
110+
batchSize = 1000
111+
}
112+
113+
for i := 0; i < len(psqlBlockNumbers); i += batchSize {
114+
end := i + batchSize
115+
if end > len(psqlBlockNumbers) {
116+
end = len(psqlBlockNumbers)
117+
}
118+
119+
batchBlockNumbers := psqlBlockNumbers[i:end]
120+
121+
if err := c.processStrandedBlocksBatch(batchBlockNumbers); err != nil {
122+
return fmt.Errorf("error processing stranded blocks batch %d-%d: %v", i, end-1, err)
123+
}
124+
}
125+
126+
return nil
127+
}
128+
129+
func (c *Committer) processStrandedBlocksBatch(blockNumbers []*big.Int) error {
130+
if len(blockNumbers) == 0 {
131+
return nil
132+
}
133+
134+
log.Debug().
135+
Int("batch_size", len(blockNumbers)).
136+
Str("min_block", blockNumbers[0].String()).
137+
Str("max_block", blockNumbers[len(blockNumbers)-1].String()).
138+
Msg("Processing stranded blocks batch")
139+
112140
// Check which blocks exist in ClickHouse
113-
existsInClickHouse, err := c.storage.MainStorage.(*storage.ClickHouseConnector).CheckBlocksExist(c.rpc.GetChainID(), psqlBlockNumbers)
141+
existsInClickHouse, err := c.storage.MainStorage.CheckBlocksExist(c.rpc.GetChainID(), blockNumbers)
114142
if err != nil {
115143
return fmt.Errorf("error checking blocks in ClickHouse: %v", err)
116144
}
117145

118146
// Get block data from PostgreSQL for blocks that don't exist in ClickHouse
119147
var blocksToCommit []common.BlockData
120-
for _, blockNum := range psqlBlockNumbers {
148+
for _, blockNum := range blockNumbers {
121149
if !existsInClickHouse[blockNum.String()] {
122150
data, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{
123151
BlockNumbers: []*big.Int{blockNum},
@@ -147,7 +175,7 @@ func (c *Committer) cleanupStrandedBlocks() error {
147175

148176
// Delete all blocks from PostgreSQL that were checked (whether they existed in ClickHouse or not)
149177
var blocksToDelete []common.BlockData
150-
for _, blockNum := range psqlBlockNumbers {
178+
for _, blockNum := range blockNumbers {
151179
blocksToDelete = append(blocksToDelete, common.BlockData{
152180
Block: common.Block{
153181
ChainId: c.rpc.GetChainID(),
@@ -232,14 +260,14 @@ func (c *Committer) getBlockNumbersToCommit(ctx context.Context) ([]*big.Int, er
232260
}
233261

234262
// Get block numbers from PostgreSQL that are less than latest committed block
235-
psqlBlockNumbers, err := c.storage.StagingStorage.(*storage.PostgresConnector).GetBlockNumbersLessThan(c.rpc.GetChainID(), latestCommittedBlockNumber)
263+
psqlBlockNumbers, err := c.storage.StagingStorage.GetBlockNumbersLessThan(c.rpc.GetChainID(), latestCommittedBlockNumber)
236264
if err != nil {
237265
return nil, fmt.Errorf("error getting block numbers from PostgreSQL: %v", err)
238266
}
239267

240268
if len(psqlBlockNumbers) > 0 {
241269
// Check which blocks exist in ClickHouse
242-
existsInClickHouse, err := c.storage.MainStorage.(*storage.ClickHouseConnector).CheckBlocksExist(c.rpc.GetChainID(), psqlBlockNumbers)
270+
existsInClickHouse, err := c.storage.MainStorage.CheckBlocksExist(c.rpc.GetChainID(), psqlBlockNumbers)
243271
if err != nil {
244272
return nil, fmt.Errorf("error checking blocks in ClickHouse: %v", err)
245273
}

internal/orchestrator/committer_test.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ func TestGetBlockNumbersToCommit(t *testing.T) {
4646

4747
mockRPC.EXPECT().GetChainID().Return(chainID)
4848
mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(100), nil)
49+
mockStagingStorage.EXPECT().GetBlockNumbersLessThan(chainID, big.NewInt(100)).Return([]*big.Int{}, nil)
4950

5051
blockNumbers, err := committer.getBlockNumbersToCommit(context.Background())
5152

@@ -70,6 +71,7 @@ func TestGetBlockNumbersToCommitWithoutConfiguredAndNotStored(t *testing.T) {
7071

7172
mockRPC.EXPECT().GetChainID().Return(chainID)
7273
mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(0), nil)
74+
mockStagingStorage.EXPECT().GetBlockNumbersLessThan(chainID, big.NewInt(-1)).Return([]*big.Int{}, nil)
7375

7476
blockNumbers, err := committer.getBlockNumbersToCommit(context.Background())
7577

@@ -97,6 +99,7 @@ func TestGetBlockNumbersToCommitWithConfiguredAndNotStored(t *testing.T) {
9799

98100
mockRPC.EXPECT().GetChainID().Return(chainID)
99101
mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(0), nil)
102+
mockStagingStorage.EXPECT().GetBlockNumbersLessThan(chainID, big.NewInt(49)).Return([]*big.Int{}, nil)
100103

101104
blockNumbers, err := committer.getBlockNumbersToCommit(context.Background())
102105

@@ -124,6 +127,7 @@ func TestGetBlockNumbersToCommitWithConfiguredAndStored(t *testing.T) {
124127

125128
mockRPC.EXPECT().GetChainID().Return(chainID)
126129
mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(2000), nil)
130+
mockStagingStorage.EXPECT().GetBlockNumbersLessThan(chainID, big.NewInt(2000)).Return([]*big.Int{}, nil)
127131

128132
blockNumbers, err := committer.getBlockNumbersToCommit(context.Background())
129133

@@ -148,6 +152,7 @@ func TestGetBlockNumbersToCommitWithoutConfiguredAndStored(t *testing.T) {
148152

149153
mockRPC.EXPECT().GetChainID().Return(chainID)
150154
mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(2000), nil)
155+
mockStagingStorage.EXPECT().GetBlockNumbersLessThan(chainID, big.NewInt(2000)).Return([]*big.Int{}, nil)
151156

152157
blockNumbers, err := committer.getBlockNumbersToCommit(context.Background())
153158

@@ -175,6 +180,7 @@ func TestGetBlockNumbersToCommitWithStoredHigherThanInMemory(t *testing.T) {
175180

176181
mockRPC.EXPECT().GetChainID().Return(chainID)
177182
mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(2000), nil)
183+
mockStagingStorage.EXPECT().GetBlockNumbersLessThan(chainID, big.NewInt(2000)).Return([]*big.Int{}, nil)
178184

179185
blockNumbers, err := committer.getBlockNumbersToCommit(context.Background())
180186

@@ -227,6 +233,7 @@ func TestGetBlockNumbersToCommitWithStoredEqualThanInMemory(t *testing.T) {
227233

228234
mockRPC.EXPECT().GetChainID().Return(chainID)
229235
mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(2000), nil)
236+
mockStagingStorage.EXPECT().GetBlockNumbersLessThan(chainID, big.NewInt(2000)).Return([]*big.Int{}, nil)
230237

231238
blockNumbers, err := committer.getBlockNumbersToCommit(context.Background())
232239

@@ -253,6 +260,7 @@ func TestGetSequentialBlockDataToCommit(t *testing.T) {
253260

254261
mockRPC.EXPECT().GetChainID().Return(chainID)
255262
mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(100), nil)
263+
mockStagingStorage.EXPECT().GetBlockNumbersLessThan(chainID, big.NewInt(100)).Return([]*big.Int{}, nil)
256264

257265
blockData := []common.BlockData{
258266
{Block: common.Block{Number: big.NewInt(101)}},
@@ -288,6 +296,7 @@ func TestGetSequentialBlockDataToCommitWithDuplicateBlocks(t *testing.T) {
288296

289297
mockRPC.EXPECT().GetChainID().Return(chainID)
290298
mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(100), nil)
299+
mockStagingStorage.EXPECT().GetBlockNumbersLessThan(chainID, big.NewInt(100)).Return([]*big.Int{}, nil)
291300

292301
blockData := []common.BlockData{
293302
{Block: common.Block{Number: big.NewInt(101)}},
@@ -403,6 +412,7 @@ func TestStartCommitter(t *testing.T) {
403412
chainID := big.NewInt(1)
404413
mockRPC.EXPECT().GetChainID().Return(chainID)
405414
mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(100), nil)
415+
mockStagingStorage.EXPECT().GetBlockNumbersLessThan(chainID, big.NewInt(100)).Return([]*big.Int{}, nil)
406416

407417
blockData := []common.BlockData{
408418
{Block: common.Block{Number: big.NewInt(101)}},
@@ -437,6 +447,7 @@ func TestCommitterRespectsSIGTERM(t *testing.T) {
437447
chainID := big.NewInt(1)
438448
mockRPC.EXPECT().GetChainID().Return(chainID)
439449
mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(100), nil)
450+
mockStagingStorage.EXPECT().GetBlockNumbersLessThan(chainID, big.NewInt(100)).Return([]*big.Int{}, nil)
440451

441452
blockData := []common.BlockData{
442453
{Block: common.Block{Number: big.NewInt(101)}},
@@ -502,6 +513,7 @@ func TestHandleMissingStagingData(t *testing.T) {
502513
mockStagingStorage.EXPECT().InsertStagingData(mock.Anything).Return(nil)
503514

504515
mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(0), nil)
516+
mockStagingStorage.EXPECT().GetBlockNumbersLessThan(chainID, big.NewInt(-1)).Return([]*big.Int{}, nil)
505517
expectedEndBlock := big.NewInt(4)
506518
mockStagingStorage.EXPECT().GetLastStagedBlockNumber(chainID, expectedEndBlock, big.NewInt(0)).Return(big.NewInt(20), nil)
507519

@@ -547,6 +559,7 @@ func TestHandleMissingStagingDataIsPolledWithCorrectBatchSize(t *testing.T) {
547559
mockStagingStorage.EXPECT().InsertStagingData(mock.Anything).Return(nil)
548560

549561
mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(0), nil)
562+
mockStagingStorage.EXPECT().GetBlockNumbersLessThan(chainID, big.NewInt(-1)).Return([]*big.Int{}, nil)
550563
expectedEndBlock := big.NewInt(4)
551564
mockStagingStorage.EXPECT().GetLastStagedBlockNumber(chainID, expectedEndBlock, big.NewInt(0)).Return(big.NewInt(20), nil)
552565

internal/orchestrator/orchestrator.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,14 @@ func (o *Orchestrator) Start() {
8787
defer workModeMonitor.UnregisterChannel(committerWorkModeChan)
8888
validator := NewValidator(o.rpc, o.storage)
8989
committer := NewCommitter(o.rpc, o.storage, WithCommitterWorkModeChan(committerWorkModeChan), WithValidator(validator))
90+
91+
// Clean up any stranded blocks in staging in a separate goroutine
92+
go func() {
93+
if err := committer.cleanupStrandedBlocks(); err != nil {
94+
log.Error().Err(err).Msg("Failed to clean up stranded blocks during initialization")
95+
}
96+
}()
97+
9098
committer.Start(ctx)
9199
}()
92100
}

internal/storage/clickhouse.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1158,6 +1158,37 @@ func (c *ClickHouseConnector) DeleteStagingData(data []common.BlockData) error {
11581158
return batch.Send()
11591159
}
11601160

1161+
func (c *ClickHouseConnector) GetBlockNumbersLessThan(chainId *big.Int, blockNumber *big.Int) ([]*big.Int, error) {
1162+
query := fmt.Sprintf(`
1163+
SELECT DISTINCT block_number
1164+
FROM %s.block_data FINAL
1165+
WHERE chain_id = ?
1166+
AND block_number < ?
1167+
AND is_deleted = 0
1168+
ORDER BY block_number ASC`, c.cfg.Database)
1169+
1170+
rows, err := c.conn.Query(context.Background(), query, chainId, blockNumber)
1171+
if err != nil {
1172+
return nil, fmt.Errorf("error querying block_data: %v", err)
1173+
}
1174+
defer rows.Close()
1175+
1176+
var blockNumbers []*big.Int
1177+
for rows.Next() {
1178+
var blockNum *big.Int
1179+
if err := rows.Scan(&blockNum); err != nil {
1180+
return nil, fmt.Errorf("error scanning block number: %v", err)
1181+
}
1182+
blockNumbers = append(blockNumbers, blockNum)
1183+
}
1184+
1185+
if err := rows.Err(); err != nil {
1186+
return nil, fmt.Errorf("error iterating rows: %v", err)
1187+
}
1188+
1189+
return blockNumbers, nil
1190+
}
1191+
11611192
func (c *ClickHouseConnector) GetLastReorgCheckedBlockNumber(chainId *big.Int) (*big.Int, error) {
11621193
query := fmt.Sprintf("SELECT cursor_value FROM %s.cursors FINAL WHERE cursor_type = 'reorg'", c.cfg.Database)
11631194
if chainId.Sign() > 0 {

internal/storage/connector.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ type IStagingStorage interface {
8383
GetStagingData(qf QueryFilter) (data []common.BlockData, err error)
8484
DeleteStagingData(data []common.BlockData) error
8585
GetLastStagedBlockNumber(chainId *big.Int, rangeStart *big.Int, rangeEnd *big.Int) (maxBlockNumber *big.Int, err error)
86+
GetBlockNumbersLessThan(chainId *big.Int, blockNumber *big.Int) ([]*big.Int, error)
8687
}
8788

8889
type IMainStorage interface {
@@ -116,6 +117,10 @@ type IMainStorage interface {
116117
* Gets full block data with transactions, logs and traces.
117118
*/
118119
GetFullBlockData(chainId *big.Int, blockNumbers []*big.Int) (blocks []common.BlockData, err error)
120+
/**
121+
* Checks if blocks exist in the storage.
122+
*/
123+
CheckBlocksExist(chainId *big.Int, blockNumbers []*big.Int) (map[string]bool, error)
119124
}
120125

121126
func NewStorageConnector(cfg *config.StorageConfig) (IStorage, error) {

test/mocks/MockIMainStorage.go

Lines changed: 59 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)