Skip to content

Commit 17757d3

Browse files
committed
cleanup blocks on committer start
1 parent 0381b98 commit 17757d3

File tree

5 files changed

+127
-0
lines changed

5 files changed

+127
-0
lines changed

internal/orchestrator/committer.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,10 @@ func (c *Committer) Start(ctx context.Context) {
7979
interval := time.Duration(c.triggerIntervalMs) * time.Millisecond
8080

8181
log.Debug().Msgf("Committer running")
82+
83+
// Clean up staging data before starting the committer
84+
c.cleanupStagingData()
85+
8286
for {
8387
select {
8488
case <-ctx.Done():
@@ -112,6 +116,28 @@ func (c *Committer) Start(ctx context.Context) {
112116
}
113117
}
114118

119+
func (c *Committer) cleanupStagingData() {
120+
// Get the last committed block number from main storage
121+
latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID())
122+
if err != nil {
123+
log.Error().Msgf("Error getting latest committed block number: %v", err)
124+
return
125+
}
126+
127+
if latestCommittedBlockNumber.Sign() == 0 {
128+
log.Debug().Msg("No blocks committed yet, skipping staging data cleanup")
129+
return
130+
}
131+
132+
// Delete all staging data older than the latest committed block number
133+
if err := c.storage.StagingStorage.DeleteOlderThan(c.rpc.GetChainID(), latestCommittedBlockNumber); err != nil {
134+
log.Error().Msgf("Error deleting staging data older than %v: %v", latestCommittedBlockNumber, err)
135+
return
136+
}
137+
138+
log.Info().Msgf("Deleted staging data older than or equal to %v", latestCommittedBlockNumber)
139+
}
140+
115141
func (c *Committer) getBlockNumbersToCommit(ctx context.Context) ([]*big.Int, error) {
116142
startTime := time.Now()
117143
defer func() {

internal/storage/clickhouse.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2114,6 +2114,53 @@ func (c *ClickHouseConnector) GetFullBlockData(chainId *big.Int, blockNumbers []
21142114
return blockData, nil
21152115
}
21162116

2117+
func (c *ClickHouseConnector) DeleteOlderThan(chainId *big.Int, blockNumber *big.Int) error {
2118+
// First, get all the block numbers that need to be deleted
2119+
query := fmt.Sprintf(`
2120+
SELECT DISTINCT chain_id, block_number
2121+
FROM %s.block_data
2122+
WHERE chain_id = ? AND block_number <= ? AND is_deleted = 0
2123+
`, c.cfg.Database)
2124+
2125+
rows, err := c.conn.Query(context.Background(), query, chainId, blockNumber)
2126+
if err != nil {
2127+
return err
2128+
}
2129+
defer rows.Close()
2130+
2131+
// Prepare batch for deletion
2132+
deleteQuery := fmt.Sprintf(`
2133+
INSERT INTO %s.block_data (
2134+
chain_id, block_number, is_deleted
2135+
) VALUES (?, ?, ?)
2136+
`, c.cfg.Database)
2137+
2138+
batch, err := c.conn.PrepareBatch(context.Background(), deleteQuery)
2139+
if err != nil {
2140+
return err
2141+
}
2142+
defer batch.Close()
2143+
2144+
// Add each block to the deletion batch
2145+
for rows.Next() {
2146+
var chainIdVal, blockNumberVal *big.Int
2147+
if err := rows.Scan(&chainIdVal, &blockNumberVal); err != nil {
2148+
return err
2149+
}
2150+
2151+
err := batch.Append(
2152+
chainIdVal,
2153+
blockNumberVal,
2154+
1, // is_deleted = 1
2155+
)
2156+
if err != nil {
2157+
return err
2158+
}
2159+
}
2160+
2161+
return batch.Send()
2162+
}
2163+
21172164
// Helper function to test query generation
21182165
func (c *ClickHouseConnector) TestQueryGeneration(table, columns string, qf QueryFilter) string {
21192166
return c.buildQuery(table, columns, qf)

internal/storage/connector.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ type IStagingStorage interface {
8383
GetStagingData(qf QueryFilter) (data []common.BlockData, err error)
8484
DeleteStagingData(data []common.BlockData) error
8585
GetLastStagedBlockNumber(chainId *big.Int, rangeStart *big.Int, rangeEnd *big.Int) (maxBlockNumber *big.Int, err error)
86+
DeleteOlderThan(chainId *big.Int, blockNumber *big.Int) error
8687
}
8788

8889
type IMainStorage interface {

internal/storage/postgres.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -387,6 +387,12 @@ func (p *PostgresConnector) GetLastStagedBlockNumber(chainId *big.Int, rangeStar
387387
return blockNumber, nil
388388
}
389389

390+
func (p *PostgresConnector) DeleteOlderThan(chainId *big.Int, blockNumber *big.Int) error {
391+
query := `DELETE FROM block_data WHERE chain_id = $1 AND block_number < $2`
392+
_, err := p.db.Exec(query, chainId.String(), blockNumber.String())
393+
return err
394+
}
395+
390396
// Close closes the database connection
391397
func (p *PostgresConnector) Close() error {
392398
return p.db.Close()

test/mocks/MockIStagingStorage.go

Lines changed: 47 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)