Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 7 additions & 10 deletions cmd/migrate_valid.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,6 @@ func processBlockRange(ctx context.Context, migrator *Migrator, workerID int, st
}

blockNumbers := generateBlockNumbersForRange(currentBlock, batchEndBlock)
log.Info().Msgf("Worker %d: Processing blocks %s to %s", workerID, blockNumbers[0].String(), blockNumbers[len(blockNumbers)-1].String())

// Fetch valid blocks from source
fetchStartTime := time.Now()
Expand All @@ -214,7 +213,6 @@ func processBlockRange(ctx context.Context, migrator *Migrator, workerID int, st
time.Sleep(3 * time.Second)
continue
}
log.Debug().Dur("duration", fetchDuration).Int("blocks_fetched", len(validBlocksForRange)).Msgf("Worker %d: Fetched valid blocks from source", workerID)

// Build map of fetched blocks
mapBuildStartTime := time.Now()
Expand All @@ -231,10 +229,11 @@ func processBlockRange(ctx context.Context, migrator *Migrator, workerID int, st
}
}
mapBuildDuration := time.Since(mapBuildStartTime)
log.Debug().Dur("duration", mapBuildDuration).Int("missing_blocks", len(missingBlocks)).Msgf("Worker %d: Identified missing blocks", workerID)

