
Conversation

@nischitpra (Collaborator) commented Aug 7, 2025

Summary by CodeRabbit

  • New Features

    • Improved reliability by automatically detecting and recovering "stranded" data blocks from staging storage to main storage during initialization and normal operation.
    • Enhanced cleanup processes to remove outdated blocks from staging storage, reducing clutter and potential inconsistencies.
    • Added asynchronous cleanup of stranded blocks during committer startup for smoother operation.
  • Bug Fixes

    • Improved error handling and logging for storage operations to provide better visibility into cleanup activities.
  • Refactor

    • Optimized database queries for safer and more efficient parameter handling.

@coderabbitai (bot) commented Aug 7, 2025

Walkthrough

This change introduces logic to clean up "stranded" blocks in the staging PostgreSQL database that are older than the latest committed block in ClickHouse. New methods are added to both storage connectors to support this: ClickHouse can now check for the existence of multiple blocks, and PostgreSQL can fetch block numbers less than a given value. Cleanup is performed during committer initialization and before each commit cycle, with detailed logging and error handling.

Changes

  • Committer stranded blocks cleanup logic (internal/orchestrator/committer.go)
    Adds cleanupStrandedBlocks to remove blocks from PostgreSQL that are older than the latest ClickHouse block, inserting any missing ones into ClickHouse before deletion. Integrates this cleanup into both initialization and commit cycles, with enhanced logging and error handling.
  • ClickHouse block existence checks (internal/storage/clickhouse.go)
    Adds a CheckBlocksExist method to determine which block numbers exist in ClickHouse or its null-inserts table for a given chain, supporting the committer's stranded-block cleanup logic.
  • Postgres block number queries and query refactoring (internal/storage/postgres.go)
    Adds GetBlockNumbersLessThan to fetch distinct block numbers below a threshold for a chain. Refactors query parameterization in GetBlockFailures for safety and maintainability.
  • Storage interface updates (internal/storage/connector.go)
    Adds GetBlockNumbersLessThan to IStagingStorage and CheckBlocksExist to IMainStorage to support stranded-block cleanup (see the interface sketch after this list).
  • Committer initialization enhancement (internal/orchestrator/orchestrator.go)
    Adds an asynchronous invocation of cleanupStrandedBlocks during committer startup, logging errors without blocking startup.
  • Mock updates for new storage methods (test/mocks/MockIMainStorage.go, test/mocks/MockIStagingStorage.go)
    Adds mock implementations and typed call helpers for CheckBlocksExist and GetBlockNumbersLessThan in the main and staging storage mocks, supporting tests of the new cleanup logic.
  • Committer tests update (internal/orchestrator/committer_test.go)
    Adds expectations for GetBlockNumbersLessThan calls on staging storage mocks in multiple test cases to align with the new cleanup logic.
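
For reference, a minimal sketch of the two interface additions mentioned in the Storage interface updates entry above; the signatures are inferred from the connector and mock code shown later in this review, not copied verbatim from the diff.

// internal/storage/connector.go (sketch)
type IStagingStorage interface {
	// ...existing staging methods...
	GetBlockNumbersLessThan(chainId *big.Int, blockNumber *big.Int) ([]*big.Int, error)
}

type IMainStorage interface {
	// ...existing main-storage methods...
	CheckBlocksExist(chainId *big.Int, blockNumbers []*big.Int) (map[string]bool, error)
}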

Sequence Diagram(s)

sequenceDiagram
    participant Committer
    participant Postgres
    participant ClickHouse

    Committer->>ClickHouse: Get latest committed block number
    Committer->>Postgres: Get block numbers less than latest committed
    Postgres-->>Committer: [blockNumbers]
    Committer->>ClickHouse: CheckBlocksExist(blockNumbers)
    ClickHouse-->>Committer: {blockNumber: exists}
    alt Missing blocks in ClickHouse
        Committer->>Postgres: Fetch missing blocks
        Postgres-->>Committer: [blockData]
        Committer->>ClickHouse: Insert missing blocks
    end
    Committer->>Postgres: Delete all stranded blocks
    Postgres-->>Committer: (done)

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~35 minutes



@coderabbitai bot left a comment

Actionable comments posted: 7

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0381b98 and 501e265.

📒 Files selected for processing (3)
  • internal/orchestrator/committer.go (2 hunks)
  • internal/storage/clickhouse.go (1 hunks)
  • internal/storage/postgres.go (3 hunks)
🔇 Additional comments (1)
internal/storage/postgres.go (1)

64-64: Good security improvement with parameterized queries.

The addition of WHERE 1=1 is a common pattern for building dynamic queries, and the change to use positional placeholders with argument binding improves SQL injection protection.
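
For readers unfamiliar with the pattern, a minimal sketch of what this looks like; the table name and filter fields below are illustrative and not the actual GetBlockFailures implementation:

query := "SELECT * FROM block_failures WHERE 1=1"
args := []interface{}{}
if filter.ChainId != nil {
	// Each condition appends its value and references it by position.
	args = append(args, filter.ChainId.String())
	query += fmt.Sprintf(" AND chain_id = $%d", len(args))
}
if filter.BlockNumber != nil {
	args = append(args, filter.BlockNumber.String())
	query += fmt.Sprintf(" AND block_number = $%d", len(args))
}
// Values are bound by the driver, never interpolated into the SQL string.
rows, err := p.db.Query(query, args...)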

Comment on lines 75 to 78
	// Clean up any stranded blocks in staging
	if err := committer.cleanupStrandedBlocks(); err != nil {
		log.Error().Err(err).Msg("Failed to clean up stranded blocks during initialization")
	}

⚠️ Potential issue

Consider failing initialization if cleanup fails.

The cleanup of stranded blocks during initialization only logs errors but doesn't fail the initialization. This could lead to data inconsistency if the cleanup is critical for proper operation.

Consider returning the error to prevent the committer from starting with potentially inconsistent state:

 	// Clean up any stranded blocks in staging
 	if err := committer.cleanupStrandedBlocks(); err != nil {
-		log.Error().Err(err).Msg("Failed to clean up stranded blocks during initialization")
+		return nil, fmt.Errorf("failed to clean up stranded blocks during initialization: %w", err)
 	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-	// Clean up any stranded blocks in staging
-	if err := committer.cleanupStrandedBlocks(); err != nil {
-		log.Error().Err(err).Msg("Failed to clean up stranded blocks during initialization")
-	}
+	// Clean up any stranded blocks in staging
+	if err := committer.cleanupStrandedBlocks(); err != nil {
+		return nil, fmt.Errorf("failed to clean up stranded blocks during initialization: %w", err)
+	}
🤖 Prompt for AI Agents
In internal/orchestrator/committer.go around lines 75 to 78, the
cleanupStrandedBlocks error is only logged but does not cause initialization to
fail. Modify the code to return the error from the initialization function when
cleanupStrandedBlocks fails, ensuring the committer does not start with a
potentially inconsistent state.

Comment on lines 149 to 169
	var blocksToDelete []common.BlockData
	for _, blockNum := range psqlBlockNumbers {
		blocksToDelete = append(blocksToDelete, common.BlockData{
			Block: common.Block{
				ChainId: c.rpc.GetChainID(),
				Number:  blockNum,
			},
		})
	}

	if len(blocksToDelete) > 0 {
		log.Info().
			Int("block_count", len(blocksToDelete)).
			Str("min_block", blocksToDelete[0].Block.Number.String()).
			Str("max_block", blocksToDelete[len(blocksToDelete)-1].Block.Number.String()).
			Msg("Deleting stranded blocks from PostgreSQL")

		if err := c.storage.StagingStorage.DeleteStagingData(blocksToDelete); err != nil {
			return fmt.Errorf("error deleting blocks from PostgreSQL: %v", err)
		}
	}

⚠️ Potential issue

Consider transactional semantics for cleanup operations.

The current implementation deletes blocks from PostgreSQL even if they already exist in ClickHouse (lines 149-169). This could lead to data loss if there's a failure between checking existence and deletion.

Consider:

  1. Only delete blocks that were successfully inserted into ClickHouse
  2. Use database transactions if supported
  3. Add a verification step after insertion before deletion
-	// Delete all blocks from PostgreSQL that were checked (whether they existed in ClickHouse or not)
-	var blocksToDelete []common.BlockData
-	for _, blockNum := range psqlBlockNumbers {
+	// Only delete blocks that were successfully inserted or already exist
+	var blocksToDelete []common.BlockData
+	for _, blockNum := range psqlBlockNumbers {
+		// Only delete if block exists in ClickHouse or was just inserted
+		if existsInClickHouse[blockNum.String()] || wasJustInserted[blockNum.String()] {

Committable suggestion skipped: line range outside the PR's diff.
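
As an illustration of option 3, one possible shape for the verification step: re-check existence after the insert and delete only verified rows. This reuses CheckBlocksExist and is an assumption about the wiring, not code from the PR.

	// Re-verify after inserting, then delete only what ClickHouse now reports as present.
	verified, err := c.storage.MainStorage.CheckBlocksExist(c.rpc.GetChainID(), psqlBlockNumbers)
	if err != nil {
		return fmt.Errorf("error verifying blocks in ClickHouse: %v", err)
	}
	var blocksToDelete []common.BlockData
	for _, blockNum := range psqlBlockNumbers {
		if verified[blockNum.String()] {
			blocksToDelete = append(blocksToDelete, common.BlockData{
				Block: common.Block{ChainId: c.rpc.GetChainID(), Number: blockNum},
			})
		}
	}
	if len(blocksToDelete) > 0 {
		if err := c.storage.StagingStorage.DeleteStagingData(blocksToDelete); err != nil {
			return fmt.Errorf("error deleting blocks from PostgreSQL: %v", err)
		}
	}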

🤖 Prompt for AI Agents
In internal/orchestrator/committer.go around lines 149 to 169, the deletion of
blocks from PostgreSQL occurs without ensuring they were successfully inserted
into ClickHouse, risking data loss. Modify the code to first verify that each
block has been successfully inserted into ClickHouse before including it in the
deletion list. Implement database transactions around the insertion and deletion
steps if supported to ensure atomicity. Add a verification step after insertion
to confirm success before proceeding with deletion from PostgreSQL.

Comment on lines 234 to 293
	// Get block numbers from PostgreSQL that are less than latest committed block
	psqlBlockNumbers, err := c.storage.StagingStorage.(*storage.PostgresConnector).GetBlockNumbersLessThan(c.rpc.GetChainID(), latestCommittedBlockNumber)
	if err != nil {
		return nil, fmt.Errorf("error getting block numbers from PostgreSQL: %v", err)
	}

	if len(psqlBlockNumbers) > 0 {
		// Check which blocks exist in ClickHouse
		existsInClickHouse, err := c.storage.MainStorage.(*storage.ClickHouseConnector).CheckBlocksExist(c.rpc.GetChainID(), psqlBlockNumbers)
		if err != nil {
			return nil, fmt.Errorf("error checking blocks in ClickHouse: %v", err)
		}

		// Get block data from PostgreSQL for blocks that don't exist in ClickHouse
		var blocksToCommit []common.BlockData
		for _, blockNum := range psqlBlockNumbers {
			if !existsInClickHouse[blockNum.String()] {
				data, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{
					BlockNumbers: []*big.Int{blockNum},
					ChainId:      c.rpc.GetChainID(),
				})
				if err != nil {
					return nil, fmt.Errorf("error getting block data from PostgreSQL: %v", err)
				}
				if len(data) > 0 {
					blocksToCommit = append(blocksToCommit, data[0])
				}
			}
		}

		// Insert blocks into ClickHouse
		if len(blocksToCommit) > 0 {
			if err := c.storage.MainStorage.InsertBlockData(blocksToCommit); err != nil {
				return nil, fmt.Errorf("error inserting blocks into ClickHouse: %v", err)
			}
		}

		// Delete all blocks from PostgreSQL that were checked (whether they existed in ClickHouse or not)
		var blocksToDelete []common.BlockData
		for _, blockNum := range psqlBlockNumbers {
			blocksToDelete = append(blocksToDelete, common.BlockData{
				Block: common.Block{
					ChainId: c.rpc.GetChainID(),
					Number:  blockNum,
				},
			})
		}

		if len(blocksToDelete) > 0 {
			log.Info().
				Int("block_count", len(blocksToDelete)).
				Str("min_block", blocksToDelete[0].Block.Number.String()).
				Str("max_block", blocksToDelete[len(blocksToDelete)-1].Block.Number.String()).
				Msg("Deleting stranded blocks from PostgreSQL")

			if err := c.storage.StagingStorage.DeleteStagingData(blocksToDelete); err != nil {
				log.Error().Err(err).Msg("Failed to delete blocks from PostgreSQL")
			}
		}
	}

🛠️ Refactor suggestion

Extract duplicate cleanup logic to avoid code duplication.

The cleanup logic in getBlockNumbersToCommit (lines 234-293) is nearly identical to cleanupStrandedBlocks. This violates the DRY principle and makes maintenance harder.

Extract the common logic into a helper method:

func (c *Committer) cleanupBlocksLessThan(blockNumber *big.Int) error {
	// Common cleanup logic here: check existence in ClickHouse, insert any
	// missing blocks, then delete the stranded rows from staging.
	// Returns an error if any step fails.
}

Then call it from both methods:

-	// Get block numbers from PostgreSQL that are less than latest committed block
-	psqlBlockNumbers, err := c.storage.StagingStorage.(*storage.PostgresConnector).GetBlockNumbersLessThan(c.rpc.GetChainID(), latestCommittedBlockNumber)
-	// ... rest of the duplicate code ...
+	if err := c.cleanupBlocksLessThan(latestCommittedBlockNumber); err != nil {
+		return nil, fmt.Errorf("cleanup failed: %w", err)
+	}

Also note that line 290 only logs the deletion error but doesn't return it, which is inconsistent with the error handling elsewhere in this method.

🤖 Prompt for AI Agents
In internal/orchestrator/committer.go around lines 234 to 293, the block cleanup
logic is duplicated in getBlockNumbersToCommit and cleanupStrandedBlocks,
violating DRY principles. Extract this common cleanup code into a new helper
method named cleanupBlocksLessThan that accepts a block number, performs the
cleanup, and returns an error. Replace the duplicated code in both methods with calls to this helper.
Additionally, modify the error handling on line 290 to return the deletion error
instead of only logging it, ensuring consistent error propagation.

Comment on lines +861 to +942
func (c *ClickHouseConnector) CheckBlocksExist(chainId *big.Int, blockNumbers []*big.Int) (map[string]bool, error) {
	if len(blockNumbers) == 0 {
		return make(map[string]bool), nil
	}

	// Convert block numbers to strings for the query
	blockNumberStrings := make([]string, len(blockNumbers))
	for i, bn := range blockNumbers {
		blockNumberStrings[i] = bn.String()
	}

	// First check blocks table
	blocksQuery := fmt.Sprintf(`
		SELECT DISTINCT toString(block_number) as block_number_str
		FROM %s.blocks
		WHERE chain_id = '%s'
		AND block_number IN (%s)
		AND sign = 1`, c.cfg.Database, chainId.String(), strings.Join(blockNumberStrings, ","))

	blocksRows, err := c.conn.Query(context.Background(), blocksQuery)
	if err != nil {
		return nil, fmt.Errorf("error querying blocks table: %v", err)
	}
	defer blocksRows.Close()

	// Create a map of all block numbers initially set to false
	exists := make(map[string]bool)
	for _, bn := range blockNumbers {
		exists[bn.String()] = false
	}

	// Mark blocks that exist in blocks table as true
	for blocksRows.Next() {
		var blockNumberStr string
		if err := blocksRows.Scan(&blockNumberStr); err != nil {
			return nil, fmt.Errorf("error scanning blocks table: %v", err)
		}
		exists[blockNumberStr] = true
	}

	if err := blocksRows.Err(); err != nil {
		return nil, fmt.Errorf("error iterating blocks table: %v", err)
	}

	// Then check inserts_null_table for any remaining blocks
	var remainingBlocks []string
	for blockNum, found := range exists {
		if !found {
			remainingBlocks = append(remainingBlocks, blockNum)
		}
	}

	if len(remainingBlocks) > 0 {
		nullQuery := fmt.Sprintf(`
			SELECT DISTINCT toString(block.block_number) as block_number_str
			FROM %s.inserts_null_table
			WHERE chain_id = '%s'
			AND block.block_number IN (%s)
			AND sign = 1`, c.cfg.Database, chainId.String(), strings.Join(remainingBlocks, ","))

		nullRows, err := c.conn.Query(context.Background(), nullQuery)
		if err != nil {
			return nil, fmt.Errorf("error querying inserts_null_table: %v", err)
		}
		defer nullRows.Close()

		// Mark blocks that exist in inserts_null_table as true
		for nullRows.Next() {
			var blockNumberStr string
			if err := nullRows.Scan(&blockNumberStr); err != nil {
				return nil, fmt.Errorf("error scanning inserts_null_table: %v", err)
			}
			exists[blockNumberStr] = true
		}

		if err := nullRows.Err(); err != nil {
			return nil, fmt.Errorf("error iterating inserts_null_table: %v", err)
		}
	}

	return exists, nil
}

🛠️ Refactor suggestion

Consider combining queries and improving safety.

The CheckBlocksExist method implementation has a few areas for improvement:

  1. SQL Injection Risk: While big.Int.String() is generally safe, directly concatenating values into the IN clause is not ideal practice
  2. Performance: Two separate queries could be combined with UNION for better performance
  3. Large Lists: No batching for very large block number lists could cause query size issues

Consider using parameterized queries or at least validating the block numbers:

-	blocksQuery := fmt.Sprintf(`
-		SELECT DISTINCT toString(block_number) as block_number_str
-		FROM %s.blocks
-		WHERE chain_id = '%s' 
-		AND block_number IN (%s)
-		AND sign = 1`, c.cfg.Database, chainId.String(), strings.Join(blockNumberStrings, ","))
+	// Consider using a single UNION query for better performance
+	combinedQuery := fmt.Sprintf(`
+		SELECT DISTINCT toString(block_number) as block_number_str
+		FROM (
+			SELECT block_number FROM %s.blocks
+			WHERE chain_id = ? AND block_number IN (%s) AND sign = 1
+			UNION ALL
+			SELECT block.block_number FROM %s.inserts_null_table
+			WHERE chain_id = ? AND block.block_number IN (%s) AND sign = 1
+		)`, c.cfg.Database, strings.Join(blockNumberStrings, ","), 
+		    c.cfg.Database, strings.Join(blockNumberStrings, ","))

Also consider batching for very large lists:

const maxBlocksPerQuery = 1000
if len(blockNumbers) > maxBlocksPerQuery {
    // Process in batches
}
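
A fuller sketch of that batching, written as a wrapper around the existing method for brevity (the same loop could live inside CheckBlocksExist instead); the helper name and constant are hypothetical:

const maxBlocksPerQuery = 1000

func (c *ClickHouseConnector) checkBlocksExistBatched(chainId *big.Int, blockNumbers []*big.Int) (map[string]bool, error) {
	exists := make(map[string]bool, len(blockNumbers))
	for start := 0; start < len(blockNumbers); start += maxBlocksPerQuery {
		end := start + maxBlocksPerQuery
		if end > len(blockNumbers) {
			end = len(blockNumbers)
		}
		// Each chunk is resolved with the existing single-shot query logic.
		partial, err := c.CheckBlocksExist(chainId, blockNumbers[start:end])
		if err != nil {
			return nil, err
		}
		for blockNum, found := range partial {
			exists[blockNum] = found
		}
	}
	return exists, nil
}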
🤖 Prompt for AI Agents
In internal/storage/clickhouse.go from lines 861 to 942, the CheckBlocksExist
method currently concatenates block numbers directly into the SQL IN clause,
posing a SQL injection risk and lacks batching for large input lists. To fix
this, refactor the method to use parameterized queries with placeholders for
block numbers instead of string concatenation, and combine the two queries into
a single UNION query for better performance. Additionally, implement batching
logic to split large blockNumbers slices into smaller chunks (e.g., 1000 per
batch) and aggregate results across batches before returning.

Comment on lines +268 to +300
func (p *PostgresConnector) GetBlockNumbersLessThan(chainId *big.Int, blockNumber *big.Int) ([]*big.Int, error) {
	query := `SELECT DISTINCT block_number
	          FROM block_data
	          WHERE chain_id = $1
	          AND block_number < $2
	          ORDER BY block_number ASC`

	rows, err := p.db.Query(query, chainId.String(), blockNumber.String())
	if err != nil {
		return nil, fmt.Errorf("error querying block_data: %v", err)
	}
	defer rows.Close()

	var blockNumbers []*big.Int
	for rows.Next() {
		var blockNumberStr string
		if err := rows.Scan(&blockNumberStr); err != nil {
			return nil, fmt.Errorf("error scanning block number: %v", err)
		}

		blockNum, ok := new(big.Int).SetString(blockNumberStr, 10)
		if !ok {
			return nil, fmt.Errorf("failed to parse block number: %s", blockNumberStr)
		}
		blockNumbers = append(blockNumbers, blockNum)
	}

	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("error iterating rows: %v", err)
	}

	return blockNumbers, nil
}

🛠️ Refactor suggestion

Consider adding pagination or limits to prevent memory issues.

The GetBlockNumbersLessThan method could potentially return a very large number of blocks if there are many stranded blocks in staging. This could lead to memory issues and slow query performance.

Consider:

  1. Adding a LIMIT clause with pagination support
  2. Processing blocks in batches
  3. Adding an index on (chain_id, block_number) if not already present
-func (p *PostgresConnector) GetBlockNumbersLessThan(chainId *big.Int, blockNumber *big.Int) ([]*big.Int, error) {
+func (p *PostgresConnector) GetBlockNumbersLessThan(chainId *big.Int, blockNumber *big.Int, limit int) ([]*big.Int, error) {
 	query := `SELECT DISTINCT block_number 
 	          FROM block_data 
 	          WHERE chain_id = $1 
 	          AND block_number < $2
 	          ORDER BY block_number ASC`
+	
+	if limit > 0 {
+		query += fmt.Sprintf(" LIMIT %d", limit)
+	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-func (p *PostgresConnector) GetBlockNumbersLessThan(chainId *big.Int, blockNumber *big.Int) ([]*big.Int, error) {
-	query := `SELECT DISTINCT block_number
-	          FROM block_data
-	          WHERE chain_id = $1
-	          AND block_number < $2
-	          ORDER BY block_number ASC`
-	rows, err := p.db.Query(query, chainId.String(), blockNumber.String())
-	if err != nil {
-		return nil, fmt.Errorf("error querying block_data: %v", err)
-	}
-	defer rows.Close()
-	var blockNumbers []*big.Int
-	for rows.Next() {
-		var blockNumberStr string
-		if err := rows.Scan(&blockNumberStr); err != nil {
-			return nil, fmt.Errorf("error scanning block number: %v", err)
-		}
-		blockNum, ok := new(big.Int).SetString(blockNumberStr, 10)
-		if !ok {
-			return nil, fmt.Errorf("failed to parse block number: %s", blockNumberStr)
-		}
-		blockNumbers = append(blockNumbers, blockNum)
-	}
-	if err := rows.Err(); err != nil {
-		return nil, fmt.Errorf("error iterating rows: %v", err)
-	}
-	return blockNumbers, nil
-}
+func (p *PostgresConnector) GetBlockNumbersLessThan(chainId *big.Int, blockNumber *big.Int, limit int) ([]*big.Int, error) {
+	query := `SELECT DISTINCT block_number
+	          FROM block_data
+	          WHERE chain_id = $1
+	          AND block_number < $2
+	          ORDER BY block_number ASC`
+	if limit > 0 {
+		query += fmt.Sprintf(" LIMIT %d", limit)
+	}
+	rows, err := p.db.Query(query, chainId.String(), blockNumber.String())
+	if err != nil {
+		return nil, fmt.Errorf("error querying block_data: %v", err)
+	}
+	defer rows.Close()
+	var blockNumbers []*big.Int
+	for rows.Next() {
+		var blockNumberStr string
+		if err := rows.Scan(&blockNumberStr); err != nil {
+			return nil, fmt.Errorf("error scanning block number: %v", err)
+		}
+		blockNum, ok := new(big.Int).SetString(blockNumberStr, 10)
+		if !ok {
+			return nil, fmt.Errorf("failed to parse block number: %s", blockNumberStr)
+		}
+		blockNumbers = append(blockNumbers, blockNum)
+	}
+	if err := rows.Err(); err != nil {
+		return nil, fmt.Errorf("error iterating rows: %v", err)
+	}
+	return blockNumbers, nil
+}
🤖 Prompt for AI Agents
In internal/storage/postgres.go around lines 268 to 300, the
GetBlockNumbersLessThan method may return too many rows causing memory issues.
Modify the SQL query to include a LIMIT clause and add parameters for pagination
(e.g., offset and limit). Update the method signature to accept these pagination
parameters and adjust the query execution accordingly. Also, ensure the database
has an index on (chain_id, block_number) to optimize query performance.
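
If the suggested limit parameter is adopted, the caller could drain a large staging backlog page by page. A hedged sketch of the caller side, assuming each processed batch is deleted from staging so the next query starts past it:

const pageSize = 1000
for {
	nums, err := c.storage.StagingStorage.GetBlockNumbersLessThan(c.rpc.GetChainID(), latestCommittedBlockNumber, pageSize)
	if err != nil {
		return fmt.Errorf("error getting block numbers from PostgreSQL: %v", err)
	}
	if len(nums) == 0 {
		break
	}
	if err := c.processStrandedBlocksBatch(nums); err != nil {
		return fmt.Errorf("error processing stranded blocks batch: %v", err)
	}
	// processStrandedBlocksBatch deletes the handled rows, so this loop terminates
	// once staging has no block numbers left below the committed height.
}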

@coderabbitai bot left a comment

Actionable comments posted: 3

♻️ Duplicate comments (2)
internal/orchestrator/orchestrator.go (1)

91-97: Avoid fire-and-forget cleanup; run it synchronously and abort on failure to prevent races and inconsistent state.

Starting cleanup in a separate goroutine while the committer also performs similar cleanup in its loop can cause concurrent inserts/deletes of the same blocks and duplicate work. If cleanup is critical, fail fast and don’t start the committer on error.

Apply this diff to run cleanup synchronously and skip starting the committer on failure:

-			// Clean up any stranded blocks in staging in a separate goroutine
-			go func() {
-				if err := committer.cleanupStrandedBlocks(); err != nil {
-					log.Error().Err(err).Msg("Failed to clean up stranded blocks during initialization")
-				}
-			}()
+			// Clean up any stranded blocks in staging before starting
+			if err := committer.cleanupStrandedBlocks(); err != nil {
+				log.Error().Err(err).Msg("Failed to clean up stranded blocks during initialization, skipping committer start")
+				return
+			}

Optionally, remove the duplicate cleanup inside the committer loop (see comment in committer.go) to keep logic DRY and single-sourced.

internal/orchestrator/committer.go (1)

262-321: Deduplicate cleanup logic: call cleanupStrandedBlocks() instead of inlining it.

This block replicates the cleanup logic already implemented in cleanupStrandedBlocks(), violating DRY and increasing maintenance cost. Replace it with a direct call to the helper.

Apply this diff:

-	// Get block numbers from PostgreSQL that are less than latest committed block
-	psqlBlockNumbers, err := c.storage.StagingStorage.GetBlockNumbersLessThan(c.rpc.GetChainID(), latestCommittedBlockNumber)
-	if err != nil {
-		return nil, fmt.Errorf("error getting block numbers from PostgreSQL: %v", err)
-	}
-
-	if len(psqlBlockNumbers) > 0 {
-		// Check which blocks exist in ClickHouse
-		existsInClickHouse, err := c.storage.MainStorage.CheckBlocksExist(c.rpc.GetChainID(), psqlBlockNumbers)
-		if err != nil {
-			return nil, fmt.Errorf("error checking blocks in ClickHouse: %v", err)
-		}
-
-        // (Inline N+1 fetching + unconditional delete...)
-        // ... removed ...
-	}
+	// Clean up any stranded blocks before proceeding
+	if err := c.cleanupStrandedBlocks(); err != nil {
+		return nil, fmt.Errorf("cleanup stranded blocks: %w", err)
+	}
🧹 Nitpick comments (4)
internal/storage/connector.go (2)

86-86: Document strict less-than semantics and large-result behavior.

Please add a doc comment clarifying:

  • Whether “LessThan” is strict (<) vs inclusive (<=),
  • Ordering of the returned slice (ascending),
  • Expected behavior when the result set is very large (consider pagination/limits to avoid unbounded memory).
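
For example, a possible doc comment (wording is a suggestion only):

// GetBlockNumbersLessThan returns the distinct block numbers stored in staging for
// chainId that are strictly less than blockNumber, ordered ascending. Result sets
// are currently unbounded, so callers should be prepared for large slices.
GetBlockNumbersLessThan(chainId *big.Int, blockNumber *big.Int) ([]*big.Int, error)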

119-124: Clarify the key format and idempotency expectations of CheckBlocksExist.

Document that the returned map is keyed by the decimal string of the block number (e.g., big.Int.String()) to avoid mismatches. Also note any idempotency guarantees (e.g., consistent results during concurrent writes).
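
For example (again, suggested wording only):

// CheckBlocksExist reports, for each requested block number, whether it is already
// present in main storage (the blocks table or inserts_null_table). The returned map
// is keyed by the decimal string of the block number, i.e. big.Int.String().
CheckBlocksExist(chainId *big.Int, blockNumbers []*big.Int) (map[string]bool, error)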

internal/orchestrator/committer.go (1)

121-124: Minor: batch bounds in log message use slice indices, not block numbers.

The error message “batch %d-%d” uses index positions, not actual block numbers. Consider including the min/max block number for clarity.
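
Something along these lines, sketched against the existing batch loop:

if err := c.processStrandedBlocksBatch(batchBlockNumbers); err != nil {
	return fmt.Errorf("error processing stranded blocks batch %s-%s: %v",
		batchBlockNumbers[0].String(),
		batchBlockNumbers[len(batchBlockNumbers)-1].String(),
		err)
}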

test/mocks/MockIMainStorage.go (1)

973-1030: LGTM — mock added correctly and matches the interface.

Optional: add a short comment in tests clarifying that keys in the returned map are decimal strings of block numbers to prevent mismatches.
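
For instance, a test expectation against the mock would look like this (the block numbers here are hypothetical):

mockMainStorage.EXPECT().
	CheckBlocksExist(chainID, []*big.Int{big.NewInt(101), big.NewInt(102)}).
	Return(map[string]bool{"101": true, "102": false}, nil) // keys are big.Int.String() values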

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 501e265 and 93e3d08.

📒 Files selected for processing (7)
  • internal/orchestrator/committer.go (2 hunks)
  • internal/orchestrator/committer_test.go (13 hunks)
  • internal/orchestrator/orchestrator.go (1 hunks)
  • internal/storage/clickhouse.go (2 hunks)
  • internal/storage/connector.go (2 hunks)
  • test/mocks/MockIMainStorage.go (1 hunks)
  • test/mocks/MockIStagingStorage.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • internal/storage/clickhouse.go
🧰 Additional context used
🧬 Code Graph Analysis (1)
internal/orchestrator/committer.go (2)
internal/common/block.go (2)
  • BlockData (61-66)
  • Block (8-33)
internal/storage/connector.go (1)
  • QueryFilter (11-27)
🔇 Additional comments (1)
test/mocks/MockIStagingStorage.go (1)

239-296: LGTM: mock method addition matches interface and mockery patterns

Implementation and typed helpers are consistent with existing mocks and mockery’s templates. No issues found.

Comment on lines +49 to 50
mockStagingStorage.EXPECT().GetBlockNumbersLessThan(chainID, big.NewInt(100)).Return([]*big.Int{}, nil)


💡 Verification agent

🧩 Analysis chain

Add tests for the stranded-blocks cleanup path and reduce stub duplication.

Right now all tests stub GetBlockNumbersLessThan to return empty, so the new logic (CheckBlocksExist → fetch missing → insert → delete) isn’t exercised. Add a test that:

  • Returns a non-empty set of stranded blocks from staging,
  • Mocks CheckBlocksExist to simulate partial existence in main storage,
  • Verifies Staging.GetStagingData is called once with all missing blocks (batch, not N+1),
  • Verifies Main.InsertBlockData and Staging.DeleteStagingData are called with the correct sets.

Also consider a small helper to DRY the repeated “no stranded blocks” stub across tests.

Also applies to: 74-75, 102-103, 130-131, 155-156, 236-237, 263-264, 299-300, 416-416, 516-517, 562-563


🏁 Script executed:

#!/bin/bash
set -e
# Find all occurrences of GetBlockNumbersLessThan in the committer tests
rg -n "GetBlockNumbersLessThan" internal/orchestrator/committer_test.go

Length of output: 1495


🏁 Script executed:

#!/bin/bash
set -e
# Search for CheckBlocksExist usage in committer tests
rg -n "CheckBlocksExist" internal/orchestrator/committer_test.go

Length of output: 73


Add stranded-blocks cleanup tests and DRY the GetBlockNumbersLessThan stub

Currently, all committer_test.go tests stub

mockStagingStorage.EXPECT().GetBlockNumbersLessThan(...).Return([]*big.Int{}, nil)

so the cleanup path (CheckBlocksExist → GetStagingData → InsertBlockData → DeleteStagingData) never runs. To address this:

  • In internal/orchestrator/committer_test.go, add a test case that:
    • Mocks GetBlockNumbersLessThan to return a non-empty slice of block numbers.
    • Mocks mockMainStorage.CheckBlocksExist to mark some blocks as already present and others missing.
    • Verifies mockStagingStorage.GetStagingData is called exactly once with the full batch of missing blocks (not one per block).
    • Verifies mockMainStorage.InsertBlockData is called for the missing blocks and mockStagingStorage.DeleteStagingData is called for the ones that already existed.

  • Extract a helper (e.g. stubEmptyStrandedBlocks()) to centralize the repeated
    GetBlockNumbersLessThan(...).Return([]*big.Int{}, nil)
    stub across all tests (lines 49, 74, 102, 130, 155, 183, 236, 263, 299, 415, 450, 516, 562).

This will ensure the new cleanup logic is covered and reduce boilerplate.

🤖 Prompt for AI Agents
In internal/orchestrator/committer_test.go around lines 49 to 50, add a new test
case that mocks GetBlockNumbersLessThan to return a non-empty slice of block
numbers, mocks mockMainStorage.CheckBlocksExist to mark some blocks as present
and others missing, and verifies that mockStagingStorage.GetStagingData is
called once with all missing blocks, mockMainStorage.InsertBlockData is called
for missing blocks, and mockStagingStorage.DeleteStagingData is called for
existing blocks. Also, create a helper function (e.g., stubEmptyStrandedBlocks)
to centralize the repeated stub of GetBlockNumbersLessThan returning an empty
slice and nil error, and replace all existing instances of this stub at lines
49, 74, 102, 130, 155, 183, 236, 263, 299, 415, 450, 516, and 562 with calls to
this helper to reduce boilerplate.
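
A rough sketch of such a test plus the stub helper, reusing fixture names that already appear in committer_test.go (mockStagingStorage, mockMainStorage, chainID, committer); the setup details and helper name are assumptions:

func stubEmptyStrandedBlocks(mockStagingStorage *mocks.MockIStagingStorage, chainID, latest *big.Int) {
	mockStagingStorage.EXPECT().GetBlockNumbersLessThan(chainID, latest).Return([]*big.Int{}, nil)
}

func TestCleanupStrandedBlocks_PartialExistence(t *testing.T) {
	// Blocks 1-3 are stranded in staging; block 2 already exists in main storage.
	stranded := []*big.Int{big.NewInt(1), big.NewInt(2), big.NewInt(3)}
	missing := []common.BlockData{
		{Block: common.Block{ChainId: chainID, Number: big.NewInt(1)}},
		{Block: common.Block{ChainId: chainID, Number: big.NewInt(3)}},
	}

	mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(100), nil)
	mockStagingStorage.EXPECT().GetBlockNumbersLessThan(chainID, big.NewInt(100)).Return(stranded, nil)
	mockMainStorage.EXPECT().CheckBlocksExist(chainID, stranded).
		Return(map[string]bool{"1": false, "2": true, "3": false}, nil)

	// Missing blocks should be fetched in one batched call, inserted, then deleted.
	mockStagingStorage.EXPECT().GetStagingData(mock.Anything).Return(missing, nil).Once()
	mockMainStorage.EXPECT().InsertBlockData(missing).Return(nil)
	mockStagingStorage.EXPECT().DeleteStagingData(mock.Anything).Return(nil)

	require.NoError(t, committer.cleanupStrandedBlocks())
}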

Comment on lines +78 to +127
func (c *Committer) cleanupStrandedBlocks() error {
	// Get the current max block from main storage
	latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID())
	if err != nil {
		return fmt.Errorf("error getting max block number from main storage: %v", err)
	}

	if latestCommittedBlockNumber.Sign() == 0 {
		// No blocks in main storage yet, nothing to clean up
		return nil
	}

	// Get block numbers from PostgreSQL that are less than latest committed block
	psqlBlockNumbers, err := c.storage.StagingStorage.GetBlockNumbersLessThan(c.rpc.GetChainID(), latestCommittedBlockNumber)
	if err != nil {
		return fmt.Errorf("error getting block numbers from PostgreSQL: %v", err)
	}

	if len(psqlBlockNumbers) == 0 {
		// No stranded blocks in staging
		return nil
	}

	log.Info().
		Int("block_count", len(psqlBlockNumbers)).
		Str("min_block", psqlBlockNumbers[0].String()).
		Str("max_block", psqlBlockNumbers[len(psqlBlockNumbers)-1].String()).
		Msg("Found stranded blocks in staging")

	// Process blocks in batches of c.blocksPerCommit, but max 1000 to avoid ClickHouse query limits
	batchSize := c.blocksPerCommit
	if batchSize > 1000 {
		batchSize = 1000
	}

	for i := 0; i < len(psqlBlockNumbers); i += batchSize {
		end := i + batchSize
		if end > len(psqlBlockNumbers) {
			end = len(psqlBlockNumbers)
		}

		batchBlockNumbers := psqlBlockNumbers[i:end]

		if err := c.processStrandedBlocksBatch(batchBlockNumbers); err != nil {
			return fmt.Errorf("error processing stranded blocks batch %d-%d: %v", i, end-1, err)
		}
	}

	return nil
}

🛠️ Refactor suggestion

Sort and guard against nil to make logging/batching safe and deterministic.

  • latestCommittedBlockNumber can be nil; guard before use.
  • Sort psqlBlockNumbers before logging/batching so min/max reflect reality and batches are contiguous.

Apply this diff:

 func (c *Committer) cleanupStrandedBlocks() error {
 	// Get the current max block from main storage
 	latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID())
-	if err != nil {
+	if err != nil {
 		return fmt.Errorf("error getting max block number from main storage: %v", err)
 	}
 
-	if latestCommittedBlockNumber.Sign() == 0 {
+	if latestCommittedBlockNumber == nil || latestCommittedBlockNumber.Sign() == 0 {
 		// No blocks in main storage yet, nothing to clean up
 		return nil
 	}
 
 	// Get block numbers from PostgreSQL that are less than latest committed block
 	psqlBlockNumbers, err := c.storage.StagingStorage.GetBlockNumbersLessThan(c.rpc.GetChainID(), latestCommittedBlockNumber)
 	if err != nil {
 		return fmt.Errorf("error getting block numbers from PostgreSQL: %v", err)
 	}
 
 	if len(psqlBlockNumbers) == 0 {
 		// No stranded blocks in staging
 		return nil
 	}
 
+	// Ensure deterministic ordering for logging and batching
+	sort.Slice(psqlBlockNumbers, func(i, j int) bool {
+		return psqlBlockNumbers[i].Cmp(psqlBlockNumbers[j]) < 0
+	})
+
 	log.Info().
 		Int("block_count", len(psqlBlockNumbers)).
 		Str("min_block", psqlBlockNumbers[0].String()).
 		Str("max_block", psqlBlockNumbers[len(psqlBlockNumbers)-1].String()).
 		Msg("Found stranded blocks in staging")
🤖 Prompt for AI Agents
In internal/orchestrator/committer.go lines 78 to 127,
latestCommittedBlockNumber should be checked for nil before calling Sign() to
avoid nil pointer dereference. Also, sort the psqlBlockNumbers slice before
logging and batching to ensure min and max block numbers are accurate and
batches are processed in order. Add a nil check for latestCommittedBlockNumber
and use a sorting function on psqlBlockNumbers before the log.Info() call and
batch processing loop.

Comment on lines +146 to +173
	// Get block data from PostgreSQL for blocks that don't exist in ClickHouse
	var blocksToCommit []common.BlockData
	for _, blockNum := range blockNumbers {
		if !existsInClickHouse[blockNum.String()] {
			data, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{
				BlockNumbers: []*big.Int{blockNum},
				ChainId:      c.rpc.GetChainID(),
			})
			if err != nil {
				return fmt.Errorf("error getting block data from PostgreSQL: %v", err)
			}
			if len(data) > 0 {
				blocksToCommit = append(blocksToCommit, data[0])
			}
		}
	}

	// Insert blocks into ClickHouse
	if len(blocksToCommit) > 0 {
		log.Info().
			Int("block_count", len(blocksToCommit)).
			Str("min_block", blocksToCommit[0].Block.Number.String()).
			Str("max_block", blocksToCommit[len(blocksToCommit)-1].Block.Number.String()).
			Msg("Committing stranded blocks to ClickHouse")

		if err := c.storage.MainStorage.InsertBlockData(blocksToCommit); err != nil {
			return fmt.Errorf("error inserting blocks into ClickHouse: %v", err)
		}

🛠️ Refactor suggestion

Fix N+1 staging queries and delete only blocks that exist or were inserted.

Batch fetch missing blocks in one call and restrict deletion to blocks that either existed in ClickHouse or were successfully inserted just now. This avoids heavy N+1 patterns and reduces risk of data loss on partial failures.

Apply this diff:

-	// Get block data from PostgreSQL for blocks that don't exist in ClickHouse
-	var blocksToCommit []common.BlockData
-	for _, blockNum := range blockNumbers {
-		if !existsInClickHouse[blockNum.String()] {
-			data, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{
-				BlockNumbers: []*big.Int{blockNum},
-				ChainId:      c.rpc.GetChainID(),
-			})
-			if err != nil {
-				return fmt.Errorf("error getting block data from PostgreSQL: %v", err)
-			}
-			if len(data) > 0 {
-				blocksToCommit = append(blocksToCommit, data[0])
-			}
-		}
-	}
+	// Collect all missing blocks and fetch them in a single query
+	var missingBlockNumbers []*big.Int
+	for _, bn := range blockNumbers {
+		if !existsInClickHouse[bn.String()] {
+			missingBlockNumbers = append(missingBlockNumbers, bn)
+		}
+	}
+
+	var blocksToCommit []common.BlockData
+	if len(missingBlockNumbers) > 0 {
+		data, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{
+			BlockNumbers: missingBlockNumbers,
+			ChainId:      c.rpc.GetChainID(),
+		})
+		if err != nil {
+			return fmt.Errorf("error getting block data from PostgreSQL: %v", err)
+		}
+		blocksToCommit = data
+	}
 
 	// Insert blocks into ClickHouse
 	if len(blocksToCommit) > 0 {
 		log.Info().
 			Int("block_count", len(blocksToCommit)).
 			Str("min_block", blocksToCommit[0].Block.Number.String()).
 			Str("max_block", blocksToCommit[len(blocksToCommit)-1].Block.Number.String()).
 			Msg("Committing stranded blocks to ClickHouse")
 
 		if err := c.storage.MainStorage.InsertBlockData(blocksToCommit); err != nil {
 			return fmt.Errorf("error inserting blocks into ClickHouse: %v", err)
 		}
 	}
 
-	// Delete all blocks from PostgreSQL that were checked (whether they existed in ClickHouse or not)
-	var blocksToDelete []common.BlockData
-	for _, blockNum := range blockNumbers {
-		blocksToDelete = append(blocksToDelete, common.BlockData{
-			Block: common.Block{
-				ChainId: c.rpc.GetChainID(),
-				Number:  blockNum,
-			},
-		})
-	}
+	// Delete only blocks that exist in ClickHouse or were just inserted
+	inserted := make(map[string]struct{}, len(blocksToCommit))
+	for _, bd := range blocksToCommit {
+		inserted[bd.Block.Number.String()] = struct{}{}
+	}
+	var blocksToDelete []common.BlockData
+	for _, bn := range blockNumbers {
+		if existsInClickHouse[bn.String()] {
+			blocksToDelete = append(blocksToDelete, common.BlockData{Block: common.Block{ChainId: c.rpc.GetChainID(), Number: bn}})
+			continue
+		}
+		if _, ok := inserted[bn.String()]; ok {
+			blocksToDelete = append(blocksToDelete, common.BlockData{Block: common.Block{ChainId: c.rpc.GetChainID(), Number: bn}})
+		}
+	}
 
 	if len(blocksToDelete) > 0 {
 		log.Info().
 			Int("block_count", len(blocksToDelete)).
 			Str("min_block", blocksToDelete[0].Block.Number.String()).
 			Str("max_block", blocksToDelete[len(blocksToDelete)-1].Block.Number.String()).
 			Msg("Deleting stranded blocks from PostgreSQL")
 
 		if err := c.storage.StagingStorage.DeleteStagingData(blocksToDelete); err != nil {
 			return fmt.Errorf("error deleting blocks from PostgreSQL: %v", err)
 		}
 	}

Also applies to: 176-197

🤖 Prompt for AI Agents
In internal/orchestrator/committer.go from lines 146 to 173 and also lines 176
to 197, the current code performs N+1 queries to fetch staging data for each
missing block individually and deletes blocks without verifying their existence
or successful insertion. To fix this, modify the code to batch fetch all missing
blocks in a single query instead of looping with individual calls. Then, when
deleting blocks, restrict the deletion only to those blocks that either already
existed in ClickHouse or were successfully inserted in the current operation.
This will eliminate the N+1 query pattern and prevent accidental deletion of
blocks that were not inserted due to partial failures.

@nischitpra nischitpra closed this Aug 14, 2025
@nischitpra nischitpra deleted the np/cleanup_staging_on_boot branch October 8, 2025 15:09