Binary file removed: insight
158 changes: 158 additions & 0 deletions internal/orchestrator/committer.go
@@ -72,9 +72,105 @@ func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage, opts ...Committe
opt(committer)
}

// Clean up any stranded blocks in staging
if err := committer.cleanupStrandedBlocks(); err != nil {
log.Error().Err(err).Msg("Failed to clean up stranded blocks during initialization")
}

⚠️ Potential issue

Consider failing initialization if cleanup fails.

The cleanup of stranded blocks during initialization only logs errors instead of failing. If this cleanup is required for correct operation, the committer can start with stranded staging data and an inconsistent view of what has already been committed.

Consider returning the error so the committer never starts in a potentially inconsistent state (note that this requires changing NewCommitter's signature to return (*Committer, error) and updating its callers):

 	// Clean up any stranded blocks in staging
 	if err := committer.cleanupStrandedBlocks(); err != nil {
-		log.Error().Err(err).Msg("Failed to clean up stranded blocks during initialization")
+		return nil, fmt.Errorf("failed to clean up stranded blocks during initialization: %w", err)
 	}
📝 Committable suggestion


Suggested change
// Clean up any stranded blocks in staging
if err := committer.cleanupStrandedBlocks(); err != nil {
log.Error().Err(err).Msg("Failed to clean up stranded blocks during initialization")
}
// Clean up any stranded blocks in staging
if err := committer.cleanupStrandedBlocks(); err != nil {
return nil, fmt.Errorf("failed to clean up stranded blocks during initialization: %w", err)
}
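
For context, a sketch of the constructor after this change — the Committer struct fields here are assumptions (they are not shown in this diff), and all callers of NewCommitter would need updating:

func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage, opts ...CommitterOption) (*Committer, error) {
	committer := &Committer{rpc: rpc, storage: storage} // field names assumed

	for _, opt := range opts {
		opt(committer)
	}

	// Clean up any stranded blocks in staging; fail fast instead of only logging.
	if err := committer.cleanupStrandedBlocks(); err != nil {
		return nil, fmt.Errorf("failed to clean up stranded blocks during initialization: %w", err)
	}

	return committer, nil
}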
🤖 Prompt for AI Agents
In internal/orchestrator/committer.go around lines 75 to 78, the
cleanupStrandedBlocks error is only logged but does not cause initialization to
fail. Modify the code to return the error from the initialization function when
cleanupStrandedBlocks fails, ensuring the committer does not start with a
potentially inconsistent state.


return committer
}

func (c *Committer) cleanupStrandedBlocks() error {
// Get the current max block from main storage
latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID())
if err != nil {
return fmt.Errorf("error getting max block number from main storage: %v", err)
}

if latestCommittedBlockNumber.Sign() == 0 {
// No blocks in main storage yet, nothing to clean up
return nil
}

// Get block numbers from PostgreSQL that are less than latest committed block
psqlBlockNumbers, err := c.storage.StagingStorage.(*storage.PostgresConnector).GetBlockNumbersLessThan(c.rpc.GetChainID(), latestCommittedBlockNumber)
if err != nil {
return fmt.Errorf("error getting block numbers from PostgreSQL: %v", err)
}

if len(psqlBlockNumbers) == 0 {
// No stranded blocks in staging
return nil
}

log.Info().
Int("block_count", len(psqlBlockNumbers)).
Str("min_block", psqlBlockNumbers[0].String()).
Str("max_block", psqlBlockNumbers[len(psqlBlockNumbers)-1].String()).
Msg("Found stranded blocks in staging")

// Check which blocks exist in ClickHouse
existsInClickHouse, err := c.storage.MainStorage.(*storage.ClickHouseConnector).CheckBlocksExist(c.rpc.GetChainID(), psqlBlockNumbers)
if err != nil {
return fmt.Errorf("error checking blocks in ClickHouse: %v", err)
}

// Get block data from PostgreSQL for blocks that don't exist in ClickHouse
var blocksToCommit []common.BlockData
for _, blockNum := range psqlBlockNumbers {
if !existsInClickHouse[blockNum.String()] {
data, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{
BlockNumbers: []*big.Int{blockNum},
ChainId: c.rpc.GetChainID(),
})
if err != nil {
return fmt.Errorf("error getting block data from PostgreSQL: %v", err)
}
if len(data) > 0 {
blocksToCommit = append(blocksToCommit, data[0])
}
}
}

// Insert blocks into ClickHouse
if len(blocksToCommit) > 0 {
log.Info().
Int("block_count", len(blocksToCommit)).
Str("min_block", blocksToCommit[0].Block.Number.String()).
Str("max_block", blocksToCommit[len(blocksToCommit)-1].Block.Number.String()).
Msg("Committing stranded blocks to ClickHouse")

if err := c.storage.MainStorage.InsertBlockData(blocksToCommit); err != nil {
return fmt.Errorf("error inserting blocks into ClickHouse: %v", err)
}
}

// Delete all blocks from PostgreSQL that were checked (whether they existed in ClickHouse or not)
var blocksToDelete []common.BlockData
for _, blockNum := range psqlBlockNumbers {
blocksToDelete = append(blocksToDelete, common.BlockData{
Block: common.Block{
ChainId: c.rpc.GetChainID(),
Number: blockNum,
},
})
}

if len(blocksToDelete) > 0 {
log.Info().
Int("block_count", len(blocksToDelete)).
Str("min_block", blocksToDelete[0].Block.Number.String()).
Str("max_block", blocksToDelete[len(blocksToDelete)-1].Block.Number.String()).
Msg("Deleting stranded blocks from PostgreSQL")

if err := c.storage.StagingStorage.DeleteStagingData(blocksToDelete); err != nil {
return fmt.Errorf("error deleting blocks from PostgreSQL: %v", err)
}
}

⚠️ Potential issue

Consider transactional semantics for cleanup operations.

The current implementation deletes every checked block from PostgreSQL based on a single up-front existence check (lines 149-169). If anything fails between that check and the deletion, or an insert silently misses a block, staged data can be lost.

Consider:

  1. Only delete blocks that were successfully inserted into ClickHouse
  2. Use database transactions if supported
  3. Add a verification step after insertion before deletion
-	// Delete all blocks from PostgreSQL that were checked (whether they existed in ClickHouse or not)
-	var blocksToDelete []common.BlockData
-	for _, blockNum := range psqlBlockNumbers {
+	// Only delete blocks that were successfully inserted or already exist
+	var blocksToDelete []common.BlockData
+	for _, blockNum := range psqlBlockNumbers {
+		// Only delete if block exists in ClickHouse or was just inserted
+		if existsInClickHouse[blockNum.String()] || wasJustInserted[blockNum.String()] {

Committable suggestion skipped: line range outside the PR's diff.
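
A minimal sketch of the safer deletion path — wasJustInserted is a hypothetical map, populated only after InsertBlockData returns without error:

	// Track which blocks we know made it into ClickHouse during this run.
	wasJustInserted := make(map[string]bool)
	if len(blocksToCommit) > 0 {
		if err := c.storage.MainStorage.InsertBlockData(blocksToCommit); err != nil {
			return fmt.Errorf("error inserting blocks into ClickHouse: %v", err)
		}
		for _, bd := range blocksToCommit {
			wasJustInserted[bd.Block.Number.String()] = true
		}
	}

	// Only delete staging rows that are verifiably present in ClickHouse.
	var blocksToDelete []common.BlockData
	for _, blockNum := range psqlBlockNumbers {
		if existsInClickHouse[blockNum.String()] || wasJustInserted[blockNum.String()] {
			blocksToDelete = append(blocksToDelete, common.BlockData{
				Block: common.Block{ChainId: c.rpc.GetChainID(), Number: blockNum},
			})
		}
	}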

🤖 Prompt for AI Agents
In internal/orchestrator/committer.go around lines 149 to 169, the deletion of
blocks from PostgreSQL occurs without ensuring they were successfully inserted
into ClickHouse, risking data loss. Modify the code to first verify that each
block has been successfully inserted into ClickHouse before including it in the
deletion list. Implement database transactions around the insertion and deletion
steps if supported to ensure atomicity. Add a verification step after insertion
to confirm success before proceeding with deletion from PostgreSQL.


return nil
}

func (c *Committer) Start(ctx context.Context) {
interval := time.Duration(c.triggerIntervalMs) * time.Millisecond

@@ -135,6 +231,68 @@ func (c *Committer) getBlockNumbersToCommit(ctx context.Context) ([]*big.Int, er
}
}

// Get block numbers from PostgreSQL that are less than latest committed block
psqlBlockNumbers, err := c.storage.StagingStorage.(*storage.PostgresConnector).GetBlockNumbersLessThan(c.rpc.GetChainID(), latestCommittedBlockNumber)
if err != nil {
return nil, fmt.Errorf("error getting block numbers from PostgreSQL: %v", err)
}

if len(psqlBlockNumbers) > 0 {
// Check which blocks exist in ClickHouse
existsInClickHouse, err := c.storage.MainStorage.(*storage.ClickHouseConnector).CheckBlocksExist(c.rpc.GetChainID(), psqlBlockNumbers)
if err != nil {
return nil, fmt.Errorf("error checking blocks in ClickHouse: %v", err)
}

// Get block data from PostgreSQL for blocks that don't exist in ClickHouse
var blocksToCommit []common.BlockData
for _, blockNum := range psqlBlockNumbers {
if !existsInClickHouse[blockNum.String()] {
data, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{
BlockNumbers: []*big.Int{blockNum},
ChainId: c.rpc.GetChainID(),
})
if err != nil {
return nil, fmt.Errorf("error getting block data from PostgreSQL: %v", err)
}
if len(data) > 0 {
blocksToCommit = append(blocksToCommit, data[0])
}
}
}

// Insert blocks into ClickHouse
if len(blocksToCommit) > 0 {
if err := c.storage.MainStorage.InsertBlockData(blocksToCommit); err != nil {
return nil, fmt.Errorf("error inserting blocks into ClickHouse: %v", err)
}
}

// Delete all blocks from PostgreSQL that were checked (whether they existed in ClickHouse or not)
var blocksToDelete []common.BlockData
for _, blockNum := range psqlBlockNumbers {
blocksToDelete = append(blocksToDelete, common.BlockData{
Block: common.Block{
ChainId: c.rpc.GetChainID(),
Number: blockNum,
},
})
}

if len(blocksToDelete) > 0 {
log.Info().
Int("block_count", len(blocksToDelete)).
Str("min_block", blocksToDelete[0].Block.Number.String()).
Str("max_block", blocksToDelete[len(blocksToDelete)-1].Block.Number.String()).
Msg("Deleting stranded blocks from PostgreSQL")

if err := c.storage.StagingStorage.DeleteStagingData(blocksToDelete); err != nil {
log.Error().Err(err).Msg("Failed to delete blocks from PostgreSQL")
}
}
}

🛠️ Refactor suggestion

Extract duplicate cleanup logic to avoid code duplication.

The cleanup logic in getBlockNumbersToCommit (lines 234-293) is nearly identical to cleanupStrandedBlocks. This violates the DRY principle and makes maintenance harder.

Extract the common logic into a helper method:

func (c *Committer) cleanupBlocksLessThan(blockNumber *big.Int) (bool, error) {
    // Common cleanup logic here.
    // Returns whether any blocks were cleaned up, plus any error.
}

Then call it from both methods:

-	// Get block numbers from PostgreSQL that are less than latest committed block
-	psqlBlockNumbers, err := c.storage.StagingStorage.(*storage.PostgresConnector).GetBlockNumbersLessThan(c.rpc.GetChainID(), latestCommittedBlockNumber)
-	// ... rest of the duplicate code ...
+	if err := c.cleanupBlocksLessThan(latestCommittedBlockNumber); err != nil {
+		return nil, fmt.Errorf("cleanup failed: %w", err)
+	}

Also note that line 290 only logs the deletion error but doesn't return it, which is inconsistent with the error handling elsewhere in this method.
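
For concreteness, a fuller sketch of the extracted helper, assuming both call sites share exactly the sequence shown in this diff (cleanupBlocksLessThan is a proposed name; every call inside it appears elsewhere in the PR):

func (c *Committer) cleanupBlocksLessThan(blockNumber *big.Int) (bool, error) {
	psqlBlockNumbers, err := c.storage.StagingStorage.(*storage.PostgresConnector).GetBlockNumbersLessThan(c.rpc.GetChainID(), blockNumber)
	if err != nil {
		return false, fmt.Errorf("error getting block numbers from PostgreSQL: %w", err)
	}
	if len(psqlBlockNumbers) == 0 {
		return false, nil
	}

	// Find which of the staged blocks already made it to ClickHouse.
	existsInClickHouse, err := c.storage.MainStorage.(*storage.ClickHouseConnector).CheckBlocksExist(c.rpc.GetChainID(), psqlBlockNumbers)
	if err != nil {
		return false, fmt.Errorf("error checking blocks in ClickHouse: %w", err)
	}

	// Commit the blocks that are missing from ClickHouse.
	var blocksToCommit []common.BlockData
	for _, blockNum := range psqlBlockNumbers {
		if existsInClickHouse[blockNum.String()] {
			continue
		}
		data, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{
			BlockNumbers: []*big.Int{blockNum},
			ChainId:      c.rpc.GetChainID(),
		})
		if err != nil {
			return false, fmt.Errorf("error getting block data from PostgreSQL: %w", err)
		}
		if len(data) > 0 {
			blocksToCommit = append(blocksToCommit, data[0])
		}
	}
	if len(blocksToCommit) > 0 {
		if err := c.storage.MainStorage.InsertBlockData(blocksToCommit); err != nil {
			return false, fmt.Errorf("error inserting blocks into ClickHouse: %w", err)
		}
	}

	// Drop everything we examined from staging.
	var blocksToDelete []common.BlockData
	for _, blockNum := range psqlBlockNumbers {
		blocksToDelete = append(blocksToDelete, common.BlockData{
			Block: common.Block{ChainId: c.rpc.GetChainID(), Number: blockNum},
		})
	}
	if err := c.storage.StagingStorage.DeleteStagingData(blocksToDelete); err != nil {
		return false, fmt.Errorf("error deleting blocks from PostgreSQL: %w", err)
	}
	return true, nil
}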

🤖 Prompt for AI Agents
In internal/orchestrator/committer.go around lines 234 to 293, the block cleanup
logic is duplicated in getBlockNumbersToCommit and cleanupStrandedBlocks,
violating DRY principles. Extract this common cleanup code into a new helper
method named cleanupBlocksLessThan that accepts a block number, performs the
cleanup, and returns an error and a boolean indicating if any blocks were
cleaned. Replace the duplicated code in both methods with calls to this helper.
Additionally, modify the error handling on line 290 to return the deletion error
instead of only logging it, ensuring consistent error propagation.


// Continue with normal block range processing
startBlock := new(big.Int).Add(latestCommittedBlockNumber, big.NewInt(1))
endBlock, err := c.getBlockToCommitUntil(ctx, latestCommittedBlockNumber)
if err != nil {
83 changes: 83 additions & 0 deletions internal/storage/clickhouse.go
@@ -858,6 +858,89 @@ func scanTrace(rows driver.Rows) (common.Trace, error) {
return trace, nil
}

func (c *ClickHouseConnector) CheckBlocksExist(chainId *big.Int, blockNumbers []*big.Int) (map[string]bool, error) {
if len(blockNumbers) == 0 {
return make(map[string]bool), nil
}

// Convert block numbers to strings for the query
blockNumberStrings := make([]string, len(blockNumbers))
for i, bn := range blockNumbers {
blockNumberStrings[i] = bn.String()
}

// First check blocks table
blocksQuery := fmt.Sprintf(`
SELECT DISTINCT toString(block_number) as block_number_str
FROM %s.blocks
WHERE chain_id = '%s'
AND block_number IN (%s)
AND sign = 1`, c.cfg.Database, chainId.String(), strings.Join(blockNumberStrings, ","))

blocksRows, err := c.conn.Query(context.Background(), blocksQuery)
if err != nil {
return nil, fmt.Errorf("error querying blocks table: %v", err)
}
defer blocksRows.Close()

// Create a map of all block numbers initially set to false
exists := make(map[string]bool)
for _, bn := range blockNumbers {
exists[bn.String()] = false
}

// Mark blocks that exist in blocks table as true
for blocksRows.Next() {
var blockNumberStr string
if err := blocksRows.Scan(&blockNumberStr); err != nil {
return nil, fmt.Errorf("error scanning blocks table: %v", err)
}
exists[blockNumberStr] = true
}

if err := blocksRows.Err(); err != nil {
return nil, fmt.Errorf("error iterating blocks table: %v", err)
}

// Then check inserts_null_table for any remaining blocks
var remainingBlocks []string
for blockNum, found := range exists {
if !found {
remainingBlocks = append(remainingBlocks, blockNum)
}
}

if len(remainingBlocks) > 0 {
nullQuery := fmt.Sprintf(`
SELECT DISTINCT toString(block.block_number) as block_number_str
FROM %s.inserts_null_table
WHERE chain_id = '%s'
AND block.block_number IN (%s)
AND sign = 1`, c.cfg.Database, chainId.String(), strings.Join(remainingBlocks, ","))

nullRows, err := c.conn.Query(context.Background(), nullQuery)
if err != nil {
return nil, fmt.Errorf("error querying inserts_null_table: %v", err)
}
defer nullRows.Close()

// Mark blocks that exist in inserts_null_table as true
for nullRows.Next() {
var blockNumberStr string
if err := nullRows.Scan(&blockNumberStr); err != nil {
return nil, fmt.Errorf("error scanning inserts_null_table: %v", err)
}
exists[blockNumberStr] = true
}

if err := nullRows.Err(); err != nil {
return nil, fmt.Errorf("error iterating inserts_null_table: %v", err)
}
}

return exists, nil
}
Comment on lines +861 to +942

🛠️ Refactor suggestion

Consider combining queries and improving safety.

The CheckBlocksExist method implementation has a few areas for improvement:

  1. SQL Injection Risk: While big.Int.String() is generally safe, directly concatenating values into the IN clause is not ideal practice
  2. Performance: Two separate queries could be combined with UNION for better performance
  3. Large Lists: No batching for very large block number lists could cause query size issues

Consider using parameterized queries or at least validating the block numbers:

-	blocksQuery := fmt.Sprintf(`
-		SELECT DISTINCT toString(block_number) as block_number_str
-		FROM %s.blocks
-		WHERE chain_id = '%s' 
-		AND block_number IN (%s)
-		AND sign = 1`, c.cfg.Database, chainId.String(), strings.Join(blockNumberStrings, ","))
+	// Consider using a single UNION query for better performance
+	combinedQuery := fmt.Sprintf(`
+		SELECT DISTINCT toString(block_number) as block_number_str
+		FROM (
+			SELECT block_number FROM %s.blocks
+			WHERE chain_id = ? AND block_number IN (%s) AND sign = 1
+			UNION ALL
+			SELECT block.block_number FROM %s.inserts_null_table
+			WHERE chain_id = ? AND block.block_number IN (%s) AND sign = 1
+		)`, c.cfg.Database, strings.Join(blockNumberStrings, ","), 
+		    c.cfg.Database, strings.Join(blockNumberStrings, ","))
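
As a hedged sketch of binding values rather than interpolating them — this assumes the clickhouse-go driver's positional ? placeholders; the table and column names are taken from this diff:

	// Build one placeholder per block number and bind everything as args.
	placeholders := make([]string, len(blockNumbers))
	args := make([]interface{}, 0, len(blockNumbers)+1)
	args = append(args, chainId.String())
	for i, bn := range blockNumbers {
		placeholders[i] = "?"
		args = append(args, bn.String())
	}
	blocksQuery := fmt.Sprintf(`
		SELECT DISTINCT toString(block_number) as block_number_str
		FROM %s.blocks
		WHERE chain_id = ?
		AND block_number IN (%s)
		AND sign = 1`, c.cfg.Database, strings.Join(placeholders, ","))
	blocksRows, err := c.conn.Query(context.Background(), blocksQuery, args...)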

Also consider batching for very large lists:

const maxBlocksPerQuery = 1000
if len(blockNumbers) > maxBlocksPerQuery {
    // Process in batches
}
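
Expanded into a hypothetical wrapper (checkBlocksExistBatched and maxBlocksPerQuery are illustrative names, not part of this PR):

const maxBlocksPerQuery = 1000

// checkBlocksExistBatched splits the input into chunks, queries each chunk
// via CheckBlocksExist, and merges the per-batch results.
func (c *ClickHouseConnector) checkBlocksExistBatched(chainId *big.Int, blockNumbers []*big.Int) (map[string]bool, error) {
	exists := make(map[string]bool, len(blockNumbers))
	for start := 0; start < len(blockNumbers); start += maxBlocksPerQuery {
		end := start + maxBlocksPerQuery
		if end > len(blockNumbers) {
			end = len(blockNumbers)
		}
		batch, err := c.CheckBlocksExist(chainId, blockNumbers[start:end])
		if err != nil {
			return nil, fmt.Errorf("error checking batch starting at %d: %w", start, err)
		}
		for blockNum, found := range batch {
			exists[blockNum] = found
		}
	}
	return exists, nil
}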
🤖 Prompt for AI Agents
In internal/storage/clickhouse.go from lines 861 to 942, the CheckBlocksExist
method currently concatenates block numbers directly into the SQL IN clause,
posing a SQL injection risk and lacks batching for large input lists. To fix
this, refactor the method to use parameterized queries with placeholders for
block numbers instead of string concatenation, and combine the two queries into
a single UNION query for better performance. Additionally, implement batching
logic to split large blockNumbers slices into smaller chunks (e.g., 1000 per
batch) and aggregate results across batches before returning.


func (c *ClickHouseConnector) GetMaxBlockNumber(chainId *big.Int) (maxBlockNumber *big.Int, err error) {
tableName := c.getTableName(chainId, "blocks")
query := fmt.Sprintf("SELECT block_number FROM %s.%s WHERE chain_id = ? ORDER BY block_number DESC LIMIT 1", c.cfg.Database, tableName)
44 changes: 40 additions & 4 deletions internal/storage/postgres.go
@@ -61,7 +61,7 @@ func NewPostgresConnector(cfg *config.PostgresConfig) (*PostgresConnector, error

func (p *PostgresConnector) GetBlockFailures(qf QueryFilter) ([]common.BlockFailure, error) {
query := `SELECT chain_id, block_number, last_error_timestamp, failure_count, reason
FROM block_failures`
FROM block_failures WHERE 1=1`

args := []interface{}{}
argCount := 0
@@ -73,11 +73,13 @@ func (p *PostgresConnector) GetBlockFailures(qf QueryFilter) ([]common.BlockFail
}

if len(qf.BlockNumbers) > 0 {
blockNumberStrs := make([]string, len(qf.BlockNumbers))
placeholders := make([]string, len(qf.BlockNumbers))
for i, bn := range qf.BlockNumbers {
blockNumberStrs[i] = bn.String()
argCount++
placeholders[i] = fmt.Sprintf("$%d", argCount)
args = append(args, bn.String())
}
query += fmt.Sprintf(" AND block_number IN (%s)", strings.Join(blockNumberStrs, ","))
query += fmt.Sprintf(" AND block_number IN (%s)", strings.Join(placeholders, ","))
}

if qf.SortBy != "" {
@@ -263,6 +265,40 @@ func (p *PostgresConnector) InsertStagingData(data []common.BlockData) error {
return err
}

func (p *PostgresConnector) GetBlockNumbersLessThan(chainId *big.Int, blockNumber *big.Int) ([]*big.Int, error) {
query := `SELECT DISTINCT block_number
FROM block_data
WHERE chain_id = $1
AND block_number < $2
ORDER BY block_number ASC`

rows, err := p.db.Query(query, chainId.String(), blockNumber.String())
if err != nil {
return nil, fmt.Errorf("error querying block_data: %v", err)
}
defer rows.Close()

var blockNumbers []*big.Int
for rows.Next() {
var blockNumberStr string
if err := rows.Scan(&blockNumberStr); err != nil {
return nil, fmt.Errorf("error scanning block number: %v", err)
}

blockNum, ok := new(big.Int).SetString(blockNumberStr, 10)
if !ok {
return nil, fmt.Errorf("failed to parse block number: %s", blockNumberStr)
}
blockNumbers = append(blockNumbers, blockNum)
}

if err := rows.Err(); err != nil {
return nil, fmt.Errorf("error iterating rows: %v", err)
}

return blockNumbers, nil
}
Comment on lines +268 to +300

🛠️ Refactor suggestion

Consider adding pagination or limits to prevent memory issues.

The GetBlockNumbersLessThan method may return an unbounded number of rows when many stranded blocks accumulate in staging, driving up memory use and slowing the query.

Consider:

  1. Adding a LIMIT clause with pagination support
  2. Processing blocks in batches
  3. Adding an index on (chain_id, block_number) if not already present
-func (p *PostgresConnector) GetBlockNumbersLessThan(chainId *big.Int, blockNumber *big.Int) ([]*big.Int, error) {
+func (p *PostgresConnector) GetBlockNumbersLessThan(chainId *big.Int, blockNumber *big.Int, limit int) ([]*big.Int, error) {
 	query := `SELECT DISTINCT block_number 
 	          FROM block_data 
 	          WHERE chain_id = $1 
 	          AND block_number < $2
 	          ORDER BY block_number ASC`
+	
+	if limit > 0 {
+		query += fmt.Sprintf(" LIMIT %d", limit)
+	}
📝 Committable suggestion


Suggested change
func (p *PostgresConnector) GetBlockNumbersLessThan(chainId *big.Int, blockNumber *big.Int) ([]*big.Int, error) {
query := `SELECT DISTINCT block_number
FROM block_data
WHERE chain_id = $1
AND block_number < $2
ORDER BY block_number ASC`
rows, err := p.db.Query(query, chainId.String(), blockNumber.String())
if err != nil {
return nil, fmt.Errorf("error querying block_data: %v", err)
}
defer rows.Close()
var blockNumbers []*big.Int
for rows.Next() {
var blockNumberStr string
if err := rows.Scan(&blockNumberStr); err != nil {
return nil, fmt.Errorf("error scanning block number: %v", err)
}
blockNum, ok := new(big.Int).SetString(blockNumberStr, 10)
if !ok {
return nil, fmt.Errorf("failed to parse block number: %s", blockNumberStr)
}
blockNumbers = append(blockNumbers, blockNum)
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("error iterating rows: %v", err)
}
return blockNumbers, nil
}
func (p *PostgresConnector) GetBlockNumbersLessThan(chainId *big.Int, blockNumber *big.Int, limit int) ([]*big.Int, error) {
query := `SELECT DISTINCT block_number
FROM block_data
WHERE chain_id = $1
AND block_number < $2
ORDER BY block_number ASC`
if limit > 0 {
query += fmt.Sprintf(" LIMIT %d", limit)
}
rows, err := p.db.Query(query, chainId.String(), blockNumber.String())
if err != nil {
return nil, fmt.Errorf("error querying block_data: %v", err)
}
defer rows.Close()
var blockNumbers []*big.Int
for rows.Next() {
var blockNumberStr string
if err := rows.Scan(&blockNumberStr); err != nil {
return nil, fmt.Errorf("error scanning block number: %v", err)
}
blockNum, ok := new(big.Int).SetString(blockNumberStr, 10)
if !ok {
return nil, fmt.Errorf("failed to parse block number: %s", blockNumberStr)
}
blockNumbers = append(blockNumbers, blockNum)
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("error iterating rows: %v", err)
}
return blockNumbers, nil
}
🤖 Prompt for AI Agents
In internal/storage/postgres.go around lines 268 to 300, the
GetBlockNumbersLessThan method may return too many rows causing memory issues.
Modify the SQL query to include a LIMIT clause and add parameters for pagination
(e.g., offset and limit). Update the method signature to accept these pagination
parameters and adjust the query execution accordingly. Also, ensure the database
has an index on (chain_id, block_number) to optimize query performance.


func (p *PostgresConnector) GetStagingData(qf QueryFilter) ([]common.BlockData, error) {
// No need to check is_deleted since we're using hard deletes for staging data
query := `SELECT data FROM block_data WHERE 1=1`