Skip to content

Commit 019163f

Browse files
committed
cleanup staging on boot
1 parent 0381b98 commit 019163f

File tree

4 files changed

+281
-4
lines changed

4 files changed

+281
-4
lines changed

insight

-50.8 MB
Binary file not shown.

internal/orchestrator/committer.go

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,105 @@ func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage, opts ...Committe
7272
opt(committer)
7373
}
7474

75+
// Clean up any stranded blocks in staging
76+
if err := committer.cleanupStrandedBlocks(); err != nil {
77+
log.Error().Err(err).Msg("Failed to clean up stranded blocks during initialization")
78+
}
79+
7580
return committer
7681
}
7782

83+
func (c *Committer) cleanupStrandedBlocks() error {
84+
// Get the current max block from main storage
85+
latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID())
86+
if err != nil {
87+
return fmt.Errorf("error getting max block number from main storage: %v", err)
88+
}
89+
90+
if latestCommittedBlockNumber.Sign() == 0 {
91+
// No blocks in main storage yet, nothing to clean up
92+
return nil
93+
}
94+
95+
// Get block numbers from PostgreSQL that are less than or equal to the latest committed block
96+
psqlBlockNumbers, err := c.storage.StagingStorage.(*storage.PostgresConnector).GetBlockNumbersLessThan(c.rpc.GetChainID(), latestCommittedBlockNumber)
97+
if err != nil {
98+
return fmt.Errorf("error getting block numbers from PostgreSQL: %v", err)
99+
}
100+
101+
if len(psqlBlockNumbers) == 0 {
102+
// No stranded blocks in staging
103+
return nil
104+
}
105+
106+
log.Info().
107+
Int("block_count", len(psqlBlockNumbers)).
108+
Str("min_block", psqlBlockNumbers[0].String()).
109+
Str("max_block", psqlBlockNumbers[len(psqlBlockNumbers)-1].String()).
110+
Msg("Found stranded blocks in staging")
111+
112+
// Check which blocks exist in ClickHouse
113+
existsInClickHouse, err := c.storage.MainStorage.(*storage.ClickHouseConnector).CheckBlocksExist(c.rpc.GetChainID(), psqlBlockNumbers)
114+
if err != nil {
115+
return fmt.Errorf("error checking blocks in ClickHouse: %v", err)
116+
}
117+
118+
// Get block data from PostgreSQL for blocks that don't exist in ClickHouse
119+
var blocksToCommit []common.BlockData
120+
for _, blockNum := range psqlBlockNumbers {
121+
if !existsInClickHouse[blockNum.String()] {
122+
data, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{
123+
BlockNumbers: []*big.Int{blockNum},
124+
ChainId: c.rpc.GetChainID(),
125+
})
126+
if err != nil {
127+
return fmt.Errorf("error getting block data from PostgreSQL: %v", err)
128+
}
129+
if len(data) > 0 {
130+
blocksToCommit = append(blocksToCommit, data[0])
131+
}
132+
}
133+
}
134+
135+
// Insert blocks into ClickHouse
136+
if len(blocksToCommit) > 0 {
137+
log.Info().
138+
Int("block_count", len(blocksToCommit)).
139+
Str("min_block", blocksToCommit[0].Block.Number.String()).
140+
Str("max_block", blocksToCommit[len(blocksToCommit)-1].Block.Number.String()).
141+
Msg("Committing stranded blocks to ClickHouse")
142+
143+
if err := c.storage.MainStorage.InsertBlockData(blocksToCommit); err != nil {
144+
return fmt.Errorf("error inserting blocks into ClickHouse: %v", err)
145+
}
146+
}
147+
148+
// Delete all blocks from PostgreSQL that were checked (whether they existed in ClickHouse or not)
149+
var blocksToDelete []common.BlockData
150+
for _, blockNum := range psqlBlockNumbers {
151+
blocksToDelete = append(blocksToDelete, common.BlockData{
152+
Block: common.Block{
153+
ChainId: c.rpc.GetChainID(),
154+
Number: blockNum,
155+
},
156+
})
157+
}
158+
159+
if len(blocksToDelete) > 0 {
160+
log.Info().
161+
Int("block_count", len(blocksToDelete)).
162+
Str("min_block", blocksToDelete[0].Block.Number.String()).
163+
Str("max_block", blocksToDelete[len(blocksToDelete)-1].Block.Number.String()).
164+
Msg("Deleting stranded blocks from PostgreSQL")
165+
166+
if err := c.storage.StagingStorage.DeleteStagingData(blocksToDelete); err != nil {
167+
return fmt.Errorf("error deleting blocks from PostgreSQL: %v", err)
168+
}
169+
}
170+
171+
return nil
172+
}
173+
78174
func (c *Committer) Start(ctx context.Context) {
79175
interval := time.Duration(c.triggerIntervalMs) * time.Millisecond
80176

@@ -135,6 +231,68 @@ func (c *Committer) getBlockNumbersToCommit(ctx context.Context) ([]*big.Int, er
135231
}
136232
}
137233