// Fetch missing blocks from RPC
if len(missingBlocks) > 0 {
log.Debug().Dur("duration", mapBuildDuration).Int("missing_blocks", len(missingBlocks)).Msgf("Worker %d: Identified missing blocks", workerID)

rpcFetchStartTime := time.Now()
validMissingBlocks := migrator.GetValidBlocksFromRPC(missingBlocks)
rpcFetchDuration := time.Since(rpcFetchStartTime)
Expand All @@ -249,13 +248,10 @@ func processBlockRange(ctx context.Context, migrator *Migrator, workerID int, st
}

// Prepare blocks for insertion
prepStartTime := time.Now()
blocksToInsert := make([]common.BlockData, 0, len(blocksToInsertMap))
for _, blockData := range blocksToInsertMap {
blocksToInsert = append(blocksToInsert, blockData)
}
prepDuration := time.Since(prepStartTime)
log.Debug().Dur("duration", prepDuration).Int("blocks_to_insert", len(blocksToInsert)).Msgf("Worker %d: Prepared blocks for insertion", workerID)

// Insert blocks to destination
insertStartTime := time.Now()
Expand All @@ -273,7 +269,9 @@ func processBlockRange(ctx context.Context, migrator *Migrator, workerID int, st
Dur("fetch_duration", fetchDuration).
Dur("insert_duration", insertDuration).
Int("blocks_processed", len(blocksToInsert)).
Msgf("Worker %d: Batch processed successfully", workerID)
Str("start_block_number", blockNumbers[0].String()).
Str("end_block_number", blockNumbers[len(blockNumbers)-1].String()).
Msgf("Worker %d: Batch processed successfully for %s - %s", workerID, blockNumbers[0].String(), blockNumbers[len(blockNumbers)-1].String())

currentBlock = new(big.Int).Add(batchEndBlock, big.NewInt(1))
}
Expand Down Expand Up @@ -315,7 +313,7 @@ func NewMigrator() *Migrator {
log.Fatal().Msg("RPC does not support block receipts, but transactions were indexed with receipts")
}

validator := orchestrator.NewValidator(rpcClient, sourceConnector)
validator := orchestrator.NewValidator(rpcClient, sourceConnector, worker.NewWorker(rpcClient))

destinationConnector, err := storage.NewMainConnector(&config.Cfg.Migrator.Destination, &sourceConnector.OrchestratorStorage)
if err != nil {
Expand Down Expand Up @@ -441,8 +439,7 @@ func (m *Migrator) FetchBlocksFromRPC(blockNumbers []*big.Int) ([]common.BlockDa
blockData := m.worker.Run(context.Background(), blockNumbers)
for _, block := range blockData {
if block.Error != nil {
log.Warn().Err(block.Error).Msgf("Failed to fetch block %s from RPC", block.BlockNumber.String())
continue
return nil, block.Error
}
allBlockData = append(allBlockData, block.Data)
}
Expand Down
10 changes: 0 additions & 10 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ func init() {
rootCmd.PersistentFlags().Bool("poller-interval", true, "Poller interval")
rootCmd.PersistentFlags().Int("poller-blocks-per-poll", 10, "How many blocks to poll each interval")
rootCmd.PersistentFlags().Int("poller-from-block", 0, "From which block to start polling")
rootCmd.PersistentFlags().Bool("poller-force-from-block", false, "Force the poller to start from the block specified in `poller-from-block`")
rootCmd.PersistentFlags().Int("poller-until-block", 0, "Until which block to poll")
rootCmd.PersistentFlags().Int("poller-parallel-pollers", 5, "Maximum number of parallel pollers")
rootCmd.PersistentFlags().String("poller-s3-bucket", "", "S3 bucket for poller archive source")
Expand All @@ -77,10 +76,6 @@ func init() {
rootCmd.PersistentFlags().Int("reorgHandler-interval", 1000, "How often to run reorg handler in milliseconds")
rootCmd.PersistentFlags().Int("reorgHandler-blocks-per-scan", 100, "How many blocks to scan for reorgs")
rootCmd.PersistentFlags().Int("reorgHandler-from-block", 0, "From which block to start scanning for reorgs")
rootCmd.PersistentFlags().Bool("reorgHandler-force-from-block", false, "Force the reorg handler to start from the block specified in `reorgHandler-from-block`")
rootCmd.PersistentFlags().Bool("failure-recoverer-enabled", true, "Toggle failure recoverer")
rootCmd.PersistentFlags().Int("failure-recoverer-blocks-per-run", 10, "How many blocks to run failure recoverer for")
rootCmd.PersistentFlags().Int("failure-recoverer-interval", 1000, "How often to run failure recoverer in milliseconds")
rootCmd.PersistentFlags().String("storage-staging-clickhouse-database", "", "Clickhouse database for staging storage")
rootCmd.PersistentFlags().Int("storage-staging-clickhouse-port", 0, "Clickhouse port for staging storage")
rootCmd.PersistentFlags().String("storage-main-clickhouse-database", "", "Clickhouse database for main storage")
Expand Down Expand Up @@ -259,7 +254,6 @@ func init() {
viper.BindPFlag("poller.interval", rootCmd.PersistentFlags().Lookup("poller-interval"))
viper.BindPFlag("poller.blocksPerPoll", rootCmd.PersistentFlags().Lookup("poller-blocks-per-poll"))
viper.BindPFlag("poller.fromBlock", rootCmd.PersistentFlags().Lookup("poller-from-block"))
viper.BindPFlag("poller.forceFromBlock", rootCmd.PersistentFlags().Lookup("poller-force-from-block"))
viper.BindPFlag("poller.untilBlock", rootCmd.PersistentFlags().Lookup("poller-until-block"))
viper.BindPFlag("poller.parallelPollers", rootCmd.PersistentFlags().Lookup("poller-parallel-pollers"))
viper.BindPFlag("poller.s3.endpoint", rootCmd.PersistentFlags().Lookup("poller-s3-endpoint"))
Expand All @@ -282,10 +276,6 @@ func init() {
viper.BindPFlag("reorgHandler.interval", rootCmd.PersistentFlags().Lookup("reorgHandler-interval"))
viper.BindPFlag("reorgHandler.blocksPerScan", rootCmd.PersistentFlags().Lookup("reorgHandler-blocks-per-scan"))
viper.BindPFlag("reorgHandler.fromBlock", rootCmd.PersistentFlags().Lookup("reorgHandler-from-block"))
viper.BindPFlag("reorgHandler.forceFromBlock", rootCmd.PersistentFlags().Lookup("reorgHandler-force-from-block"))
viper.BindPFlag("failureRecoverer.enabled", rootCmd.PersistentFlags().Lookup("failure-recoverer-enabled"))
viper.BindPFlag("failureRecoverer.blocksPerRun", rootCmd.PersistentFlags().Lookup("failure-recoverer-blocks-per-run"))
viper.BindPFlag("failureRecoverer.interval", rootCmd.PersistentFlags().Lookup("failure-recoverer-interval"))
viper.BindPFlag("storage.staging.clickhouse.database", rootCmd.PersistentFlags().Lookup("storage-staging-clickhouse-database"))
viper.BindPFlag("storage.staging.clickhouse.host", rootCmd.PersistentFlags().Lookup("storage-staging-clickhouse-host"))
viper.BindPFlag("storage.staging.clickhouse.port", rootCmd.PersistentFlags().Lookup("storage-staging-clickhouse-port"))
Expand Down
3 changes: 2 additions & 1 deletion cmd/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/thirdweb-dev/indexer/internal/orchestrator"
"github.com/thirdweb-dev/indexer/internal/rpc"
"github.com/thirdweb-dev/indexer/internal/storage"
"github.com/thirdweb-dev/indexer/internal/worker"
)

var (
Expand Down Expand Up @@ -58,7 +59,7 @@ func RunValidate(cmd *cobra.Command, args []string) {
log.Fatal().Err(err).Msg("Failed to initialize storage")
}

validator := orchestrator.NewValidator(rpcClient, s)
validator := orchestrator.NewValidator(rpcClient, s, worker.NewWorker(rpcClient))

_, invalidBlocks, err := validator.ValidateBlockRange(startBlock, endBlock)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion cmd/validate_and_fix.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/thirdweb-dev/indexer/internal/rpc"
"github.com/thirdweb-dev/indexer/internal/storage"
"github.com/thirdweb-dev/indexer/internal/validation"
"github.com/thirdweb-dev/indexer/internal/worker"
)

var (
Expand Down Expand Up @@ -116,7 +117,7 @@ func RunValidateAndFix(cmd *cobra.Command, args []string) {
* Validates a range of blocks (end and start are inclusive) for a given chain and fixes any problems it finds
*/
func validateAndFixRange(rpcClient rpc.IRPCClient, s storage.IStorage, conn clickhouse.Conn, startBlock *big.Int, endBlock *big.Int, fixBatchSize int) error {
validator := orchestrator.NewValidator(rpcClient, s)
validator := orchestrator.NewValidator(rpcClient, s, worker.NewWorker(rpcClient))

chainId := rpcClient.GetChainID()
err := validation.FindAndRemoveDuplicates(conn, chainId, startBlock, endBlock)
Expand Down
Loading
Loading