
Conversation


@nischitpra nischitpra commented Aug 15, 2025

New envs

STORAGE_STAGING_POSTGRES_HOST=localhost
STORAGE_STAGING_POSTGRES_USERNAME=admin
STORAGE_STAGING_POSTGRES_PASSWORD=password
STORAGE_STAGING_POSTGRES_DATABASE=insight
STORAGE_STAGING_POSTGRES_PORT=5432
NEWKAFKA_BROKERS=localhost:9092
NEWKAFKA_USERNAME=
NEWKAFKA_PASSWORD=
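
For reference, a minimal sketch of how these variables are typically picked up via viper's automatic env binding. The exact prefix and key-replacer convention used by this repo is an assumption; check configs/config.go for the real mapping.

package main

import (
	"fmt"
	"strings"

	"github.com/spf13/viper"
)

func main() {
	// Assumed convention: dots in config keys map to underscores in env names,
	// e.g. storage.staging.postgres.host <- STORAGE_STAGING_POSTGRES_HOST
	// and newKafka.brokers <- NEWKAFKA_BROKERS (viper keys are case-insensitive).
	viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
	viper.AutomaticEnv()

	fmt.Println(viper.GetString("storage.staging.postgres.host"))
	fmt.Println(viper.GetString("newKafka.brokers"))
}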

Summary by CodeRabbit

  • New Features

    • Migration now publishes block data to Kafka and tracks progress in Postgres for resumable runs.
    • Added CLI flags to configure secondary Kafka (brokers, username, password).
    • Introduced new config section for Kafka credentials.
    • Enhanced full-block retrieval with optional consistency and trace inclusion.
  • Documentation

    • Added sample kafka_config.yml with comprehensive settings.
  • Chores

    • Added database migration to create a migrated block tracking table and index.
    • Added low-level Postgres helpers for raw queries.


zeet-co bot commented Aug 15, 2025

We're building your pull request over on Zeet.
Click me for more info about your build and deployment.
Once built, this branch can be tested at: https://abstract-testnet-indexer-hme8-np--ec9865.insight-prod-gke.zeet.app before merging 😉


coderabbitai bot commented Aug 15, 2025

Walkthrough

The migrator now publishes block data to Kafka and tracks migrated block ranges in PostgreSQL. Configuration adds a newKafka section and root flags for brokers and credentials. ClickHouse gains an option to fetch full block data with or without FINAL and now includes traces in the aggregated result. Postgres exposes raw SQL wrappers. A new SQL migration creates the migrated_block_ranges table.
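
For orientation, a minimal sketch of the config wiring implied by the walkthrough follows. The NewKafkaConfig name, the Config.NewKafka field, and the flag/key names come from this PR's summary; everything else (package layout, defaults, helper name) is an assumption, not the actual implementation.

package configs

// NewKafkaConfig mirrors the newKafka section and the newKafka-* root flags (sketch only).
type NewKafkaConfig struct {
	Brokers  string `mapstructure:"brokers"`
	Username string `mapstructure:"username"`
	Password string `mapstructure:"password"`
}

type Config struct {
	// ... existing fields elided ...
	NewKafka NewKafkaConfig `mapstructure:"newKafka"`
}

The corresponding flag registration in cmd/root.go would then look roughly like this (registerNewKafkaFlags is a hypothetical helper; the real code binds the flags wherever the other persistent flags live):

package cmd

import (
	"github.com/spf13/cobra"
	"github.com/spf13/viper"
)

func registerNewKafkaFlags(rootCmd *cobra.Command) {
	rootCmd.PersistentFlags().String("newKafka-brokers", "", "comma-separated brokers for the secondary Kafka")
	rootCmd.PersistentFlags().String("newKafka-username", "", "SASL username for the secondary Kafka")
	rootCmd.PersistentFlags().String("newKafka-password", "", "SASL password for the secondary Kafka")
	_ = viper.BindPFlag("newKafka.brokers", rootCmd.PersistentFlags().Lookup("newKafka-brokers"))
	_ = viper.BindPFlag("newKafka.username", rootCmd.PersistentFlags().Lookup("newKafka-username"))
	_ = viper.BindPFlag("newKafka.password", rootCmd.PersistentFlags().Lookup("newKafka-password"))
}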

Changes

  • Migrator pipeline refactor (cmd/migrate_valid.go)
    Switches from direct storage writes to Kafka publishing. Adds Postgres-backed migrated_block_ranges tracking with upsert/query helpers. Introduces methods UpdateMigratedBlock and GetMaxBlockNumberInRange. Computes absolute start/end and adjusts migration boundaries using the tracker. Manages Kafka and Postgres lifecycles.
  • CLI flags and config wiring (cmd/root.go, configs/config.go)
    Adds persistent flags newKafka-brokers/username/password and binds them to viper. Introduces NewKafkaConfig and adds the Config.NewKafka field.
  • New Kafka publisher module (internal/publisher/newkafka/publisherNewKafka.go)
    Adds a franz-go (kgo) based singleton publisher. Provides PublishBlockData and Close. Defines a generic PublishableMessage. Configures brokers, auth, and TLS; publishes to topic block_data with composite keys; records metrics.
  • Storage enhancements (internal/storage/clickhouse.go, internal/storage/postgres.go)
    ClickHouse: adds GetFullBlockDataWithOptions(forceConsistentData), includes Traces in the aggregation, and parameterizes FINAL usage; the existing GetFullBlockData delegates with forceConsistentData=true. Postgres: adds ExecRaw, QueryRaw, and QueryRowRaw passthroughs.
  • DB migration (internal/tools/postgres/0011_postgres_create_migrated_block_ranges_table.sql)
    Creates the migrated_block_ranges table, an index on (chain_id, block_number DESC), and an updated_at trigger.
  • Example config (configs/kafka_config.yml)
    Adds a comprehensive YAML example covering rpc, logging, storage, api, the publisher toggle, newKafka brokers/credentials, validation, and workMode settings.

Sequence Diagram(s)

sequenceDiagram
  participant M as Migrator
  participant CH as ClickHouse
  participant NK as NewKafka Publisher
  participant KB as Kafka Broker
  participant PG as Postgres (Tracker)

  M->>CH: GetFullBlockData(chainId, blocks)
  CH-->>M: []BlockData (with logs/txs/traces)

  M->>NK: PublishBlockData(blockData)
  NK->>KB: Produce block_data records
  KB-->>NK: Acks
  NK-->>M: Publish result

  M->>PG: Upsert migrated_block_ranges (UpdateMigratedBlock)
  PG-->>M: Upsert OK/err
sequenceDiagram
  participant M as Migrator
  participant RPC as RPC/Main Storage
  participant PG as Postgres (Tracker)

  M->>RPC: Compute absStart/absEnd (chain head etc.)
  RPC-->>M: absStartBlock, absEndBlock

  M->>PG: GetMaxBlockNumberInRange(absStart, absEnd)
  PG-->>M: maxMigrated or absStart-1

  M-->>M: DetermineMigrationBoundaries(start = max+1, end <= absEnd or abort if complete)
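
The boundary lookup in the diagram above presumably reduces to a MAX query over the new tracking table. A sketch under assumed signatures (table and column names come from the migration file; the method's actual shape in cmd/migrate_valid.go may differ):

package sketch

import (
	"database/sql"
	"math/big"
)

// getMaxBlockNumberInRange returns the highest migrated block for a chain within
// [start, end], or nil if nothing in that range has been migrated yet, in which
// case the caller falls back to absStart-1.
func getMaxBlockNumberInRange(db *sql.DB, chainID, start, end *big.Int) (*big.Int, error) {
	var maxBlock sql.NullInt64
	err := db.QueryRow(
		`SELECT MAX(block_number) FROM migrated_block_ranges
		 WHERE chain_id = $1 AND block_number BETWEEN $2 AND $3`,
		chainID.String(), start.String(), end.String(),
	).Scan(&maxBlock)
	if err != nil {
		return nil, err
	}
	if !maxBlock.Valid {
		return nil, nil
	}
	return big.NewInt(maxBlock.Int64), nil
}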

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes



@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 5

🔭 Outside diff range comments (3)
internal/storage/postgres.go (2)

350-356: Critical: Wrong table used in subquery; DELETE may remove arbitrary rows

The DELETE from block_data uses ctid from block_failures. ctid values are table-local; comparing across tables is invalid and can delete the wrong rows.

Apply this diff:

-	query := fmt.Sprintf(`DELETE FROM block_data
-	WHERE ctid IN (
-		SELECT ctid
-		FROM block_failures
-		WHERE (chain_id, block_number) IN (%s)
-		FOR UPDATE SKIP LOCKED
-	)`, strings.Join(tuples, ","))
+	query := fmt.Sprintf(`DELETE FROM block_data
+	WHERE ctid IN (
+		SELECT ctid
+		FROM block_data
+		WHERE (chain_id, block_number) IN (%s)
+		FOR UPDATE SKIP LOCKED
+	)`, strings.Join(tuples, ","))

62-81: Missing WHERE before AND clauses in GetBlockFailures

The query begins without a WHERE, but later appends AND clauses, yielding invalid SQL.

Apply this diff:

-	query := `SELECT chain_id, block_number, last_error_timestamp, failure_count, reason 
-	          FROM block_failures`
+	query := `SELECT chain_id, block_number, last_error_timestamp, failure_count, reason 
+	          FROM block_failures WHERE 1=1`
configs/config.go (1)

288-291: Bug: Orchestrator chainBasedConfig assigned to Main storage

This mistakenly overrides Main clickhouse config with orchestrator settings.

Apply this diff:

-	if Cfg.Storage.Main.Clickhouse != nil {
-		Cfg.Storage.Main.Clickhouse.ChainBasedConfig = orchestratorChainConfig
-	}
+	if Cfg.Storage.Orchestrator.Clickhouse != nil {
+		Cfg.Storage.Orchestrator.Clickhouse.ChainBasedConfig = orchestratorChainConfig
+	}
🧹 Nitpick comments (11)
internal/storage/postgres.go (1)

451-465: Raw SQL wrappers look fine; consider context-aware variants and annotate internal-only use

These wrappers are straightforward, but they don't allow cancellation/timeouts and could be misused with untrusted input.

Add context variants and a brief comment about internal-only usage:

// ExecRawContext executes a raw SQL statement with context
func (p *PostgresConnector) ExecRawContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) {
	return p.db.ExecContext(ctx, query, args...)
}

// QueryRawContext executes a raw SQL query with context
func (p *PostgresConnector) QueryRawContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) {
	return p.db.QueryContext(ctx, query, args...)
}

// QueryRowRawContext executes a raw SQL query with context and returns a single row
func (p *PostgresConnector) QueryRowRawContext(ctx context.Context, query string, args ...interface{}) *sql.Row {
	return p.db.QueryRowContext(ctx, query, args...)
}
internal/storage/clickhouse.go (1)

984-991: Potential trailing comma in IN clause assembly

getBlockNumbersStringArray appends a trailing comma, yielding "IN (1,2,3,)" which is invalid in SQL dialects (incl. ClickHouse). If this function is used as-is, queries may fail.

Apply this safer version:

-func getBlockNumbersStringArray(blockNumbers []*big.Int) string {
-	blockNumbersString := ""
-	for _, blockNumber := range blockNumbers {
-		blockNumbersString += fmt.Sprintf("%s,", blockNumber.String())
-	}
-	return blockNumbersString
-}
+func getBlockNumbersStringArray(blockNumbers []*big.Int) string {
+	parts := make([]string, len(blockNumbers))
+	for i, bn := range blockNumbers {
+		parts[i] = bn.String()
+	}
+	return strings.Join(parts, ",")
+}
internal/tools/postgres/0011_postgres_create_migrated_block_ranges_table.sql (1)

8-9: Redundant index once PRIMARY KEY is added

A PRIMARY KEY on (chain_id, block_number) creates a btree index that supports DESC scans; this extra index is unnecessary.

Apply this diff to drop it:

- -- Create index for efficient querying by chain_id and block ranges
- CREATE INDEX IF NOT EXISTS idx_migrated_block_ranges_chain_block ON migrated_block_ranges(chain_id, block_number DESC);
+-- (Removed) Redundant index; primary key covers lookups and sorted scans.
configs/kafka_config.yml (2)

34-34: Fix YAML trailing whitespace (yamllint errors).

YAMLlint flagged trailing spaces. Trim them to keep CI green.

-  
+
...
-  
+
...
-  mode: minimal 
+  mode: minimal

Also applies to: 47-47, 78-78


71-76: Make Kafka security explicit; avoid committing plaintext credentials.

  • Consider adding explicit TLS and SASL mechanism options (e.g., TLS enable flag, CA/cert paths, SASL mechanism) rather than inferring TLS from presence of username/password. This will prevent misconfigurations against clusters using SASL/PLAIN over PLAINTEXT or SASL/SCRAM.
  • Ensure any non-empty username/password are sourced from env/secret manager and not committed in configs.
  • For production, avoid debug log level and default ClickHouse disableTLS.

Do you want a patch to extend the config schema (and wiring) with:

  • newKafka.tls.enabled (bool), newKafka.tls.insecureSkipVerify (bool)
  • newKafka.sasl.mechanism (e.g., "PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512")
  • newKafka.topic (override "block_data")?
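
One possible shape for that extension, purely as a sketch (none of these fields exist in the PR; the names follow the reviewer's proposal above):

package configs

// Proposed, not existing: explicit TLS and SASL settings for the secondary Kafka.
type NewKafkaTLSConfig struct {
	Enabled            bool   `mapstructure:"enabled"`
	InsecureSkipVerify bool   `mapstructure:"insecureSkipVerify"`
	CACertPath         string `mapstructure:"caCertPath"`
}

type NewKafkaSASLConfig struct {
	Mechanism string `mapstructure:"mechanism"` // "PLAIN", "SCRAM-SHA-256", or "SCRAM-SHA-512"
	Username  string `mapstructure:"username"`
	Password  string `mapstructure:"password"`
}

type NewKafkaConfigProposed struct {
	Brokers []string           `mapstructure:"brokers"`
	Topic   string             `mapstructure:"topic"` // optional override of the default "block_data"
	TLS     NewKafkaTLSConfig  `mapstructure:"tls"`
	SASL    NewKafkaSASLConfig `mapstructure:"sasl"`
}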
internal/publisher/newkafka/publisherNewKafka.go (3)

116-146: Make LastPublishedBlock metric robust to unsorted input.

blocksToInsert is constructed from a map in the migrator; iteration order is random. Using the last element to set LastPublishedBlock yields a misleading gauge. Track the max block number instead.

-  // Prepare messages for blocks, events, transactions and traces
-  blockdataMessages := make([]*kgo.Record, len(blockData))
+  // Prepare messages for blocks, events, transactions and traces
+  blockdataMessages := make([]*kgo.Record, len(blockData))
+  var maxBlockNum int64
   ...
   for i, data := range blockData {
     msgJson, err := json.Marshal(data)
     if err != nil {
       return fmt.Errorf("failed to marshal block data: %v", err)
     }
     blockdataMessages[i] = &kgo.Record{
       Topic: "block_data",
       Key:   []byte(fmt.Sprintf("block-%s-%s-%s", status, data.Block.ChainId.String(), data.Block.Hash)),
       Value: msgJson,
     }
+    if bn := data.Block.Number.Int64(); bn > maxBlockNum {
+      maxBlockNum = bn
+    }
   }
   ...
-  metrics.LastPublishedBlock.Set(float64(blockData[len(blockData)-1].Block.Number.Int64()))
+  metrics.LastPublishedBlock.Set(float64(maxBlockNum))

69-76: Separate TLS configuration from SASL; avoid implicit TLS coupling to credentials.

Currently TLS is enabled only when username/password are set. Some clusters use SASL without TLS; others require TLS without SASL. Make TLS and SASL independently configurable via config.

I can wire new config switches (tls.enabled, tls.insecureSkipVerify, sasl.mechanism) and corresponding franz-go options if you want.


31-34: Remove or use PublishableMessage generic type.

This type is unused. Either use it for topic payloads or delete to reduce clutter.

-type PublishableMessage[T common.BlockModel | common.TransactionModel | common.LogModel | common.TraceModel] struct {
-  Data   T      `json:"data"`
-  Status string `json:"status"`
-}
cmd/migrate_valid.go (3)

78-82: Sort blocks before publish to ensure deterministic ordering and metrics.

blocksToInsert is built from a map; order is random. Sort by block number to get predictability and monotonic metrics.

   blocksToInsert := make([]common.BlockData, 0)
   for _, blockData := range blocksToInsertMap {
     blocksToInsert = append(blocksToInsert, blockData)
   }
+  sort.Slice(blocksToInsert, func(i, j int) bool {
+    return blocksToInsert[i].Block.Number.Cmp(blocksToInsert[j].Block.Number) < 0
+  })

Add import:

// in the import block
import (
  // ...
  "sort"
)

229-229: Fix big.Int formatting in logs.

Using %d with *big.Int results in "%!d(*big.Int=...)" in logs.

- log.Info().Msgf("Latest block in target storage: %d", latestMigratedBlock)
+ log.Info().Msgf("Latest block in target storage: %s", latestMigratedBlock.String())

249-249: Fix big.Int formatting in logs.

Same concern here.

- log.Info().Msgf("Latest block in main storage: %d", latestBlockStored)
+ log.Info().Msgf("Latest block in main storage: %s", latestBlockStored.String())
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 9ca9e74 and 8768acd.

📒 Files selected for processing (8)
  • cmd/migrate_valid.go (7 hunks)
  • cmd/root.go (2 hunks)
  • configs/config.go (2 hunks)
  • configs/kafka_config.yml (1 hunks)
  • internal/publisher/newkafka/publisherNewKafka.go (1 hunks)
  • internal/storage/clickhouse.go (5 hunks)
  • internal/storage/postgres.go (1 hunks)
  • internal/tools/postgres/0011_postgres_create_migrated_block_ranges_table.sql (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (4)
configs/config.go (1)
internal/publisher/newkafka/publisherNewKafka.go (1)
  • NewKafka (21-24)
internal/publisher/newkafka/publisherNewKafka.go (6)
internal/common/block.go (3)
  • BlockModel (36-59)
  • BlockData (61-66)
  • Block (8-33)
internal/common/transaction.go (1)
  • TransactionModel (65-98)
internal/common/log.go (1)
  • LogModel (47-58)
internal/common/trace.go (1)
  • TraceModel (36-58)
configs/config.go (1)
  • Cfg (215-215)
internal/metrics/metrics.go (4)
  • PublishDuration (123-127)
  • PublisherBlockCounter (93-96)
  • LastPublishedBlock (103-106)
  • PublisherReorgedBlockCounter (98-101)
internal/storage/clickhouse.go (1)
internal/common/block.go (1)
  • BlockData (61-66)
cmd/migrate_valid.go (4)
internal/publisher/newkafka/publisherNewKafka.go (2)
  • NewKafka (21-24)
  • GetInstance (37-45)
internal/storage/postgres.go (2)
  • PostgresConnector (17-20)
  • NewPostgresConnector (22-58)
configs/config.go (1)
  • Cfg (215-215)
internal/common/block.go (2)
  • BlockData (61-66)
  • Block (8-33)
🪛 YAMLlint (1.37.1)
configs/kafka_config.yml

[error] 34-34: trailing spaces

(trailing-spaces)


[error] 47-47: trailing spaces

(trailing-spaces)


[error] 78-78: trailing spaces

(trailing-spaces)

🔇 Additional comments (7)
cmd/root.go (2)

152-154: New Kafka flags: LGTM

Flags are clearly named and scoped; no concerns. Ensure secrets are not logged inadvertently elsewhere.


271-273: Viper bindings for newKafka: LGTM

Bindings match the new config keys and should integrate cleanly with viper/env.

configs/config.go (2)

185-191: NewKafkaConfig addition: LGTM

Struct and mapstructure tags align with the introduced flags and config file.


210-211: Config.NewKafka wiring: LGTM

Top-level wiring is consistent with the new flags and YAML.

internal/tools/postgres/0011_postgres_create_migrated_block_ranges_table.sql (1)

11-13: Trigger function verified—no action needed

The update_updated_at_column() function is defined in internal/tools/postgres/postgres_schema.sql (around line 36), so it exists before this migration’s trigger.

internal/publisher/newkafka/publisherNewKafka.go (1)

64-66: UniformBytesPartitioner API is correct

UniformBytesPartitioner is exposed by franz-go (v1.6.0+) and your usage

kgo.RecordPartitioner(kgo.UniformBytesPartitioner(1_000_000, false, false, nil)),

is valid. Choose the partitioner that matches your goals:

Per-key ordering (preserve key affinity):

kgo.RecordPartitioner(kgo.StickyKeyPartitioner(nil))

(uses Kafka-compatible hashing; guarantees all messages with the same key go to the same partition)

Ignore key affinity (even distribution):

kgo.RecordPartitioner(kgo.RoundRobinPartitioner())

(cycles through partitions evenly, regardless of key)

Uniform sticky behavior (batch-size-based distribution with optional key-affinity):

// keys=false: ignore keys entirely
kgo.RecordPartitioner(kgo.UniformBytesPartitioner(1_000_000, adaptive, false, nil))

// keys=true: respect key hashing for non-nil keys
kgo.RecordPartitioner(kgo.UniformBytesPartitioner(1_000_000, adaptive, true, nil))

Let me know which partitioning guarantees you need so we can lock in the safest default.

cmd/migrate_valid.go (1)

83-87: Migration advancement hides Kafka publish failures

The call in cmd/migrate_valid.go (lines 83–87) relies on PublishBlockData returning an error, but:

  • In internal/publisher/newkafka/publisherNewKafka.go, publishMessages logs failures in its callback yet always returns nil.
  • Consequently, err := migrator.newkafka.PublishBlockData(…) can never be non-nil and migration will advance even if Kafka publishes fail.

After you update publishMessages to collect and return errors (e.g., aggregate callback errors and return a non-nil error), this log.Fatal path will correctly stop the migration. No changes needed in migrate_valid.go itself.

• Fix point: publishMessages in internal/publisher/newkafka/publisherNewKafka.go
• Then re-run the migration to verify that publish failures prevent range advancement as intended.

Comment on lines +177 to 214
// createMigratedBlockRangesTable creates the migrated_block_ranges table if it doesn't exist
func createMigratedBlockRangesTable(psql *storage.PostgresConnector) {
	createTableSQL := `
	CREATE TABLE IF NOT EXISTS migrated_block_ranges (
		chain_id BIGINT NOT NULL,
		block_number BIGINT NOT NULL,
		created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
		updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
	) WITH (fillfactor = 80, autovacuum_vacuum_scale_factor = 0.1, autovacuum_analyze_scale_factor = 0.05)
	`

	// Execute the CREATE TABLE statement
	_, err := psql.ExecRaw(createTableSQL)
	if err != nil {
		log.Warn().Err(err).Msg("Failed to create migrated_block_ranges table")
	}

	// Create index if it doesn't exist
	createIndexSQL := `
	CREATE INDEX IF NOT EXISTS idx_migrated_block_ranges_chain_block
	ON migrated_block_ranges(chain_id, block_number DESC)
	`
	_, err = psql.ExecRaw(createIndexSQL)
	if err != nil {
		log.Warn().Err(err).Msg("Failed to create index on migrated_block_ranges table")
	}

	// Create trigger if it doesn't exist
	createTriggerSQL := `
	CREATE TRIGGER update_migrated_block_ranges_updated_at
	BEFORE UPDATE ON migrated_block_ranges
	FOR EACH ROW EXECUTE FUNCTION update_updated_at_column()
	`
	_, err = psql.ExecRaw(createTriggerSQL)
	if err != nil {
		log.Warn().Err(err).Msg("Failed to create trigger on migrated_block_ranges table")
	}
}

🛠️ Refactor suggestion

Create the trigger function and a unique index to support an upsert strategy.

  • The trigger references update_updated_at_column(), which is not created here. Create it before the trigger.
  • Add a unique index on chain_id to enable an efficient ON CONFLICT upsert pattern (recommended below).
 func createMigratedBlockRangesTable(psql *storage.PostgresConnector) {
   createTableSQL := `
       CREATE TABLE IF NOT EXISTS migrated_block_ranges (
         chain_id BIGINT NOT NULL,
         block_number BIGINT NOT NULL,
         created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
         updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
       ) WITH (fillfactor = 80, autovacuum_vacuum_scale_factor = 0.1, autovacuum_analyze_scale_factor = 0.05)
   `
   // Execute the CREATE TABLE statement
   _, err := psql.ExecRaw(createTableSQL)
   if err != nil {
     log.Warn().Err(err).Msg("Failed to create migrated_block_ranges table")
   }
 
+  // Ensure a unique index on chain_id to allow ON CONFLICT upsert
+  createUniqueIndexSQL := `
+    CREATE UNIQUE INDEX IF NOT EXISTS idx_migrated_block_ranges_chain_unique 
+    ON migrated_block_ranges(chain_id)
+  `
+  _, err = psql.ExecRaw(createUniqueIndexSQL)
+  if err != nil {
+    log.Warn().Err(err).Msg("Failed to create unique index on migrated_block_ranges(chain_id)")
+  }
+
   // Create index if it doesn't exist
   createIndexSQL := `
     CREATE INDEX IF NOT EXISTS idx_migrated_block_ranges_chain_block 
     ON migrated_block_ranges(chain_id, block_number DESC)
   `
   _, err = psql.ExecRaw(createIndexSQL)
   if err != nil {
     log.Warn().Err(err).Msg("Failed to create index on migrated_block_ranges table")
   }
 
-  // Create trigger if it doesn't exist
+  // Create/replace trigger function, then create trigger if it doesn't exist
+  createFunctionSQL := `
+    CREATE OR REPLACE FUNCTION update_updated_at_column()
+    RETURNS trigger AS $$
+    BEGIN
+      NEW.updated_at = CURRENT_TIMESTAMP;
+      RETURN NEW;
+    END;
+    $$ LANGUAGE plpgsql;
+  `
+  _, err = psql.ExecRaw(createFunctionSQL)
+  if err != nil {
+    log.Warn().Err(err).Msg("Failed to create trigger function update_updated_at_column()")
+  }
+
   createTriggerSQL := `
     CREATE TRIGGER update_migrated_block_ranges_updated_at 
     BEFORE UPDATE ON migrated_block_ranges 
     FOR EACH ROW EXECUTE FUNCTION update_updated_at_column()
   `
   _, err = psql.ExecRaw(createTriggerSQL)
   if err != nil {
     log.Warn().Err(err).Msg("Failed to create trigger on migrated_block_ranges table")
   }
 }

Comment on lines +317 to +351
// upsertMigratedBlockRange upserts a row for the given chain_id and block range
func (m *Migrator) upsertMigratedBlockRange(chainID, blockNumber, startBlock, endBlock *big.Int) error {
	// First, try to update existing rows that overlap with this range
	updateSQL := `
	UPDATE migrated_block_ranges
	SET block_number = $1, updated_at = CURRENT_TIMESTAMP
	WHERE chain_id = $2 AND block_number >= $3 AND block_number <= $4
	`

	result, err := m.psql.ExecRaw(updateSQL, blockNumber.String(), chainID.String(), startBlock.String(), endBlock.String())
	if err != nil {
		return fmt.Errorf("failed to update migrated block range for chain %s, range %s-%s", chainID.String(), startBlock.String(), endBlock.String())
	}

	// Check if any rows were updated
	rowsAffected, err := result.RowsAffected()
	if err != nil {
		log.Warn().Err(err).Msgf("Failed to get rows affected for chain %s, range %s-%s", chainID.String(), startBlock.String(), endBlock.String())
		return fmt.Errorf("failed to get rows affected for chain %s, range %s-%s", chainID.String(), startBlock.String(), endBlock.String())
	}

	// If no rows were updated, insert a new row
	if rowsAffected == 0 {
		insertSQL := `
		INSERT INTO migrated_block_ranges (chain_id, block_number, created_at, updated_at)
		VALUES ($1, $2, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
		`

		_, err := m.psql.ExecRaw(insertSQL, chainID.String(), blockNumber.String())
		if err != nil {
			return fmt.Errorf("failed to insert migrated block range for chain %s, range %s-%s", chainID.String(), startBlock.String(), endBlock.String())
		}
	}
	return nil
}

🛠️ Refactor suggestion

Replace multi-row UPDATE + conditional INSERT with an idempotent UPSERT.

The current UPDATE changes all rows in [startBlock,endBlock] to the same block_number and may create duplicates over time. A single-row “latest progress per chain” model is simpler, faster, and avoids bloat.

 func (m *Migrator) upsertMigratedBlockRange(chainID, blockNumber, startBlock, endBlock *big.Int) error {
-  // First, try to update existing rows that overlap with this range
-  updateSQL := `
-    UPDATE migrated_block_ranges 
-    SET block_number = $1, updated_at = CURRENT_TIMESTAMP
-    WHERE chain_id = $2 AND block_number >= $3 AND block_number <= $4
-  `
-  result, err := m.psql.ExecRaw(updateSQL, blockNumber.String(), chainID.String(), startBlock.String(), endBlock.String())
-  if err != nil {
-    return fmt.Errorf("failed to update migrated block range for chain %s, range %s-%s", chainID.String(), startBlock.String(), endBlock.String())
-  }
-  // Check if any rows were updated
-  rowsAffected, err := result.RowsAffected()
-  if err != nil {
-    log.Warn().Err(err).Msgf("Failed to get rows affected for chain %s, range %s-%s", chainID.String(), startBlock.String(), endBlock.String())
-    return fmt.Errorf("failed to get rows affected for chain %s, range %s-%s", chainID.String(), startBlock.String(), endBlock.String())
-  }
-  // If no rows were updated, insert a new row
-  if rowsAffected == 0 {
-    insertSQL := `
-      INSERT INTO migrated_block_ranges (chain_id, block_number, created_at, updated_at)
-      VALUES ($1, $2, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
-    `
-    _, err := m.psql.ExecRaw(insertSQL, chainID.String(), blockNumber.String())
-    if err != nil {
-      return fmt.Errorf("failed to insert migrated block range for chain %s, range %s-%s", chainID.String(), startBlock.String(), endBlock.String())
-    }
-  }
-  return nil
+  upsertSQL := `
+    INSERT INTO migrated_block_ranges (chain_id, block_number, created_at, updated_at)
+    VALUES ($1, $2, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
+    ON CONFLICT (chain_id) DO UPDATE
+    SET block_number = GREATEST(EXCLUDED.block_number, migrated_block_ranges.block_number),
+        updated_at = CURRENT_TIMESTAMP
+  `
+  _, err := m.psql.ExecRaw(upsertSQL, chainID.String(), blockNumber.String())
+  if err != nil {
+    return fmt.Errorf(
+      "failed to upsert migrated block range for chain %s to block %s (abs range %s-%s): %w",
+      chainID.String(), blockNumber.String(), startBlock.String(), endBlock.String(), err,
+    )
+  }
+  return nil
 }

Comment on lines +152 to +178
func (p *NewKafka) publishMessages(ctx context.Context, messages []*kgo.Record) error {
	if len(messages) == 0 {
		return nil
	}

	p.mu.RLock()
	defer p.mu.RUnlock()

	if p.client == nil {
		return nil // Skip if no client configured
	}

	var wg sync.WaitGroup
	wg.Add(len(messages))
	// Publish to all configured producers
	for _, msg := range messages {
		p.client.Produce(ctx, msg, func(_ *kgo.Record, err error) {
			defer wg.Done()
			if err != nil {
				log.Error().Err(err).Msg("Failed to publish message to Kafka")
			}
		})
	}
	wg.Wait()

	return nil
}

⚠️ Potential issue

Do not swallow Kafka publish errors; migration may advance despite failed publishes.

publishMessages logs per-message failures but always returns nil, so callers cannot detect publish failures. This can falsely mark block ranges as migrated.

Apply this diff to collect and return publish errors, and to avoid holding the RLock during the entire wait:

 func (p *NewKafka) publishMessages(ctx context.Context, messages []*kgo.Record) error {
   if len(messages) == 0 {
     return nil
   }
 
-  p.mu.RLock()
-  defer p.mu.RUnlock()
-
-  if p.client == nil {
+  p.mu.RLock()
+  client := p.client
+  p.mu.RUnlock()
+
+  if client == nil {
     return nil // Skip if no client configured
   }
 
-  var wg sync.WaitGroup
-  wg.Add(len(messages))
-  // Publish to all configured producers
-  for _, msg := range messages {
-    p.client.Produce(ctx, msg, func(_ *kgo.Record, err error) {
-      defer wg.Done()
-      if err != nil {
-        log.Error().Err(err).Msg("Failed to publish message to Kafka")
-      }
-    })
-  }
-  wg.Wait()
-
-  return nil
+  // Ensure we don't wait forever if the broker never acks
+  if _, hasDeadline := ctx.Deadline(); !hasDeadline {
+    var cancel context.CancelFunc
+    ctx, cancel = context.WithTimeout(ctx, 30*time.Second)
+    defer cancel()
+  }
+
+  errCh := make(chan error, len(messages))
+  var wg sync.WaitGroup
+  wg.Add(len(messages))
+  for _, msg := range messages {
+    client.Produce(ctx, msg, func(_ *kgo.Record, err error) {
+      if err != nil {
+        errCh <- err
+        log.Error().Err(err).Msg("Failed to publish message to Kafka")
+      }
+      wg.Done()
+    })
+  }
+  wg.Wait()
+  close(errCh)
+
+  if len(errCh) > 0 {
+    first := <-errCh
+    count := 1
+    for range errCh {
+      count++
+    }
+    return fmt.Errorf("kafka publish failed for %d/%d messages: %w", count, len(messages), first)
+  }
+  return nil
 }

Comment on lines +2019 to +2021
	// For migration purposes, we don't need ForceConsistentData which causes FINAL keyword issues
	return c.GetFullBlockDataWithOptions(chainId, blockNumbers, true)
}

⚠️ Potential issue

Flag inversion: comment says no FINAL, but forceConsistentData is true

The comment states ForceConsistentData (FINAL) is not needed for migration, yet the call passes true, enabling FINAL. This is likely inverted and will impact performance/availability.

Apply this diff:

-	// For migration purposes, we don't need ForceConsistentData which causes FINAL keyword issues
-	return c.GetFullBlockDataWithOptions(chainId, blockNumbers, true)
+	// For migration purposes, we don't need ForceConsistentData (avoids FINAL in queries)
+	return c.GetFullBlockDataWithOptions(chainId, blockNumbers, false)

Comment on lines +1 to +6
CREATE TABLE IF NOT EXISTS migrated_block_ranges (
    chain_id BIGINT NOT NULL,
    block_number BIGINT NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
) WITH (fillfactor = 80, autovacuum_vacuum_scale_factor = 0.1, autovacuum_analyze_scale_factor = 0.05);

⚠️ Potential issue

Fix SQL syntax and add a primary key for upsert support

  • Trailing comma before the closing parenthesis is invalid SQL.
  • Upserts require a UNIQUE or PRIMARY KEY on (chain_id, block_number). Currently missing.

Apply this diff:

-CREATE TABLE IF NOT EXISTS migrated_block_ranges (
-    chain_id BIGINT NOT NULL,
-    block_number BIGINT NOT NULL,
-    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-) WITH (fillfactor = 80, autovacuum_vacuum_scale_factor = 0.1, autovacuum_analyze_scale_factor = 0.05);
+CREATE TABLE IF NOT EXISTS migrated_block_ranges (
+    chain_id BIGINT NOT NULL,
+    block_number BIGINT NOT NULL,
+    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+    PRIMARY KEY (chain_id, block_number)
+) WITH (fillfactor = 80, autovacuum_vacuum_scale_factor = 0.1, autovacuum_analyze_scale_factor = 0.05);

@jakeloo jakeloo closed this Aug 29, 2025
@nischitpra nischitpra deleted the np/backfill_update branch October 8, 2025 15:09