234+
// Get block numbers from PostgreSQL that are less than or equal to the latest committed block
235+
psqlBlockNumbers, err := c.storage.StagingStorage.(*storage.PostgresConnector).GetBlockNumbersLessThan(c.rpc.GetChainID(), latestCommittedBlockNumber)
236+
if err != nil {
237+
return nil, fmt.Errorf("error getting block numbers from PostgreSQL: %v", err)
238+
}
239+
240+
if len(psqlBlockNumbers) > 0 {
241+
// Check which blocks exist in ClickHouse
242+
existsInClickHouse, err := c.storage.MainStorage.(*storage.ClickHouseConnector).CheckBlocksExist(c.rpc.GetChainID(), psqlBlockNumbers)
243+
if err != nil {
244+
return nil, fmt.Errorf("error checking blocks in ClickHouse: %v", err)
245+
}
246+
247+
// Get block data from PostgreSQL for blocks that don't exist in ClickHouse
248+
var blocksToCommit []common.BlockData
249+
for _, blockNum := range psqlBlockNumbers {
250+
if !existsInClickHouse[blockNum.String()] {
251+
data, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{
252+
BlockNumbers: []*big.Int{blockNum},
253+
ChainId: c.rpc.GetChainID(),
254+
})
255+
if err != nil {
256+
return nil, fmt.Errorf("error getting block data from PostgreSQL: %v", err)
257+
}
258+
if len(data) > 0 {
259+
blocksToCommit = append(blocksToCommit, data[0])
260+
}
261+
}
262+
}
263+
264+
// Insert blocks into ClickHouse
265+
if len(blocksToCommit) > 0 {
266+
if err := c.storage.MainStorage.InsertBlockData(blocksToCommit); err != nil {
267+
return nil, fmt.Errorf("error inserting blocks into ClickHouse: %v", err)
268+
}
269+
}
270+
271+
// Delete all blocks from PostgreSQL that were checked (whether they existed in ClickHouse or not)
272+
var blocksToDelete []common.BlockData
273+
for _, blockNum := range psqlBlockNumbers {
274+
blocksToDelete = append(blocksToDelete, common.BlockData{
275+
Block: common.Block{
276+
ChainId: c.rpc.GetChainID(),
277+
Number: blockNum,
278+
},
279+
})
280+
}
281+
282+
if len(blocksToDelete) > 0 {
283+
log.Info().
284+
Int("block_count", len(blocksToDelete)).
285+
Str("min_block", blocksToDelete[0].Block.Number.String()).
286+
Str("max_block", blocksToDelete[len(blocksToDelete)-1].Block.Number.String()).
287+
Msg("Deleting stranded blocks from PostgreSQL")
288+
289+
if err := c.storage.StagingStorage.DeleteStagingData(blocksToDelete); err != nil {
290+
log.Error().Err(err).Msg("Failed to delete blocks from PostgreSQL")
291+
}
292+
}
293+
}
294+
295+
// Continue with normal block range processing
138296
startBlock := new(big.Int).Add(latestCommittedBlockNumber, big.NewInt(1))
139297
endBlock, err := c.getBlockToCommitUntil(ctx, latestCommittedBlockNumber)
140298
if err != nil {

internal/storage/clickhouse.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -858,6 +858,89 @@ func scanTrace(rows driver.Rows) (common.Trace, error) {
858858
return trace, nil
859859
}
860860

861+
func (c *ClickHouseConnector) CheckBlocksExist(chainId *big.Int, blockNumbers []*big.Int) (map[string]bool, error) {
862+
if len(blockNumbers) == 0 {
863+
return make(map[string]bool), nil
864+
}
865+
866+
// Convert block numbers to strings for the query
867+
blockNumberStrings := make([]string, len(blockNumbers))
868+
for i, bn := range blockNumbers {
869+
blockNumberStrings[i] = bn.String()
870+
}
871+
872+
// First check blocks table
873+
blocksQuery := fmt.Sprintf(`
874+
SELECT DISTINCT toString(block_number) as block_number_str
875+
FROM %s.blocks
876+
WHERE chain_id = '%s'
877+
AND block_number IN (%s)
878+
AND sign = 1`, c.cfg.Database, chainId.String(), strings.Join(blockNumberStrings, ","))
879+
880+
blocksRows, err := c.conn.Query(context.Background(), blocksQuery)
881+
if err != nil {
882+
return nil, fmt.Errorf("error querying blocks table: %v", err)
883+
}
884+
defer blocksRows.Close()
885+
886+
// Create a map of all block numbers initially set to false
887+
exists := make(map[string]bool)
888+
for _, bn := range blockNumbers {
889+
exists[bn.String()] = false
890+
}
891+
892+
// Mark blocks that exist in blocks table as true
893+
for blocksRows.Next() {
894+
var blockNumberStr string
895+
if err := blocksRows.Scan(&blockNumberStr); err != nil {
896+
return nil, fmt.Errorf("error scanning blocks table: %v", err)
897+
}
898+
exists[blockNumberStr] = true
899+
}
900+
901+
if err := blocksRows.Err(); err != nil {
902+
return nil, fmt.Errorf("error iterating blocks table: %v", err)
903+
}
904+
905+
// Then check inserts_null_table for any remaining blocks
906+
var remainingBlocks []string
907+
for blockNum, found := range exists {
908+
if !found {
909+
remainingBlocks = append(remainingBlocks, blockNum)
910+
}
911+
}
912+
913+
if len(remainingBlocks) > 0 {
914+
nullQuery := fmt.Sprintf(`
915+
SELECT DISTINCT toString(block.block_number) as block_number_str
916+
FROM %s.inserts_null_table
917+
WHERE chain_id = '%s'
918+
AND block.block_number IN (%s)
919+
AND sign = 1`, c.cfg.Database, chainId.String(), strings.Join(remainingBlocks, ","))
920+
921+
nullRows, err := c.conn.Query(context.Background(), nullQuery)
922+
if err != nil {
923+
return nil, fmt.Errorf("error querying inserts_null_table: %v", err)
924+
}
925+
defer nullRows.Close()
926+
927+
// Mark blocks that exist in inserts_null_table as true
928+
for nullRows.Next() {
929+
var blockNumberStr string
930+
if err := nullRows.Scan(&blockNumberStr); err != nil {
931+
return nil, fmt.Errorf("error scanning inserts_null_table: %v", err)
932+
}
933+
exists[blockNumberStr] = true
934+
}
935+
936+
if err := nullRows.Err(); err != nil {
937+
return nil, fmt.Errorf("error iterating inserts_null_table: %v", err)
938+
}
939+
}
940+
941+
return exists, nil
942+
}
943+
861944
func (c *ClickHouseConnector) GetMaxBlockNumber(chainId *big.Int) (maxBlockNumber *big.Int, err error) {
862945
tableName := c.getTableName(chainId, "blocks")
863946
query := fmt.Sprintf("SELECT block_number FROM %s.%s WHERE chain_id = ? ORDER BY block_number DESC LIMIT 1", c.cfg.Database, tableName)

internal/storage/postgres.go

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ func NewPostgresConnector(cfg *config.PostgresConfig) (*PostgresConnector, error
6161

6262
func (p *PostgresConnector) GetBlockFailures(qf QueryFilter) ([]common.BlockFailure, error) {
6363
query := `SELECT chain_id, block_number, last_error_timestamp, failure_count, reason
64-
FROM block_failures`
64+
FROM block_failures WHERE 1=1`
6565

6666
args := []interface{}{}
6767
argCount := 0
@@ -73,11 +73,13 @@ func (p *PostgresConnector) GetBlockFailures(qf QueryFilter) ([]common.BlockFail
7373
}
7474

7575
if len(qf.BlockNumbers) > 0 {
76-
blockNumberStrs := make([]string, len(qf.BlockNumbers))
76+
placeholders := make([]string, len(qf.BlockNumbers))
7777
for i, bn := range qf.BlockNumbers {
78-
blockNumberStrs[i] = bn.String()
78+
argCount++
79+
placeholders[i] = fmt.Sprintf("$%d", argCount)
80+
args = append(args, bn.String())
7981
}
80-
query += fmt.Sprintf(" AND block_number IN (%s)", strings.Join(blockNumberStrs, ","))
82+
query += fmt.Sprintf(" AND block_number IN (%s)", strings.Join(placeholders, ","))
8183
}
8284

8385
if qf.SortBy != "" {
@@ -263,6 +265,40 @@ func (p *PostgresConnector) InsertStagingData(data []common.BlockData) error {
263265
return err
264266
}
265267

268+
func (p *PostgresConnector) GetBlockNumbersLessThan(chainId *big.Int, blockNumber *big.Int) ([]*big.Int, error) {
269+
query := `SELECT DISTINCT block_number
270+
FROM block_data
271+
WHERE chain_id = $1
272+
AND block_number < $2
273+
ORDER BY block_number ASC`
274+
275+
rows, err := p.db.Query(query, chainId.String(), blockNumber.String())
276+
if err != nil {
277+
return nil, fmt.Errorf("error querying block_data: %v", err)
278+
}
279+
defer rows.Close()
280+
281+
var blockNumbers []*big.Int
282+
for rows.Next() {
283+
var blockNumberStr string
284+
if err := rows.Scan(&blockNumberStr); err != nil {
285+
return nil, fmt.Errorf("error scanning block number: %v", err)
286+
}
287+
288+
blockNum, ok := new(big.Int).SetString(blockNumberStr, 10)
289+
if !ok {
290+
return nil, fmt.Errorf("failed to parse block number: %s", blockNumberStr)
291+
}
292+
blockNumbers = append(blockNumbers, blockNum)
293+
}
294+
295+
if err := rows.Err(); err != nil {
296+
return nil, fmt.Errorf("error iterating rows: %v", err)
297+
}
298+
299+
return blockNumbers, nil
300+
}
301+
266302
func (p *PostgresConnector) GetStagingData(qf QueryFilter) ([]common.BlockData, error) {
267303
// No need to check is_deleted since we're using hard deletes for staging data
268304
query := `SELECT data FROM block_data WHERE 1=1`

0 commit comments

Comments
 (0)