193 changes: 181 additions & 12 deletions cmd/migrate_valid.go
@@ -2,6 +2,7 @@ package cmd

import (
"context"
"fmt"
"math/big"
"os"
"strconv"
@@ -11,6 +12,7 @@ import (
config "github.com/thirdweb-dev/indexer/configs"
"github.com/thirdweb-dev/indexer/internal/common"
"github.com/thirdweb-dev/indexer/internal/orchestrator"
"github.com/thirdweb-dev/indexer/internal/publisher/newkafka"
"github.com/thirdweb-dev/indexer/internal/rpc"
"github.com/thirdweb-dev/indexer/internal/storage"
)
@@ -36,6 +38,9 @@ func RunValidationMigration(cmd *cobra.Command, args []string) {
migrator := NewMigrator()
defer migrator.Close()

// get absolute start and end block for the migration. eg, 0-10M or 10M-20M
absStartBlock, absEndBlock := migrator.getAbsStartAndEndBlock()

rangeStartBlock, rangeEndBlock := migrator.DetermineMigrationBoundaries()

log.Info().Msgf("Migrating blocks from %s to %s (both ends inclusive)", rangeStartBlock.String(), rangeEndBlock.String())
@@ -75,9 +80,14 @@ func RunValidationMigration(cmd *cobra.Command, args []string) {
blocksToInsert = append(blocksToInsert, blockData)
}

-  err := migrator.targetConn.InsertBlockData(blocksToInsert)
+  err := migrator.newkafka.PublishBlockData(blocksToInsert)
if err != nil {
log.Fatal().Err(err).Msg("Failed to insert blocks to target storage")
log.Fatal().Err(err).Msg("Failed to publish block data")
}

err = migrator.UpdateMigratedBlock(absStartBlock, absEndBlock, blocksToInsert)
if err != nil {
log.Fatal().Err(err).Msg("Failed to update migrated block range")
}

currentBlock = new(big.Int).Add(endBlock, big.NewInt(1))
@@ -94,6 +104,8 @@ type Migrator struct {
targetConn *storage.ClickHouseConnector
migrationBatchSize int
rpcBatchSize int
newkafka *newkafka.NewKafka
psql *storage.PostgresConnector
}

func NewMigrator() *Migrator {
@@ -138,23 +150,99 @@ func NewMigrator() *Migrator {
log.Fatal().Err(err).Msg("Failed to initialize target storage")
}

// publish to new kafka stream i.e new clickhouse database
newpublisher := newkafka.GetInstance()

// psql cursor for new kafka
psql, err := storage.NewPostgresConnector(config.Cfg.Storage.Staging.Postgres)
if err != nil {
log.Fatal().Err(err).Msg("Failed to initialize psql cursor")
}

// Create migrated_block_ranges table if it doesn't exist
createMigratedBlockRangesTable(psql)

return &Migrator{
migrationBatchSize: batchSize,
rpcBatchSize: rpcBatchSize,
rpcClient: rpcClient,
storage: s,
validator: validator,
targetConn: targetConn,
newkafka: newpublisher,
psql: psql,
}
}

// createMigratedBlockRangesTable creates the migrated_block_ranges table if it doesn't exist
func createMigratedBlockRangesTable(psql *storage.PostgresConnector) {
createTableSQL := `
CREATE TABLE IF NOT EXISTS migrated_block_ranges (
chain_id BIGINT NOT NULL,
block_number BIGINT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
) WITH (fillfactor = 80, autovacuum_vacuum_scale_factor = 0.1, autovacuum_analyze_scale_factor = 0.05)
`

// Execute the CREATE TABLE statement
_, err := psql.ExecRaw(createTableSQL)
if err != nil {
log.Warn().Err(err).Msg("Failed to create migrated_block_ranges table")
}

// Create index if it doesn't exist
createIndexSQL := `
CREATE INDEX IF NOT EXISTS idx_migrated_block_ranges_chain_block
ON migrated_block_ranges(chain_id, block_number DESC)
`
_, err = psql.ExecRaw(createIndexSQL)
if err != nil {
log.Warn().Err(err).Msg("Failed to create index on migrated_block_ranges table")
}

// Create trigger if it doesn't exist
createTriggerSQL := `
CREATE TRIGGER update_migrated_block_ranges_updated_at
BEFORE UPDATE ON migrated_block_ranges
FOR EACH ROW EXECUTE FUNCTION update_updated_at_column()
`
_, err = psql.ExecRaw(createTriggerSQL)
if err != nil {
log.Warn().Err(err).Msg("Failed to create trigger on migrated_block_ranges table")
}
}
Comment on lines +177 to +214
🛠️ Refactor suggestion

Create the trigger function and a unique index to support an upsert strategy.

  • The trigger references update_updated_at_column(), which is not created here. Create it before the trigger.
  • Add a unique index on chain_id to enable an efficient ON CONFLICT upsert pattern (recommended below).
 func createMigratedBlockRangesTable(psql *storage.PostgresConnector) {
   createTableSQL := `
       CREATE TABLE IF NOT EXISTS migrated_block_ranges (
         chain_id BIGINT NOT NULL,
         block_number BIGINT NOT NULL,
         created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
         updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
       ) WITH (fillfactor = 80, autovacuum_vacuum_scale_factor = 0.1, autovacuum_analyze_scale_factor = 0.05)
   `
   // Execute the CREATE TABLE statement
   _, err := psql.ExecRaw(createTableSQL)
   if err != nil {
     log.Warn().Err(err).Msg("Failed to create migrated_block_ranges table")
   }
 
+  // Ensure a unique index on chain_id to allow ON CONFLICT upsert
+  createUniqueIndexSQL := `
+    CREATE UNIQUE INDEX IF NOT EXISTS idx_migrated_block_ranges_chain_unique 
+    ON migrated_block_ranges(chain_id)
+  `
+  _, err = psql.ExecRaw(createUniqueIndexSQL)
+  if err != nil {
+    log.Warn().Err(err).Msg("Failed to create unique index on migrated_block_ranges(chain_id)")
+  }
+
   // Create index if it doesn't exist
   createIndexSQL := `
     CREATE INDEX IF NOT EXISTS idx_migrated_block_ranges_chain_block 
     ON migrated_block_ranges(chain_id, block_number DESC)
   `
   _, err = psql.ExecRaw(createIndexSQL)
   if err != nil {
     log.Warn().Err(err).Msg("Failed to create index on migrated_block_ranges table")
   }
 
-  // Create trigger if it doesn't exist
+  // Create/replace trigger function, then create trigger if it doesn't exist
+  createFunctionSQL := `
+    CREATE OR REPLACE FUNCTION update_updated_at_column()
+    RETURNS trigger AS $$
+    BEGIN
+      NEW.updated_at = CURRENT_TIMESTAMP;
+      RETURN NEW;
+    END;
+    $$ LANGUAGE plpgsql;
+  `
+  _, err = psql.ExecRaw(createFunctionSQL)
+  if err != nil {
+    log.Warn().Err(err).Msg("Failed to create trigger function update_updated_at_column()")
+  }
+
   createTriggerSQL := `
     CREATE TRIGGER update_migrated_block_ranges_updated_at 
     BEFORE UPDATE ON migrated_block_ranges 
     FOR EACH ROW EXECUTE FUNCTION update_updated_at_column()
   `
   _, err = psql.ExecRaw(createTriggerSQL)
   if err != nil {
     log.Warn().Err(err).Msg("Failed to create trigger on migrated_block_ranges table")
   }
 }
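
As a quick sanity check of the trigger wiring, here is a hypothetical psql session against a scratch database, assuming the table, function, and trigger from the suggestion above have been applied:

  -- Assumes the schema, function, and trigger above already exist.
  INSERT INTO migrated_block_ranges (chain_id, block_number) VALUES (1, 100);
  UPDATE migrated_block_ranges SET block_number = 200 WHERE chain_id = 1;
  -- updated_at should now be newer than created_at for this row.
  SELECT chain_id, block_number, created_at, updated_at
  FROM migrated_block_ranges WHERE chain_id = 1;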


func (m *Migrator) Close() {
m.rpcClient.Close()
m.newkafka.Close()
m.psql.Close()
}

func (m *Migrator) DetermineMigrationBoundaries() (*big.Int, *big.Int) {
startBlock, endBlock := m.getAbsStartAndEndBlock()

latestMigratedBlock, err := m.GetMaxBlockNumberInRange(startBlock, endBlock)
if err != nil {
log.Fatal().Err(err).Msg("Failed to get latest block from target storage")
}
log.Info().Msgf("Latest block in target storage: %d", latestMigratedBlock)

if latestMigratedBlock.Cmp(endBlock) >= 0 {
log.Fatal().Msgf("Full range is already migrated")
}

// if configured start block is less than or equal to already migrated and migrated block is not 0, start from last migrated + 1
if startBlock.Cmp(latestMigratedBlock) <= 0 && latestMigratedBlock.Sign() > 0 {
startBlock = new(big.Int).Add(latestMigratedBlock, big.NewInt(1))
}

return startBlock, endBlock
}

func (m *Migrator) getAbsStartAndEndBlock() (*big.Int, *big.Int) {
// get latest block from main storage
-  latestBlockStored, err := m.storage.MainStorage.GetMaxBlockNumber(m.rpcClient.GetChainID())
+  latestBlockStored, err := m.rpcClient.GetLatestBlockNumber(context.Background())
if err != nil {
log.Fatal().Err(err).Msg("Failed to get latest block from main storage")
}
@@ -187,22 +275,103 @@ func (m *Migrator) DetermineMigrationBoundaries() (*big.Int, *big.Int) {
startBlock = configuredStartBlock
}

-  latestMigratedBlock, err := m.targetConn.GetMaxBlockNumberInRange(m.rpcClient.GetChainID(), startBlock, endBlock)
+  return startBlock, endBlock
}

func (m *Migrator) UpdateMigratedBlock(startBlock *big.Int, endBlock *big.Int, blockData []common.BlockData) error {
if len(blockData) == 0 {
return nil
}

maxBlockNumber := big.NewInt(0)
for _, block := range blockData {
if block.Block.Number.Cmp(maxBlockNumber) > 0 {
maxBlockNumber = block.Block.Number
}
}

chainID := blockData[0].Block.ChainId
err := m.upsertMigratedBlockRange(chainID, maxBlockNumber, startBlock, endBlock)
if err != nil {
-    log.Fatal().Err(err).Msg("Failed to get latest block from target storage")
+    return fmt.Errorf("failed to update migrated block range: %w", err)
}
-  log.Info().Msgf("Latest block in target storage: %d", latestMigratedBlock)
+  return nil
}

-  if latestMigratedBlock.Cmp(endBlock) >= 0 {
-    log.Fatal().Msgf("Full range is already migrated")
+func (m *Migrator) GetMaxBlockNumberInRange(startBlock *big.Int, endBlock *big.Int) (*big.Int, error) {
// Get chain ID from RPC client
chainID := m.rpcClient.GetChainID()

// Get the maximum end_block for the given chain_id
maxBlock, err := m.getMaxMigratedBlock(chainID, startBlock, endBlock)
if err != nil || maxBlock.Cmp(startBlock) < 0 {
log.Warn().Err(err).Msg("Failed to get last migrated block, returning start block - 1")
// Return startBlock - 1 so that the next block to migrate is startBlock
return new(big.Int).Sub(startBlock, big.NewInt(1)), nil
}

-  // if configured start block is less than or equal to already migrated and migrated block is not 0, start from last migrated + 1
-  if startBlock.Cmp(latestMigratedBlock) <= 0 && latestMigratedBlock.Sign() > 0 {
-    startBlock = new(big.Int).Add(latestMigratedBlock, big.NewInt(1))
+  // Return the actual maxBlock (not +1) since this represents the last migrated block
+  return maxBlock, nil
}

// upsertMigratedBlockRange upserts a row for the given chain_id and block range
func (m *Migrator) upsertMigratedBlockRange(chainID, blockNumber, startBlock, endBlock *big.Int) error {
// First, try to update existing rows that overlap with this range
updateSQL := `
UPDATE migrated_block_ranges
SET block_number = $1, updated_at = CURRENT_TIMESTAMP
WHERE chain_id = $2 AND block_number >= $3 AND block_number <= $4
`

result, err := m.psql.ExecRaw(updateSQL, blockNumber.String(), chainID.String(), startBlock.String(), endBlock.String())
if err != nil {
return fmt.Errorf("failed to update migrated block range for chain %s, range %s-%s", chainID.String(), startBlock.String(), endBlock.String())
}

-  return startBlock, endBlock
+  // Check if any rows were updated
rowsAffected, err := result.RowsAffected()
if err != nil {
log.Warn().Err(err).Msgf("Failed to get rows affected for chain %s, range %s-%s", chainID.String(), startBlock.String(), endBlock.String())
return fmt.Errorf("failed to get rows affected for chain %s, range %s-%s", chainID.String(), startBlock.String(), endBlock.String())
}

// If no rows were updated, insert a new row
if rowsAffected == 0 {
insertSQL := `
INSERT INTO migrated_block_ranges (chain_id, block_number, created_at, updated_at)
VALUES ($1, $2, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
`

_, err := m.psql.ExecRaw(insertSQL, chainID.String(), blockNumber.String())
if err != nil {
return fmt.Errorf("failed to insert migrated block range for chain %s, range %s-%s", chainID.String(), startBlock.String(), endBlock.String())
}
}
return nil
}
Comment on lines +317 to +351
🛠️ Refactor suggestion

Replace multi-row UPDATE + conditional INSERT with an idempotent UPSERT.

The current UPDATE changes all rows in [startBlock,endBlock] to the same block_number and may create duplicates over time. A single-row “latest progress per chain” model is simpler, faster, and avoids bloat.

 func (m *Migrator) upsertMigratedBlockRange(chainID, blockNumber, startBlock, endBlock *big.Int) error {
-  // First, try to update existing rows that overlap with this range
-  updateSQL := `
-    UPDATE migrated_block_ranges 
-    SET block_number = $1, updated_at = CURRENT_TIMESTAMP
-    WHERE chain_id = $2 AND block_number >= $3 AND block_number <= $4
-  `
-  result, err := m.psql.ExecRaw(updateSQL, blockNumber.String(), chainID.String(), startBlock.String(), endBlock.String())
-  if err != nil {
-    return fmt.Errorf("failed to update migrated block range for chain %s, range %s-%s", chainID.String(), startBlock.String(), endBlock.String())
-  }
-  // Check if any rows were updated
-  rowsAffected, err := result.RowsAffected()
-  if err != nil {
-    log.Warn().Err(err).Msgf("Failed to get rows affected for chain %s, range %s-%s", chainID.String(), startBlock.String(), endBlock.String())
-    return fmt.Errorf("failed to get rows affected for chain %s, range %s-%s", chainID.String(), startBlock.String(), endBlock.String())
-  }
-  // If no rows were updated, insert a new row
-  if rowsAffected == 0 {
-    insertSQL := `
-      INSERT INTO migrated_block_ranges (chain_id, block_number, created_at, updated_at)
-      VALUES ($1, $2, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
-    `
-    _, err := m.psql.ExecRaw(insertSQL, chainID.String(), blockNumber.String())
-    if err != nil {
-      return fmt.Errorf("failed to insert migrated block range for chain %s, range %s-%s", chainID.String(), startBlock.String(), endBlock.String())
-    }
-  }
-  return nil
+  upsertSQL := `
+    INSERT INTO migrated_block_ranges (chain_id, block_number, created_at, updated_at)
+    VALUES ($1, $2, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
+    ON CONFLICT (chain_id) DO UPDATE
+    SET block_number = GREATEST(EXCLUDED.block_number, migrated_block_ranges.block_number),
+        updated_at = CURRENT_TIMESTAMP
+  `
+  _, err := m.psql.ExecRaw(upsertSQL, chainID.String(), blockNumber.String())
+  if err != nil {
+    return fmt.Errorf(
+      "failed to upsert migrated block range for chain %s to block %s (abs range %s-%s): %w",
+      chainID.String(), blockNumber.String(), startBlock.String(), endBlock.String(), err,
+    )
+  }
+  return nil
 }
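
A useful property of this pattern: with the conflict target on chain_id and GREATEST() in the update, replaying an older batch can never move the cursor backwards. A minimal SQL sketch, assuming the unique index on chain_id from the earlier comment exists:

  INSERT INTO migrated_block_ranges (chain_id, block_number) VALUES (1, 500)
  ON CONFLICT (chain_id) DO UPDATE
  SET block_number = GREATEST(EXCLUDED.block_number, migrated_block_ranges.block_number),
      updated_at = CURRENT_TIMESTAMP;

  -- Re-running with an older block number leaves block_number at 500.
  INSERT INTO migrated_block_ranges (chain_id, block_number) VALUES (1, 300)
  ON CONFLICT (chain_id) DO UPDATE
  SET block_number = GREATEST(EXCLUDED.block_number, migrated_block_ranges.block_number),
      updated_at = CURRENT_TIMESTAMP;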


// getMaxMigratedBlock gets the maximum block number within the given range for the given chain_id
func (m *Migrator) getMaxMigratedBlock(chainID, startBlock, endBlock *big.Int) (*big.Int, error) {
querySQL := `
SELECT COALESCE(MAX(block_number), 0) as max_block
FROM migrated_block_ranges
WHERE chain_id = $1
AND block_number >= $2
AND block_number <= $3
`

var maxBlockStr string
err := m.psql.QueryRowRaw(querySQL, chainID.String(), startBlock.String(), endBlock.String()).Scan(&maxBlockStr)
if err != nil {
return nil, fmt.Errorf("failed to query migrated block ranges: %w", err)
}

maxBlock, ok := new(big.Int).SetString(maxBlockStr, 10)
if !ok {
return nil, fmt.Errorf("failed to parse block number: %s", maxBlockStr)
}

return maxBlock, nil
}

func (m *Migrator) FetchBlocksFromRPC(blockNumbers []*big.Int) ([]common.BlockData, error) {
6 changes: 6 additions & 0 deletions cmd/root.go
@@ -149,6 +149,9 @@ func init() {
rootCmd.PersistentFlags().String("publisher-events-topicName", "", "Kafka topic name for events")
rootCmd.PersistentFlags().String("publisher-events-addressFilter", "", "Filter events by address")
rootCmd.PersistentFlags().String("publisher-events-topic0Filter", "", "Filter events by topic0")
rootCmd.PersistentFlags().String("newKafka-brokers", "", "New Kafka brokers (comma-separated)")
rootCmd.PersistentFlags().String("newKafka-username", "", "New Kafka username")
rootCmd.PersistentFlags().String("newKafka-password", "", "New Kafka password")
rootCmd.PersistentFlags().Int("workMode-checkIntervalMinutes", 10, "How often to check work mode in minutes")
rootCmd.PersistentFlags().Int64("workMode-liveModeThreshold", 500, "How many blocks the indexer can be behind before switching to live mode")
rootCmd.PersistentFlags().String("validation-mode", "strict", "Validation mode. Strict will validate logsBloom and transactionsRoot. Minimal will validate transaction count and logs existence.")
@@ -265,6 +268,9 @@ func init() {
viper.BindPFlag("publisher.events.topicName", rootCmd.PersistentFlags().Lookup("publisher-events-topicName"))
viper.BindPFlag("publisher.events.addressFilter", rootCmd.PersistentFlags().Lookup("publisher-events-addressFilter"))
viper.BindPFlag("publisher.events.topic0Filter", rootCmd.PersistentFlags().Lookup("publisher-events-topic0Filter"))
viper.BindPFlag("newKafka.brokers", rootCmd.PersistentFlags().Lookup("newKafka-brokers"))
viper.BindPFlag("newKafka.username", rootCmd.PersistentFlags().Lookup("newKafka-username"))
viper.BindPFlag("newKafka.password", rootCmd.PersistentFlags().Lookup("newKafka-password"))
viper.BindPFlag("workMode.checkIntervalMinutes", rootCmd.PersistentFlags().Lookup("workMode-checkIntervalMinutes"))
viper.BindPFlag("workMode.liveModeThreshold", rootCmd.PersistentFlags().Lookup("workMode-liveModeThreshold"))
viper.BindPFlag("validation.mode", rootCmd.PersistentFlags().Lookup("validation-mode"))
7 changes: 7 additions & 0 deletions configs/config.go
@@ -182,6 +182,12 @@ type PublisherConfig struct {
Events EventPublisherConfig `mapstructure:"events"`
}

type NewKafkaConfig struct {
Brokers string `mapstructure:"brokers"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password"`
}

type WorkModeConfig struct {
CheckIntervalMinutes int `mapstructure:"checkIntervalMinutes"`
LiveModeThreshold int64 `mapstructure:"liveModeThreshold"`
@@ -201,6 +207,7 @@ type Config struct {
Storage StorageConfig `mapstructure:"storage"`
API APIConfig `mapstructure:"api"`
Publisher PublisherConfig `mapstructure:"publisher"`
NewKafka NewKafkaConfig `mapstructure:"newKafka"`
WorkMode WorkModeConfig `mapstructure:"workMode"`
Validation ValidationConfig `mapstructure:"validation"`
}
86 changes: 86 additions & 0 deletions configs/kafka_config.yml
@@ -0,0 +1,86 @@
rpc:
url: https://eth.llamarpc.com
chainId: "1"
blockReceipts:
enabled: true

log:
level: debug
prettify: true

poller:
enabled: true
interval: 2000
blocksPerPoll: 100

committer:
enabled: true
interval: 2000
blocksPerCommit: 100

storage:
main:
clickhouse:
host: localhost
port: 9440
username: admin
password: password
database: default
disableTLS: true
asyncInsert: true
maxRowsPerInsert: 1000
maxOpenConns: 50
maxIdleConns: 10

staging:
postgres:
host: localhost
port: 5432
username: admin
password: password
database: insight
sslMode: disable
maxOpenConns: 50
maxIdleConns: 10
maxConnLifetime: 300
connectTimeout: 10

orchestrator:
postgres:
host: localhost
port: 5432
username: admin
password: password
database: insight
sslMode: disable
maxOpenConns: 50
maxIdleConns: 10
maxConnLifetime: 300
connectTimeout: 10

api:
host: localhost:3000
basicAuth:
username: admin
password: admin

publisher:
enabled: false
mode: default

# New Kafka configuration
newKafka:
brokers: "localhost:9092"
username: ""
password: ""

validation:
mode: minimal

# Work mode configuration - Controls system behavior based on blockchain state
workMode:
# Interval in minutes to check if system should switch between live/historical mode
checkIntervalMinutes: 10
# Block number threshold to determine if system is in "live mode" (near chain head)
# Setting this very high forces backfill mode for testing
liveModeThreshold: 1000000