diff --git a/cmd/migrate_valid.go b/cmd/migrate_valid.go
index cc3e912..7cc62da 100644
--- a/cmd/migrate_valid.go
+++ b/cmd/migrate_valid.go
@@ -2,6 +2,7 @@ package cmd
 
 import (
 	"context"
+	"fmt"
 	"math/big"
 	"os"
 	"strconv"
@@ -11,6 +12,7 @@ import (
 	config "github.com/thirdweb-dev/indexer/configs"
 	"github.com/thirdweb-dev/indexer/internal/common"
 	"github.com/thirdweb-dev/indexer/internal/orchestrator"
+	"github.com/thirdweb-dev/indexer/internal/publisher/newkafka"
 	"github.com/thirdweb-dev/indexer/internal/rpc"
 	"github.com/thirdweb-dev/indexer/internal/storage"
 )
@@ -36,6 +38,9 @@ func RunValidationMigration(cmd *cobra.Command, args []string) {
 	migrator := NewMigrator()
 	defer migrator.Close()
 
+	// get absolute start and end block for the migration, e.g. 0-10M or 10M-20M
+	absStartBlock, absEndBlock := migrator.getAbsStartAndEndBlock()
+
 	rangeStartBlock, rangeEndBlock := migrator.DetermineMigrationBoundaries()
 	log.Info().Msgf("Migrating blocks from %s to %s (both ends inclusive)", rangeStartBlock.String(), rangeEndBlock.String())
 
@@ -75,9 +80,14 @@ func RunValidationMigration(cmd *cobra.Command, args []string) {
 			blocksToInsert = append(blocksToInsert, blockData)
 		}
 
-		err := migrator.targetConn.InsertBlockData(blocksToInsert)
+		err := migrator.newkafka.PublishBlockData(blocksToInsert)
 		if err != nil {
-			log.Fatal().Err(err).Msg("Failed to insert blocks to target storage")
+			log.Fatal().Err(err).Msg("Failed to publish block data")
+		}
+
+		err = migrator.UpdateMigratedBlock(absStartBlock, absEndBlock, blocksToInsert)
+		if err != nil {
+			log.Fatal().Err(err).Msg("Failed to update migrated block range")
 		}
 
 		currentBlock = new(big.Int).Add(endBlock, big.NewInt(1))
@@ -94,6 +104,8 @@ type Migrator struct {
 	targetConn         *storage.ClickHouseConnector
 	migrationBatchSize int
 	rpcBatchSize       int
+	newkafka           *newkafka.NewKafka
+	psql               *storage.PostgresConnector
 }
 
 func NewMigrator() *Migrator {
@@ -138,6 +150,18 @@ func NewMigrator() *Migrator {
 		log.Fatal().Err(err).Msg("Failed to initialize target storage")
 	}
 
+	// publisher for the new Kafka stream, i.e. the new ClickHouse database
+	newpublisher := newkafka.GetInstance()
+
+	// Postgres cursor tracking migration progress for the new Kafka stream
+	psql, err := storage.NewPostgresConnector(config.Cfg.Storage.Staging.Postgres)
+	if err != nil {
+		log.Fatal().Err(err).Msg("Failed to initialize psql cursor")
+	}
+
+	// Create migrated_block_ranges table if it doesn't exist
+	createMigratedBlockRangesTable(psql)
+
 	return &Migrator{
 		migrationBatchSize: batchSize,
 		rpcBatchSize:       rpcBatchSize,
@@ -145,16 +169,80 @@ func NewMigrator() *Migrator {
 		storage:            s,
 		validator:          validator,
 		targetConn:         targetConn,
+		newkafka:           newpublisher,
+		psql:               psql,
+	}
+}
+
+// createMigratedBlockRangesTable creates the migrated_block_ranges table if it doesn't exist
+func createMigratedBlockRangesTable(psql *storage.PostgresConnector) {
+	createTableSQL := `
+		CREATE TABLE IF NOT EXISTS migrated_block_ranges (
+			chain_id BIGINT NOT NULL,
+			block_number BIGINT NOT NULL,
+			created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+			updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
+		) WITH (fillfactor = 80, autovacuum_vacuum_scale_factor = 0.1, autovacuum_analyze_scale_factor = 0.05)
+	`
+
+	// Execute the CREATE TABLE statement
+	_, err := psql.ExecRaw(createTableSQL)
+	if err != nil {
+		log.Warn().Err(err).Msg("Failed to create migrated_block_ranges table")
+	}
+
+	// Create index if it doesn't exist
+	createIndexSQL := `
+		CREATE INDEX IF NOT EXISTS idx_migrated_block_ranges_chain_block
+		ON migrated_block_ranges(chain_id, block_number DESC)
+	`
+	_, err = psql.ExecRaw(createIndexSQL)
+	if err != nil {
+		log.Warn().Err(err).Msg("Failed to create index on migrated_block_ranges table")
+	}
+
+	// Create trigger to keep updated_at current (Postgres has no IF NOT EXISTS for triggers; duplicates only log a warning)
+	createTriggerSQL := `
+		CREATE TRIGGER update_migrated_block_ranges_updated_at
+		BEFORE UPDATE ON migrated_block_ranges
+		FOR EACH ROW EXECUTE FUNCTION update_updated_at_column()
+	`
+	_, err = psql.ExecRaw(createTriggerSQL)
+	if err != nil {
+		log.Warn().Err(err).Msg("Failed to create trigger on migrated_block_ranges table")
+	}
 }
 
 func (m *Migrator) Close() {
 	m.rpcClient.Close()
+	m.newkafka.Close()
+	m.psql.Close()
 }
 
 func (m *Migrator) DetermineMigrationBoundaries() (*big.Int, *big.Int) {
+	startBlock, endBlock := m.getAbsStartAndEndBlock()
+
+	latestMigratedBlock, err := m.GetMaxBlockNumberInRange(startBlock, endBlock)
+	if err != nil {
+		log.Fatal().Err(err).Msg("Failed to get latest migrated block")
+	}
+	log.Info().Msgf("Latest migrated block in range: %d", latestMigratedBlock)
+
+	if latestMigratedBlock.Cmp(endBlock) >= 0 {
+		log.Fatal().Msg("Full range is already migrated")
+	}
+
+	// if configured start block is less than or equal to already migrated and migrated block is not 0, start from last migrated + 1
+	if startBlock.Cmp(latestMigratedBlock) <= 0 && latestMigratedBlock.Sign() > 0 {
+		startBlock = new(big.Int).Add(latestMigratedBlock, big.NewInt(1))
+	}
+
+	return startBlock, endBlock
+}
+
+func (m *Migrator) getAbsStartAndEndBlock() (*big.Int, *big.Int) {
 	// get latest block from main storage
-	latestBlockStored, err := m.storage.MainStorage.GetMaxBlockNumber(m.rpcClient.GetChainID())
+	latestBlockStored, err := m.rpcClient.GetLatestBlockNumber(context.Background())
 	if err != nil {
 		log.Fatal().Err(err).Msg("Failed to get latest block from main storage")
 	}
@@ -187,22 +275,103 @@ func (m *Migrator) DetermineMigrationBoundaries() (*big.Int, *big.Int) {
 		startBlock = configuredStartBlock
 	}
 
-	latestMigratedBlock, err := m.targetConn.GetMaxBlockNumberInRange(m.rpcClient.GetChainID(), startBlock, endBlock)
+	return startBlock, endBlock
+}
+
+func (m *Migrator) UpdateMigratedBlock(startBlock *big.Int, endBlock *big.Int, blockData []common.BlockData) error {
+	if len(blockData) == 0 {
+		return nil
+	}
+
+	maxBlockNumber := big.NewInt(0)
+	for _, block := range blockData {
+		if block.Block.Number.Cmp(maxBlockNumber) > 0 {
+			maxBlockNumber = block.Block.Number
+		}
+	}
+
+	chainID := blockData[0].Block.ChainId
+	err := m.upsertMigratedBlockRange(chainID, maxBlockNumber, startBlock, endBlock)
 	if err != nil {
-		log.Fatal().Err(err).Msg("Failed to get latest block from target storage")
+		return fmt.Errorf("failed to update migrated block range: %w", err)
 	}
-	log.Info().Msgf("Latest block in target storage: %d", latestMigratedBlock)
+	return nil
+}
 
-	if latestMigratedBlock.Cmp(endBlock) >= 0 {
-		log.Fatal().Msgf("Full range is already migrated")
+func (m *Migrator) GetMaxBlockNumberInRange(startBlock *big.Int, endBlock *big.Int) (*big.Int, error) {
+	// Get chain ID from RPC client
+	chainID := m.rpcClient.GetChainID()
+
+	// Get the maximum migrated block number recorded for this chain within the range
+	maxBlock, err := m.getMaxMigratedBlock(chainID, startBlock, endBlock)
+	if err != nil || maxBlock.Cmp(startBlock) < 0 {
+		log.Warn().Err(err).Msg("Failed to get last migrated block, returning start block - 1")
+		// Return startBlock - 1 so that the next block to migrate is startBlock
+		return new(big.Int).Sub(startBlock, big.NewInt(1)), nil
 	}
 
-	// if configured start block is less than or equal to already migrated and migrated block is not 0, start from last migrated + 1
-	if startBlock.Cmp(latestMigratedBlock) <= 0 && latestMigratedBlock.Sign() > 0 {
-		startBlock = new(big.Int).Add(latestMigratedBlock, big.NewInt(1))
+	// Return the actual maxBlock (not +1) since this represents the last migrated block
+	return maxBlock, nil
+}
+
+// upsertMigratedBlockRange upserts a row for the given chain_id and block range
+func (m *Migrator) upsertMigratedBlockRange(chainID, blockNumber, startBlock, endBlock *big.Int) error {
+	// First, try to update existing rows that overlap with this range
+	updateSQL := `
+		UPDATE migrated_block_ranges
+		SET block_number = $1, updated_at = CURRENT_TIMESTAMP
+		WHERE chain_id = $2 AND block_number >= $3 AND block_number <= $4
+	`
+
+	result, err := m.psql.ExecRaw(updateSQL, blockNumber.String(), chainID.String(), startBlock.String(), endBlock.String())
+	if err != nil {
+		return fmt.Errorf("failed to update migrated block range for chain %s, range %s-%s: %w", chainID.String(), startBlock.String(), endBlock.String(), err)
 	}
 
-	return startBlock, endBlock
+	// Check if any rows were updated
+	rowsAffected, err := result.RowsAffected()
+	if err != nil {
+		log.Warn().Err(err).Msgf("Failed to get rows affected for chain %s, range %s-%s", chainID.String(), startBlock.String(), endBlock.String())
+		return fmt.Errorf("failed to get rows affected for chain %s, range %s-%s: %w", chainID.String(), startBlock.String(), endBlock.String(), err)
+	}
+
+	// If no rows were updated, insert a new row
+	if rowsAffected == 0 {
+		insertSQL := `
+			INSERT INTO migrated_block_ranges (chain_id, block_number, created_at, updated_at)
+			VALUES ($1, $2, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
+		`
+
+		_, err := m.psql.ExecRaw(insertSQL, chainID.String(), blockNumber.String())
+		if err != nil {
+			return fmt.Errorf("failed to insert migrated block range for chain %s, range %s-%s: %w", chainID.String(), startBlock.String(), endBlock.String(), err)
+		}
+	}
+	return nil
+}
+
+// getMaxMigratedBlock gets the maximum block number within the given range for the given chain_id
+func (m *Migrator) getMaxMigratedBlock(chainID, startBlock, endBlock *big.Int) (*big.Int, error) {
+	querySQL := `
+		SELECT COALESCE(MAX(block_number), 0) as max_block
+		FROM migrated_block_ranges
+		WHERE chain_id = $1
+		AND block_number >= $2
+		AND block_number <= $3
+	`
+
+	var maxBlockStr string
+	err := m.psql.QueryRowRaw(querySQL, chainID.String(), startBlock.String(), endBlock.String()).Scan(&maxBlockStr)
+	if err != nil {
+		return nil, fmt.Errorf("failed to query migrated block ranges: %w", err)
+	}
+
+	maxBlock, ok := new(big.Int).SetString(maxBlockStr, 10)
+	if !ok {
+		return nil, fmt.Errorf("failed to parse block number: %s", maxBlockStr)
+	}
+
+	return maxBlock, nil
 }
 
 func (m *Migrator) FetchBlocksFromRPC(blockNumbers []*big.Int) ([]common.BlockData, error) {
diff --git a/cmd/root.go b/cmd/root.go
index 6ba9702..b669b61 100644
--- a/cmd/root.go
+++ b/cmd/root.go
@@ -149,6 +149,9 @@ func init() {
 	rootCmd.PersistentFlags().String("publisher-events-topicName", "", "Kafka topic name for events")
 	rootCmd.PersistentFlags().String("publisher-events-addressFilter", "", "Filter events by address")
 	rootCmd.PersistentFlags().String("publisher-events-topic0Filter", "", "Filter events by topic0")
+	rootCmd.PersistentFlags().String("newKafka-brokers", "", "New Kafka brokers (comma-separated)")
+	rootCmd.PersistentFlags().String("newKafka-username", "", "New Kafka username")
+	rootCmd.PersistentFlags().String("newKafka-password", "", "New Kafka password")
rootCmd.PersistentFlags().Int("workMode-checkIntervalMinutes", 10, "How often to check work mode in minutes") rootCmd.PersistentFlags().Int64("workMode-liveModeThreshold", 500, "How many blocks the indexer can be behind before switching to live mode") rootCmd.PersistentFlags().String("validation-mode", "strict", "Validation mode. Strict will validate logsBloom and transactionsRoot. Minimal will validate transaction count and logs existence.") @@ -265,6 +268,9 @@ func init() { viper.BindPFlag("publisher.events.topicName", rootCmd.PersistentFlags().Lookup("publisher-events-topicName")) viper.BindPFlag("publisher.events.addressFilter", rootCmd.PersistentFlags().Lookup("publisher-events-addressFilter")) viper.BindPFlag("publisher.events.topic0Filter", rootCmd.PersistentFlags().Lookup("publisher-events-topic0Filter")) + viper.BindPFlag("newKafka.brokers", rootCmd.PersistentFlags().Lookup("newKafka-brokers")) + viper.BindPFlag("newKafka.username", rootCmd.PersistentFlags().Lookup("newKafka-username")) + viper.BindPFlag("newKafka.password", rootCmd.PersistentFlags().Lookup("newKafka-password")) viper.BindPFlag("workMode.checkIntervalMinutes", rootCmd.PersistentFlags().Lookup("workMode-checkIntervalMinutes")) viper.BindPFlag("workMode.liveModeThreshold", rootCmd.PersistentFlags().Lookup("workMode-liveModeThreshold")) viper.BindPFlag("validation.mode", rootCmd.PersistentFlags().Lookup("validation-mode")) diff --git a/configs/config.go b/configs/config.go index 0be0feb..bfe02e5 100644 --- a/configs/config.go +++ b/configs/config.go @@ -182,6 +182,12 @@ type PublisherConfig struct { Events EventPublisherConfig `mapstructure:"events"` } +type NewKafkaConfig struct { + Brokers string `mapstructure:"brokers"` + Username string `mapstructure:"username"` + Password string `mapstructure:"password"` +} + type WorkModeConfig struct { CheckIntervalMinutes int `mapstructure:"checkIntervalMinutes"` LiveModeThreshold int64 `mapstructure:"liveModeThreshold"` @@ -201,6 +207,7 @@ type Config struct { Storage StorageConfig `mapstructure:"storage"` API APIConfig `mapstructure:"api"` Publisher PublisherConfig `mapstructure:"publisher"` + NewKafka NewKafkaConfig `mapstructure:"newKafka"` WorkMode WorkModeConfig `mapstructure:"workMode"` Validation ValidationConfig `mapstructure:"validation"` } diff --git a/configs/kafka_config.yml b/configs/kafka_config.yml new file mode 100644 index 0000000..48cd6fa --- /dev/null +++ b/configs/kafka_config.yml @@ -0,0 +1,86 @@ +rpc: + url: https://eth.llamarpc.com + chainId: "1" + blockReceipts: + enabled: true + +log: + level: debug + prettify: true + +poller: + enabled: true + interval: 2000 + blocksPerPoll: 100 + +committer: + enabled: true + interval: 2000 + blocksPerCommit: 100 + +storage: + main: + clickhouse: + host: localhost + port: 9440 + username: admin + password: password + database: default + disableTLS: true + asyncInsert: true + maxRowsPerInsert: 1000 + maxOpenConns: 50 + maxIdleConns: 10 + + staging: + postgres: + host: localhost + port: 5432 + username: admin + password: password + database: insight + sslMode: disable + maxOpenConns: 50 + maxIdleConns: 10 + maxConnLifetime: 300 + connectTimeout: 10 + + orchestrator: + postgres: + host: localhost + port: 5432 + username: admin + password: password + database: insight + sslMode: disable + maxOpenConns: 50 + maxIdleConns: 10 + maxConnLifetime: 300 + connectTimeout: 10 + +api: + host: localhost:3000 + basicAuth: + username: admin + password: admin + +publisher: + enabled: false + mode: default + +# New Kafka 
+newKafka:
+  brokers: "localhost:9092"
+  username: ""
+  password: ""
+
+validation:
+  mode: minimal
+
+# Work mode configuration - Controls system behavior based on blockchain state
+workMode:
+  # Interval in minutes to check if system should switch between live/historical mode
+  checkIntervalMinutes: 10
+  # Block number threshold to determine if system is in "live mode" (near chain head)
+  # Setting this very high forces backfill mode for testing
+  liveModeThreshold: 1000000
diff --git a/internal/publisher/newkafka/publisherNewKafka.go b/internal/publisher/newkafka/publisherNewKafka.go
new file mode 100644
index 0000000..7768102
--- /dev/null
+++ b/internal/publisher/newkafka/publisherNewKafka.go
@@ -0,0 +1,178 @@
+package newkafka
+
+import (
+	"context"
+	"crypto/tls"
+	"encoding/json"
+	"fmt"
+	"net"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/rs/zerolog/log"
+	config "github.com/thirdweb-dev/indexer/configs"
+	"github.com/thirdweb-dev/indexer/internal/common"
+	"github.com/thirdweb-dev/indexer/internal/metrics"
+	"github.com/twmb/franz-go/pkg/kgo"
+	"github.com/twmb/franz-go/pkg/sasl/plain"
+)
+
+type NewKafka struct {
+	client *kgo.Client
+	mu     sync.RWMutex
+}
+
+var (
+	instance *NewKafka
+	once     sync.Once
+)
+
+type PublishableMessage[T common.BlockModel | common.TransactionModel | common.LogModel | common.TraceModel] struct {
+	Data   T      `json:"data"`
+	Status string `json:"status"`
+}
+
+// GetInstance returns the singleton Publisher instance
+func GetInstance() *NewKafka {
+	once.Do(func() {
+		instance = &NewKafka{}
+		if err := instance.initialize(); err != nil {
+			log.Error().Err(err).Msg("Failed to initialize publisher")
+		}
+	})
+	return instance
+}
+
+func (p *NewKafka) initialize() error {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	if config.Cfg.NewKafka.Brokers == "" {
+		log.Info().Msg("No Kafka brokers configured, skipping publisher initialization")
+		return nil
+	}
+
+	brokers := strings.Split(config.Cfg.NewKafka.Brokers, ",")
+	opts := []kgo.Opt{
+		kgo.SeedBrokers(brokers...),
+		kgo.AllowAutoTopicCreation(),
+		kgo.ProducerBatchCompression(kgo.SnappyCompression()),
+		kgo.ClientID(fmt.Sprintf("insight-indexer-%s", config.Cfg.RPC.ChainID)),
+		kgo.MaxBufferedRecords(1_000_000),
+		kgo.ProducerBatchMaxBytes(16_000_000),
+		kgo.RecordPartitioner(kgo.UniformBytesPartitioner(1_000_000, false, false, nil)),
+		kgo.MetadataMaxAge(60 * time.Second),
+		kgo.DialTimeout(10 * time.Second),
+	}
+
+	if config.Cfg.NewKafka.Username != "" && config.Cfg.NewKafka.Password != "" {
+		opts = append(opts, kgo.SASL(plain.Auth{
+			User: config.Cfg.NewKafka.Username,
+			Pass: config.Cfg.NewKafka.Password,
+		}.AsMechanism()))
+		tlsDialer := &tls.Dialer{NetDialer: &net.Dialer{Timeout: 10 * time.Second}}
+		opts = append(opts, kgo.Dialer(tlsDialer.DialContext))
+	}
+
+	client, err := kgo.NewClient(opts...)
+	if err != nil {
+		return fmt.Errorf("failed to create Kafka client: %v", err)
+	}
+
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+
+	if err := client.Ping(ctx); err != nil {
+		client.Close()
+		return fmt.Errorf("failed to connect to Kafka: %v", err)
+	}
+	p.client = client
+	return nil
+}
+
+func (p *NewKafka) PublishBlockData(blockData []common.BlockData) error {
+	return p.publishBlockData(blockData, false)
+}
+
+func (p *NewKafka) Close() error {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	if p.client != nil {
+		p.client.Close()
+		log.Debug().Msg("Publisher client closed")
+	}
+	return nil
+}
+
+func (p *NewKafka) publishBlockData(blockData []common.BlockData, isReorg bool) error {
+	if p.client == nil || len(blockData) == 0 {
+		return nil
+	}
+
+	publishStart := time.Now()
+
+	// Prepare one Kafka record per block's full data
+	blockdataMessages := make([]*kgo.Record, len(blockData))
+
+	status := "new"
+	if isReorg {
+		status = "reverted"
+	}
+
+	for i, data := range blockData {
+		msgJson, err := json.Marshal(data)
+		if err != nil {
+			return fmt.Errorf("failed to marshal block data: %v", err)
+		}
+		blockdataMessages[i] = &kgo.Record{
+			Topic: "block_data",
+			Key:   []byte(fmt.Sprintf("block-%s-%s-%s", status, data.Block.ChainId.String(), data.Block.Hash)),
+			Value: msgJson,
+		}
+	}
+
+	if err := p.publishMessages(context.Background(), blockdataMessages); err != nil {
+		return fmt.Errorf("failed to publish block messages: %v", err)
+	}
+	log.Debug().
+		Str("metric", "publish_duration").
+		Str("publisher", "newkafka").
+		Msgf("Publisher.PublishBlockData duration: %f", time.Since(publishStart).Seconds())
+	metrics.PublishDuration.Observe(time.Since(publishStart).Seconds())
+	metrics.PublisherBlockCounter.Add(float64(len(blockData)))
+	metrics.LastPublishedBlock.Set(float64(blockData[len(blockData)-1].Block.Number.Int64()))
+	if isReorg {
+		metrics.PublisherReorgedBlockCounter.Add(float64(len(blockData)))
+	}
+	return nil
+}
+
+func (p *NewKafka) publishMessages(ctx context.Context, messages []*kgo.Record) error {
+	if len(messages) == 0 {
+		return nil
+	}
+
+	p.mu.RLock()
+	defer p.mu.RUnlock()
+
+	if p.client == nil {
+		return nil // Skip if no client configured
+	}
+
+	var wg sync.WaitGroup
+	wg.Add(len(messages))
+	// Produce all records asynchronously and wait for the delivery callbacks
+	for _, msg := range messages {
+		p.client.Produce(ctx, msg, func(_ *kgo.Record, err error) {
+			defer wg.Done()
+			if err != nil {
+				log.Error().Err(err).Msg("Failed to publish message to Kafka")
+			}
+		})
+	}
+	wg.Wait()
+
+	return nil
+}
diff --git a/internal/storage/clickhouse.go b/internal/storage/clickhouse.go
index c61256b..d5ad86c 100644
--- a/internal/storage/clickhouse.go
+++ b/internal/storage/clickhouse.go
@@ -2016,6 +2016,11 @@ func (c *ClickHouseConnector) FindMissingBlockNumbers(chainId *big.Int, startBlo
 }
 
 func (c *ClickHouseConnector) GetFullBlockData(chainId *big.Int, blockNumbers []*big.Int) (blocks []common.BlockData, err error) {
+	// Keep the previous behavior (ForceConsistentData enabled); migration callers that want to avoid the FINAL keyword overhead can use GetFullBlockDataWithOptions with forceConsistentData=false
+	return c.GetFullBlockDataWithOptions(chainId, blockNumbers, true)
+}
+
+func (c *ClickHouseConnector) GetFullBlockDataWithOptions(chainId *big.Int, blockNumbers []*big.Int, forceConsistentData bool) (blocks []common.BlockData, err error) {
 	// Get blocks, logs and transactions concurrently
 	type blockResult struct {
 		blocks []common.Block
@@ -2047,7 +2052,7 @@ func (c *ClickHouseConnector) GetFullBlockData(chainId *big.Int, blockNumbers []
 		blocksResult, err := c.GetBlocks(QueryFilter{
 			ChainId:             chainId,
 			BlockNumbers:        blockNumbers,
-			ForceConsistentData: true,
+			ForceConsistentData: forceConsistentData,
 		})
 		blocksChan <- blockResult{blocks: blocksResult.Data, err: err}
 	}()
@@ -2056,7 +2061,7 @@ func (c *ClickHouseConnector) GetFullBlockData(chainId *big.Int, blockNumbers []
 		logsResult, err := c.GetLogs(QueryFilter{
 			ChainId:             chainId,
 			BlockNumbers:        blockNumbers,
-			ForceConsistentData: true,
+			ForceConsistentData: forceConsistentData,
 		})
 		if err != nil {
 			logsChan <- logResult{err: err}
@@ -2076,7 +2081,7 @@ func (c *ClickHouseConnector) GetFullBlockData(chainId *big.Int, blockNumbers []
 		transactionsResult, err := c.GetTransactions(QueryFilter{
 			ChainId:             chainId,
 			BlockNumbers:        blockNumbers,
-			ForceConsistentData: true,
+			ForceConsistentData: forceConsistentData,
 		})
 		if err != nil {
 			txsChan <- txResult{err: err}
@@ -2096,7 +2101,7 @@ func (c *ClickHouseConnector) GetFullBlockData(chainId *big.Int, blockNumbers []
 		tracesResult, err := c.GetTraces(QueryFilter{
 			ChainId:             chainId,
 			BlockNumbers:        blockNumbers,
-			ForceConsistentData: true,
+			ForceConsistentData: forceConsistentData,
 		})
 		if err != nil {
 			tracesChan <- traceResult{err: err}
diff --git a/internal/storage/postgres.go b/internal/storage/postgres.go
index 1476c44..8eef1a7 100644
--- a/internal/storage/postgres.go
+++ b/internal/storage/postgres.go
@@ -448,3 +448,18 @@ func (p *PostgresConnector) DeleteOlderThan(chainId *big.Int, blockNumber *big.I
 func (p *PostgresConnector) Close() error {
 	return p.db.Close()
 }
+
+// ExecRaw executes a raw SQL statement
+func (p *PostgresConnector) ExecRaw(query string, args ...interface{}) (sql.Result, error) {
+	return p.db.Exec(query, args...)
+}
+
+// QueryRaw executes a raw SQL query and returns the result rows
+func (p *PostgresConnector) QueryRaw(query string, args ...interface{}) (*sql.Rows, error) {
+	return p.db.Query(query, args...)
+}
+
+// QueryRowRaw executes a raw SQL query and returns a single row
+func (p *PostgresConnector) QueryRowRaw(query string, args ...interface{}) *sql.Row {
+	return p.db.QueryRow(query, args...)
+}
diff --git a/internal/tools/postgres/0011_postgres_create_migrated_block_ranges_table.sql b/internal/tools/postgres/0011_postgres_create_migrated_block_ranges_table.sql
new file mode 100644
index 0000000..9eb31a5
--- /dev/null
+++ b/internal/tools/postgres/0011_postgres_create_migrated_block_ranges_table.sql
@@ -0,0 +1,13 @@
+CREATE TABLE IF NOT EXISTS migrated_block_ranges (
+    chain_id BIGINT NOT NULL,
+    block_number BIGINT NOT NULL,
+    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
+) WITH (fillfactor = 80, autovacuum_vacuum_scale_factor = 0.1, autovacuum_analyze_scale_factor = 0.05);
+
+-- Create index for efficient querying by chain_id and block ranges
+CREATE INDEX IF NOT EXISTS idx_migrated_block_ranges_chain_block ON migrated_block_ranges(chain_id, block_number DESC);
+
+-- Create trigger to automatically update the updated_at timestamp
+CREATE TRIGGER update_migrated_block_ranges_updated_at BEFORE UPDATE ON migrated_block_ranges
+    FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();