Skip to content

Commit 7bfa6de

Browse files
authored
cleanup blocks on committer start (#260)
1 parent 0381b98 commit 7bfa6de

File tree

6 files changed

+97
-0
lines changed

6 files changed

+97
-0
lines changed

internal/orchestrator/committer.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,10 @@ func (c *Committer) Start(ctx context.Context) {
7979
interval := time.Duration(c.triggerIntervalMs) * time.Millisecond
8080

8181
log.Debug().Msgf("Committer running")
82+
83+
// Clean up staging data before starting the committer
84+
c.cleanupStagingData()
85+
8286
for {
8387
select {
8488
case <-ctx.Done():
@@ -112,6 +116,28 @@ func (c *Committer) Start(ctx context.Context) {
112116
}
113117
}
114118

119+
func (c *Committer) cleanupStagingData() {
120+
// Get the last committed block number from main storage
121+
latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID())
122+
if err != nil {
123+
log.Error().Msgf("Error getting latest committed block number: %v", err)
124+
return
125+
}
126+
127+
if latestCommittedBlockNumber.Sign() == 0 {
128+
log.Debug().Msg("No blocks committed yet, skipping staging data cleanup")
129+
return
130+
}
131+
132+
// Delete all staging data older than the latest committed block number
133+
if err := c.storage.StagingStorage.DeleteOlderThan(c.rpc.GetChainID(), latestCommittedBlockNumber); err != nil {
134+
log.Error().Msgf("Error deleting staging data older than %v: %v", latestCommittedBlockNumber, err)
135+
return
136+
}
137+
138+
log.Info().Msgf("Deleted staging data older than or equal to %v", latestCommittedBlockNumber)
139+
}
140+
115141
func (c *Committer) getBlockNumbersToCommit(ctx context.Context) ([]*big.Int, error) {
116142
startTime := time.Now()
117143
defer func() {

internal/orchestrator/committer_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,9 @@ func TestStartCommitter(t *testing.T) {
404404
mockRPC.EXPECT().GetChainID().Return(chainID)
405405
mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(100), nil)
406406

407+
// Add expectation for DeleteOlderThan call during cleanup
408+
mockStagingStorage.On("DeleteOlderThan", chainID, big.NewInt(100)).Return(nil)
409+
407410
blockData := []common.BlockData{
408411
{Block: common.Block{Number: big.NewInt(101)}},
409412
{Block: common.Block{Number: big.NewInt(102)}},
@@ -438,6 +441,9 @@ func TestCommitterRespectsSIGTERM(t *testing.T) {
438441
mockRPC.EXPECT().GetChainID().Return(chainID)
439442
mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(100), nil)
440443

444+
// Add expectation for DeleteOlderThan call during cleanup
445+
mockStagingStorage.On("DeleteOlderThan", chainID, big.NewInt(100)).Return(nil)
446+
441447
blockData := []common.BlockData{
442448
{Block: common.Block{Number: big.NewInt(101)}},
443449
{Block: common.Block{Number: big.NewInt(102)}},

internal/storage/clickhouse.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2114,6 +2114,17 @@ func (c *ClickHouseConnector) GetFullBlockData(chainId *big.Int, blockNumbers []
21142114
return blockData, nil
21152115
}
21162116

2117+
func (c *ClickHouseConnector) DeleteOlderThan(chainId *big.Int, blockNumber *big.Int) error {
2118+
query := fmt.Sprintf(`
2119+
INSERT INTO %s.block_data (chain_id, block_number, is_deleted)
2120+
SELECT chain_id, block_number, 1
2121+
FROM %s.block_data
2122+
WHERE chain_id = ? AND block_number <= ? AND is_deleted = 0
2123+
GROUP BY chain_id, block_number
2124+
`, c.cfg.Database, c.cfg.Database)
2125+
return c.conn.Exec(context.Background(), query, chainId, blockNumber)
2126+
}
2127+
21172128
// Helper function to test query generation
21182129
func (c *ClickHouseConnector) TestQueryGeneration(table, columns string, qf QueryFilter) string {
21192130
return c.buildQuery(table, columns, qf)

internal/storage/connector.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ type IStagingStorage interface {
8383
GetStagingData(qf QueryFilter) (data []common.BlockData, err error)
8484
DeleteStagingData(data []common.BlockData) error
8585
GetLastStagedBlockNumber(chainId *big.Int, rangeStart *big.Int, rangeEnd *big.Int) (maxBlockNumber *big.Int, err error)
86+
DeleteOlderThan(chainId *big.Int, blockNumber *big.Int) error
8687
}
8788

8889
type IMainStorage interface {

internal/storage/postgres.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -387,6 +387,12 @@ func (p *PostgresConnector) GetLastStagedBlockNumber(chainId *big.Int, rangeStar
387387
return blockNumber, nil
388388
}
389389

390+
func (p *PostgresConnector) DeleteOlderThan(chainId *big.Int, blockNumber *big.Int) error {
391+
query := `DELETE FROM block_data WHERE chain_id = $1 AND block_number <= $2`
392+
_, err := p.db.Exec(query, chainId.String(), blockNumber.String())
393+
return err
394+
}
395+
390396
// Close closes the database connection
391397
func (p *PostgresConnector) Close() error {
392398
return p.db.Close()

test/mocks/MockIStagingStorage.go

Lines changed: 47 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)