diff --git a/cmd/root.go b/cmd/root.go index 0140b4cd..81608b8d 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -49,7 +49,7 @@ func init() { rootCmd.PersistentFlags().Int("rpc-traces-batchDelay", 0, "Milliseconds to wait between batches of traces when fetching from the RPC") rootCmd.PersistentFlags().String("log-level", "", "Log level to use for the application") rootCmd.PersistentFlags().Bool("log-prettify", false, "Whether to prettify the log output") - rootCmd.PersistentFlags().Int("poller-parallel-pollers", 5, "Maximum number of parallel pollers") + rootCmd.PersistentFlags().Int("poller-parallel-pollers", 0, "Maximum number of parallel pollers") rootCmd.PersistentFlags().String("poller-s3-bucket", "", "S3 bucket for oller archive source") rootCmd.PersistentFlags().String("poller-s3-region", "", "S3 region for poller archive source") rootCmd.PersistentFlags().String("poller-s3-prefix", "", "S3 prefix for poller archive source") @@ -173,7 +173,6 @@ func init() { rootCmd.PersistentFlags().Bool("api-contractApiRequest-disableCompression", false, "Disable compression for contract API request") rootCmd.PersistentFlags().Int("api-contractApiRequest-timeout", 10, "Timeout in seconds for contract API request") rootCmd.PersistentFlags().Bool("publisher-enabled", false, "Toggle publisher") - rootCmd.PersistentFlags().String("publisher-mode", "default", "Publisher mode: default or parallel") rootCmd.PersistentFlags().String("publisher-brokers", "", "Kafka brokers") rootCmd.PersistentFlags().String("publisher-username", "", "Kafka username for publisher") rootCmd.PersistentFlags().String("publisher-password", "", "Kafka password for publisher") @@ -368,7 +367,6 @@ func init() { viper.BindPFlag("api.contractApiRequest.disableCompression", rootCmd.PersistentFlags().Lookup("api-contractApiRequest-disableCompression")) viper.BindPFlag("api.contractApiRequest.timeout", rootCmd.PersistentFlags().Lookup("api-contractApiRequest-timeout")) viper.BindPFlag("publisher.enabled", rootCmd.PersistentFlags().Lookup("publisher-enabled")) - viper.BindPFlag("publisher.mode", rootCmd.PersistentFlags().Lookup("publisher-mode")) viper.BindPFlag("publisher.brokers", rootCmd.PersistentFlags().Lookup("publisher-brokers")) viper.BindPFlag("publisher.username", rootCmd.PersistentFlags().Lookup("publisher-username")) viper.BindPFlag("publisher.password", rootCmd.PersistentFlags().Lookup("publisher-password")) diff --git a/configs/config.go b/configs/config.go index 74f71cdb..5194a700 100644 --- a/configs/config.go +++ b/configs/config.go @@ -227,7 +227,6 @@ type EventPublisherConfig struct { type PublisherConfig struct { Enabled bool `mapstructure:"enabled"` - Mode string `mapstructure:"mode"` Brokers string `mapstructure:"brokers"` Username string `mapstructure:"username"` Password string `mapstructure:"password"` diff --git a/internal/orchestrator/chain_tracker.go b/internal/orchestrator/chain_tracker.go deleted file mode 100644 index 29a8fc7f..00000000 --- a/internal/orchestrator/chain_tracker.go +++ /dev/null @@ -1,53 +0,0 @@ -package orchestrator - -import ( - "context" - "time" - - "github.com/rs/zerolog/log" - "github.com/thirdweb-dev/indexer/internal/metrics" - "github.com/thirdweb-dev/indexer/internal/rpc" -) - -const DEFAULT_CHAIN_TRACKER_POLL_INTERVAL = 60 * 1000 // 1 minutes - -type ChainTracker struct { - rpc rpc.IRPCClient - triggerIntervalMs int -} - -func NewChainTracker(rpc rpc.IRPCClient) *ChainTracker { - return &ChainTracker{ - rpc: rpc, - triggerIntervalMs: DEFAULT_CHAIN_TRACKER_POLL_INTERVAL, - } -} - -func (ct *ChainTracker) Start(ctx context.Context) { - interval := time.Duration(ct.triggerIntervalMs) * time.Millisecond - ticker := time.NewTicker(interval) - defer ticker.Stop() - - log.Debug().Msgf("Chain tracker running") - ct.trackLatestBlockNumber(ctx) - - for { - select { - case <-ctx.Done(): - log.Info().Msg("Chain tracker shutting down") - return - case <-ticker.C: - ct.trackLatestBlockNumber(ctx) - } - } -} - -func (ct *ChainTracker) trackLatestBlockNumber(ctx context.Context) { - latestBlockNumber, err := ct.rpc.GetLatestBlockNumber(ctx) - if err != nil { - log.Error().Err(err).Msg("Error getting latest block number") - return - } - latestBlockNumberFloat, _ := latestBlockNumber.Float64() - metrics.ChainHead.Set(latestBlockNumberFloat) -} diff --git a/internal/orchestrator/committer.go b/internal/orchestrator/committer.go index 968a021a..f658edc4 100644 --- a/internal/orchestrator/committer.go +++ b/internal/orchestrator/committer.go @@ -47,145 +47,88 @@ func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage, poller *Poller, commitToBlock = -1 } - commitFromBlock := big.NewInt(int64(config.Cfg.Committer.FromBlock)) committer := &Committer{ blocksPerCommit: blocksPerCommit, storage: storage, - commitFromBlock: commitFromBlock, + commitFromBlock: big.NewInt(int64(config.Cfg.Committer.FromBlock)), commitToBlock: big.NewInt(int64(commitToBlock)), rpc: rpc, publisher: publisher.GetInstance(), poller: poller, validator: NewValidator(rpc, storage, worker.NewWorker(rpc)), // validator uses worker without sources } - cfb := commitFromBlock.Uint64() - committer.lastCommittedBlock.Store(cfb) - committer.lastPublishedBlock.Store(cfb) for _, opt := range opts { opt(committer) } + if err := committer.initCommittedAndPublishedBlockNumbers(); err != nil { + log.Fatal().Err(err).Msg("Failed to initialize committer block numbers") + } + return committer } func (c *Committer) Start(ctx context.Context) { log.Debug().Msgf("Committer running") - chainID := c.rpc.GetChainID() - latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(chainID) - if err != nil { - // It's okay to fail silently here; this value is only used for staging cleanup and - // the worker loop will eventually correct the state and delete as needed. - log.Error().Msgf("Error getting latest committed block number: %v", err) - } else if latestCommittedBlockNumber != nil && latestCommittedBlockNumber.Sign() > 0 { - c.lastCommittedBlock.Store(latestCommittedBlockNumber.Uint64()) - } + var wg sync.WaitGroup + wg.Add(2) - // Initialize publisher position - always use max(lastPublished, lastCommitted) to prevent double publishing - lastPublished, err := c.storage.OrchestratorStorage.GetLastPublishedBlockNumber(chainID) - if err != nil { - // It's okay to fail silently here; it's only used for staging cleanup and will be - // corrected by the worker loop. - log.Error().Err(err).Msg("failed to get last published block number") - } else if lastPublished != nil && lastPublished.Sign() > 0 { - // Always ensure publisher starts from at least the committed value - if latestCommittedBlockNumber != nil && latestCommittedBlockNumber.Sign() > 0 { - if lastPublished.Cmp(latestCommittedBlockNumber) < 0 { - gap := new(big.Int).Sub(latestCommittedBlockNumber, lastPublished) - log.Warn(). - Str("last_published", lastPublished.String()). - Str("latest_committed", latestCommittedBlockNumber.String()). - Str("gap", gap.String()). - Msg("Publisher is behind committed position, seeking forward to committed value") - - c.lastPublishedBlock.Store(latestCommittedBlockNumber.Uint64()) - if err := c.storage.OrchestratorStorage.SetLastPublishedBlockNumber(chainID, latestCommittedBlockNumber); err != nil { - log.Error().Err(err).Msg("Failed to update last published block number after seeking forward") - // Fall back to the stored value on error - c.lastPublishedBlock.Store(lastPublished.Uint64()) - } - } else { - c.lastPublishedBlock.Store(lastPublished.Uint64()) - } - } else { - c.lastPublishedBlock.Store(lastPublished.Uint64()) - } - } else { - c.lastPublishedBlock.Store(c.lastCommittedBlock.Load()) - } - - // Determine the correct publish position - always take the maximum to avoid going backwards - var targetPublishBlock *big.Int - - if lastPublished == nil || lastPublished.Sign() == 0 { - // No previous publish position - if latestCommittedBlockNumber != nil && latestCommittedBlockNumber.Sign() > 0 { - // Start from committed position - targetPublishBlock = latestCommittedBlockNumber - } else if c.commitFromBlock.Sign() > 0 { - // Start from configured position minus 1 (since we publish from next block) - targetPublishBlock = new(big.Int).Sub(c.commitFromBlock, big.NewInt(1)) - } else { - // Start from 0 - targetPublishBlock = big.NewInt(0) - } + go func() { + defer wg.Done() + c.runPublishLoop(ctx) + }() - log.Info(). - Str("target_publish_block", targetPublishBlock.String()). - Msg("No previous publish position, initializing publisher cursor") - } else { - // We have a previous position - targetPublishBlock = lastPublished - } - - // Only update storage if we're changing the position - if lastPublished == nil || targetPublishBlock.Cmp(lastPublished) != 0 { - if err := c.storage.OrchestratorStorage.SetLastPublishedBlockNumber(chainID, targetPublishBlock); err != nil { - log.Error().Err(err).Msg("Failed to update published block number in storage") - // If we can't update storage, use what was there originally to avoid issues - if lastPublished != nil { - targetPublishBlock = lastPublished - } - } - } + go func() { + defer wg.Done() + c.runCommitLoop(ctx) + }() - // Store in memory for quick acess - c.lastPublishedBlock.Store(targetPublishBlock.Uint64()) + <-ctx.Done() - log.Info(). - Str("publish_from", targetPublishBlock.String()). - Str("committed_at", func() string { - if latestCommittedBlockNumber != nil { - return latestCommittedBlockNumber.String() - } - return "0" - }()). - Msg("Publisher initialized") + wg.Wait() - if config.Cfg.Publisher.Mode == "parallel" { - var wg sync.WaitGroup - wg.Add(2) + log.Info().Msg("Committer shutting down") + c.publisher.Close() +} - go func() { - defer wg.Done() - c.runPublishLoop(ctx) - }() +func (c *Committer) initCommittedAndPublishedBlockNumbers() error { + latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID()) + if err != nil { + return err + } - go func() { - defer wg.Done() - c.runCommitLoop(ctx) - }() + if latestCommittedBlockNumber == nil { + latestCommittedBlockNumber = new(big.Int).SetUint64(0) + } - <-ctx.Done() + if c.commitFromBlock.Sign() > 0 && latestCommittedBlockNumber.Cmp(c.commitFromBlock) < 0 { + latestCommittedBlockNumber = new(big.Int).Sub(c.commitFromBlock, big.NewInt(1)) + } + c.lastCommittedBlock.Store(latestCommittedBlockNumber.Uint64()) - wg.Wait() - } else { - c.runCommitLoop(ctx) + // Initialize published block number + lastPublished, err := c.storage.OrchestratorStorage.GetLastPublishedBlockNumber(c.rpc.GetChainID()) + if err != nil { + return err } - log.Info().Msg("Committer shutting down") - c.publisher.Close() + if lastPublished == nil { + lastPublished = new(big.Int).SetUint64(0) + } + + // If the last published block is not initialized yet, set it to the last committed block number + if lastPublished.Sign() == 0 && c.lastCommittedBlock.Load() > 0 { + lastPublished = new(big.Int).SetUint64(c.lastCommittedBlock.Load()) + + if err := c.storage.OrchestratorStorage.SetLastPublishedBlockNumber(c.rpc.GetChainID(), lastPublished); err != nil { + return err + } + } + c.lastPublishedBlock.Store(lastPublished.Uint64()) + + return nil } func (c *Committer) runCommitLoop(ctx context.Context) { @@ -199,16 +142,7 @@ func (c *Committer) runCommitLoop(ctx context.Context) { log.Info().Msgf("Committer reached configured toBlock %s, the last commit block is %d, stopping commits", c.commitToBlock.String(), c.lastCommittedBlock.Load()) return } - blockDataToCommit, err := c.getSequentialBlockDataToCommit(ctx) - if err != nil { - log.Error().Err(err).Msg("Error getting block data to commit") - continue - } - if len(blockDataToCommit) == 0 { - log.Debug().Msg("No block data to commit") - continue - } - if err := c.commit(ctx, blockDataToCommit); err != nil { + if err := c.commit(ctx); err != nil { log.Error().Err(err).Msg("Error committing blocks") } go c.cleanupProcessedStagingBlocks(ctx) @@ -283,40 +217,17 @@ func (c *Committer) getBlockNumbersToCommit(ctx context.Context) ([]*big.Int, er if err != nil { return nil, err } - if latestCommittedBlockNumber == nil { - latestCommittedBlockNumber = new(big.Int).SetUint64(0) - } - log.Debug().Msgf("Committer found this max block number in main storage: %s", latestCommittedBlockNumber.String()) - if latestCommittedBlockNumber.Sign() == 0 { - // If no blocks have been committed yet, start from the fromBlock specified in the config - latestCommittedBlockNumber = new(big.Int).Sub(c.commitFromBlock, big.NewInt(1)) - } else { - lastCommitted := new(big.Int).SetUint64(c.lastCommittedBlock.Load()) - if latestCommittedBlockNumber.Cmp(lastCommitted) < 0 { - log.Warn().Msgf("Max block in storage (%s) is less than last committed block in memory (%s).", latestCommittedBlockNumber.String(), lastCommitted.String()) - return []*big.Int{}, nil - } + if latestCommittedBlockNumber == nil || c.lastCommittedBlock.Load() != latestCommittedBlockNumber.Uint64() { + log.Fatal().Msgf("Inconsistent last committed block state between memory (%d) and storage (%v)", c.lastCommittedBlock.Load(), latestCommittedBlockNumber) + return nil, fmt.Errorf("last committed block number is not initialized correctly") } - startBlock := new(big.Int).Add(latestCommittedBlockNumber, big.NewInt(1)) - endBlock, err := c.getBlockToCommitUntil(ctx, latestCommittedBlockNumber) + blockNumbers, err := c.getBlockRange(ctx, latestCommittedBlockNumber) if err != nil { - return nil, fmt.Errorf("error getting block to commit until: %v", err) + return nil, fmt.Errorf("failed to get block range to commit: %v", err) } - blockCount := new(big.Int).Sub(endBlock, startBlock).Int64() + 1 - if blockCount < 0 { - return []*big.Int{}, fmt.Errorf("more blocks have been committed than the RPC has available - possible chain reset") - } - if blockCount == 0 { - return []*big.Int{}, nil - } - blockNumbers := make([]*big.Int, blockCount) - for i := int64(0); i < blockCount; i++ { - blockNumber := new(big.Int).Add(startBlock, big.NewInt(i)) - blockNumbers[i] = blockNumber - } return blockNumbers, nil } @@ -327,46 +238,25 @@ func (c *Committer) getBlockNumbersToPublish(ctx context.Context) ([]*big.Int, e return nil, fmt.Errorf("failed to get last published block number: %v", err) } - // This should never happen after Start() has run, but handle it defensively - if latestPublishedBlockNumber == nil || latestPublishedBlockNumber.Sign() == 0 { - // Fall back to in-memory value which was set during Start - latestPublishedBlockNumber = new(big.Int).SetUint64(c.lastPublishedBlock.Load()) - log.Warn(). - Str("fallback_value", latestPublishedBlockNumber.String()). - Msg("Storage returned nil/0 for last published block, using in-memory value") + if latestPublishedBlockNumber == nil || c.lastPublishedBlock.Load() != latestPublishedBlockNumber.Uint64() { + log.Fatal().Msgf("Inconsistent last published block state between memory (%d) and storage (%v)", c.lastPublishedBlock.Load(), latestPublishedBlockNumber) + return nil, fmt.Errorf("last published block number is not initialized correctly") } - log.Debug(). - Str("last_published", latestPublishedBlockNumber.String()). - Msg("Determining blocks to publish") - - startBlock := new(big.Int).Add(latestPublishedBlockNumber, big.NewInt(1)) - endBlock, err := c.getBlockToCommitUntil(ctx, latestPublishedBlockNumber) + blockNumbers, err := c.getBlockRange(ctx, latestPublishedBlockNumber) if err != nil { - return nil, fmt.Errorf("error getting block to commit until: %v", err) + return nil, fmt.Errorf("failed to get block range to publish: %v", err) } - blockCount := new(big.Int).Sub(endBlock, startBlock).Int64() + 1 - if blockCount < 0 { - return []*big.Int{}, fmt.Errorf("more blocks have been committed than the RPC has available - possible chain reset") - } - if blockCount == 0 { - return []*big.Int{}, nil - } - blockNumbers := make([]*big.Int, blockCount) - for i := int64(0); i < blockCount; i++ { - blockNumber := new(big.Int).Add(startBlock, big.NewInt(i)) - blockNumbers[i] = blockNumber - } return blockNumbers, nil } -func (c *Committer) getBlockToCommitUntil(ctx context.Context, latestCommittedBlockNumber *big.Int) (*big.Int, error) { - untilBlock := new(big.Int).Add(latestCommittedBlockNumber, big.NewInt(int64(c.blocksPerCommit))) +func (c *Committer) getBlockRange(ctx context.Context, lastBlockNumber *big.Int) ([]*big.Int, error) { + endBlock := new(big.Int).Add(lastBlockNumber, big.NewInt(int64(c.blocksPerCommit))) // If a commit until block is set, then set a limit on the commit until block - if c.commitToBlock.Sign() > 0 && untilBlock.Cmp(c.commitToBlock) > 0 { - return new(big.Int).Set(c.commitToBlock), nil + if c.commitToBlock.Sign() > 0 && endBlock.Cmp(c.commitToBlock) > 0 { + endBlock = new(big.Int).Set(c.commitToBlock) } // get latest block from RPC and if that's less than until block, return that @@ -375,12 +265,25 @@ func (c *Committer) getBlockToCommitUntil(ctx context.Context, latestCommittedBl return nil, fmt.Errorf("error getting latest block from RPC: %v", err) } - if latestBlock.Cmp(untilBlock) < 0 { - log.Debug().Msgf("Committing until latest block: %s", latestBlock.String()) - return latestBlock, nil + if latestBlock.Cmp(endBlock) < 0 { + endBlock = new(big.Int).Set(latestBlock) } - return untilBlock, nil + startBlock := new(big.Int).Add(lastBlockNumber, big.NewInt(1)) + blockCount := new(big.Int).Sub(endBlock, startBlock).Int64() + 1 + if blockCount < 0 { + return []*big.Int{}, fmt.Errorf("more blocks have been committed than the RPC has available - possible chain reset") + } + if blockCount == 0 { + return []*big.Int{}, nil + } + + blockNumbers := make([]*big.Int, blockCount) + for i := int64(0); i < blockCount; i++ { + blockNumber := new(big.Int).Add(startBlock, big.NewInt(i)) + blockNumbers[i] = blockNumber + } + return blockNumbers, nil } func (c *Committer) fetchBlockData(ctx context.Context, blockNumbers []*big.Int) ([]common.BlockData, error) { @@ -447,30 +350,16 @@ func (c *Committer) getSequentialBlockData(ctx context.Context, blockNumbers []* return sequentialBlockData, nil } -func (c *Committer) getSequentialBlockDataToCommit(ctx context.Context) ([]common.BlockData, error) { - blocksToCommit, err := c.getBlockNumbersToCommit(ctx) - if err != nil { - return nil, fmt.Errorf("error determining blocks to commit: %v", err) - } - if len(blocksToCommit) == 0 { - return nil, nil - } - return c.getSequentialBlockData(ctx, blocksToCommit) -} - -func (c *Committer) getSequentialBlockDataToPublish(ctx context.Context) ([]common.BlockData, error) { - blocksToPublish, err := c.getBlockNumbersToPublish(ctx) +func (c *Committer) publish(ctx context.Context) error { + blockNumbersToPublish, err := c.getBlockNumbersToPublish(ctx) if err != nil { - return nil, fmt.Errorf("error determining blocks to publish: %v", err) + return fmt.Errorf("error determining blocks to publish: %v", err) } - if len(blocksToPublish) == 0 { - return nil, nil + if len(blockNumbersToPublish) == 0 { + return nil } - return c.getSequentialBlockData(ctx, blocksToPublish) -} -func (c *Committer) publish(ctx context.Context) error { - blockData, err := c.getSequentialBlockDataToPublish(ctx) + blockData, err := c.getSequentialBlockData(ctx, blockNumbersToPublish) if err != nil { return err } @@ -479,48 +368,51 @@ func (c *Committer) publish(ctx context.Context) error { } if err := c.publisher.PublishBlockData(blockData); err != nil { + log.Error().Err(err).Msgf("Failed to publish blocks: %v", blockNumbersToPublish) return err } chainID := c.rpc.GetChainID() highest := blockData[len(blockData)-1].Block.Number if err := c.storage.OrchestratorStorage.SetLastPublishedBlockNumber(chainID, highest); err != nil { + log.Error().Err(err).Msgf("Failed to update last published block number to %s", highest.String()) return err } + c.lastPublishedBlock.Store(highest.Uint64()) return nil } -func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) error { - blockNumbers := make([]*big.Int, len(blockData)) - highestBlock := blockData[0].Block - for i, block := range blockData { - blockNumbers[i] = block.Block.Number - if block.Block.Number.Cmp(highestBlock.Number) > 0 { - highestBlock = block.Block - } +func (c *Committer) commit(ctx context.Context) error { + blockNumbersToCommit, err := c.getBlockNumbersToCommit(ctx) + if err != nil { + return fmt.Errorf("error determining blocks to commit: %v", err) } - log.Debug().Msgf("Committing %d blocks from %s to %s", len(blockNumbers), blockNumbers[0].String(), blockNumbers[len(blockNumbers)-1].String()) + if len(blockNumbersToCommit) == 0 { + return nil + } + + blockData, err := c.getSequentialBlockData(ctx, blockNumbersToCommit) + if err != nil { + log.Error().Err(err).Msg("Error getting block data to commit") + return err + } + if len(blockData) == 0 { + return nil + } + + highestBlock := blockData[len(blockData)-1].Block + + log.Debug().Msgf("Committing %d blocks from %s to %s", len(blockData), blockData[0].Block.Number.String(), highestBlock.Number.String()) mainStorageStart := time.Now() if err := c.storage.MainStorage.InsertBlockData(blockData); err != nil { - log.Error().Err(err).Msgf("Failed to commit blocks: %v", blockNumbers) + log.Error().Err(err).Msgf("Failed to commit blocks: %v", blockNumbersToCommit) return fmt.Errorf("error saving data to main storage: %v", err) } log.Debug().Str("metric", "main_storage_insert_duration").Msgf("MainStorage.InsertBlockData duration: %f", time.Since(mainStorageStart).Seconds()) metrics.MainStorageInsertDuration.Observe(time.Since(mainStorageStart).Seconds()) - if config.Cfg.Publisher.Mode == "default" { - highest := highestBlock.Number.Uint64() - go func() { - if err := c.publisher.PublishBlockData(blockData); err != nil { - log.Error().Err(err).Msg("Failed to publish block data to kafka") - return - } - c.lastPublishedBlock.Store(highest) - }() - } - c.lastCommittedBlock.Store(highestBlock.Number.Uint64()) // Update metrics for successful commits diff --git a/internal/orchestrator/orchestrator.go b/internal/orchestrator/orchestrator.go index 13ec47e6..00de1839 100644 --- a/internal/orchestrator/orchestrator.go +++ b/internal/orchestrator/orchestrator.go @@ -87,16 +87,6 @@ func (o *Orchestrator) Start() { }() } - // The chain tracker is always running - o.wg.Add(1) - go func() { - defer o.wg.Done() - chainTracker := NewChainTracker(o.rpc) - chainTracker.Start(ctx) - - log.Info().Msg("Chain tracker completed") - }() - // Waiting for all goroutines to complete o.wg.Wait() diff --git a/internal/orchestrator/poller.go b/internal/orchestrator/poller.go index 93f0fc3a..2c6a8b3f 100644 --- a/internal/orchestrator/poller.go +++ b/internal/orchestrator/poller.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "math/big" + "runtime" "sync" "time" @@ -59,7 +60,10 @@ func WithPollerWorker(cfg *worker.Worker) PollerOption { func NewPoller(rpc rpc.IRPCClient, storage storage.IStorage, opts ...PollerOption) *Poller { parallelPollers := config.Cfg.Poller.ParallelPollers if parallelPollers == 0 { - parallelPollers = DEFAULT_PARALLEL_POLLERS + parallelPollers = runtime.GOMAXPROCS(0) + if parallelPollers < 1 { + parallelPollers = 1 + } } // Set the lookahead -> number of pollers + 2 diff --git a/internal/orchestrator/work_mode_monitor.go b/internal/orchestrator/work_mode_monitor.go deleted file mode 100644 index ee75c04e..00000000 --- a/internal/orchestrator/work_mode_monitor.go +++ /dev/null @@ -1,160 +0,0 @@ -package orchestrator - -import ( - "context" - "math/big" - "sync" - "time" - - "github.com/rs/zerolog/log" - "github.com/thirdweb-dev/indexer/internal/metrics" - "github.com/thirdweb-dev/indexer/internal/rpc" - "github.com/thirdweb-dev/indexer/internal/storage" -) - -type WorkMode string - -const ( - DEFAULT_WORK_MODE_CHECK_INTERVAL = 10 - DEFAULT_LIVE_MODE_THRESHOLD = 500 - WorkModeLive WorkMode = "live" - WorkModeBackfill WorkMode = "backfill" -) - -type WorkModeMonitor struct { - rpc rpc.IRPCClient - storage storage.IStorage - workModeChannels map[chan WorkMode]struct{} - channelsMutex sync.RWMutex - currentMode WorkMode - checkInterval time.Duration - liveModeThreshold *big.Int -} - -func NewWorkModeMonitor(rpc rpc.IRPCClient, storage storage.IStorage) *WorkModeMonitor { - checkInterval := DEFAULT_WORK_MODE_CHECK_INTERVAL - liveModeThreshold := DEFAULT_LIVE_MODE_THRESHOLD - log.Info().Msgf("Work mode monitor initialized with check interval %d and live mode threshold %d", checkInterval, liveModeThreshold) - return &WorkModeMonitor{ - rpc: rpc, - storage: storage, - workModeChannels: make(map[chan WorkMode]struct{}), - currentMode: "", - checkInterval: time.Duration(checkInterval) * time.Minute, - liveModeThreshold: big.NewInt(int64(liveModeThreshold)), - } -} - -// RegisterChannel adds a new channel to receive work mode updates -func (m *WorkModeMonitor) RegisterChannel(ch chan WorkMode) { - m.channelsMutex.Lock() - defer m.channelsMutex.Unlock() - - m.workModeChannels[ch] = struct{}{} - // Send current mode to the new channel only if it's not empty - if m.currentMode != "" { - select { - case ch <- m.currentMode: - log.Debug().Msg("Initial work mode sent to new channel") - default: - log.Warn().Msg("Failed to send initial work mode to new channel - channel full") - } - } -} - -// UnregisterChannel removes a channel from receiving work mode updates -func (m *WorkModeMonitor) UnregisterChannel(ch chan WorkMode) { - m.channelsMutex.Lock() - defer m.channelsMutex.Unlock() - - delete(m.workModeChannels, ch) -} - -func (m *WorkModeMonitor) updateWorkModeMetric(mode WorkMode) { - var value float64 - if mode == WorkModeLive { - value = 1 - } - metrics.CurrentWorkMode.Set(value) -} - -func (m *WorkModeMonitor) Start(ctx context.Context) { - // Perform immediate check - newMode, err := m.determineWorkMode(ctx) - if err != nil { - log.Error().Err(err).Msg("Error checking work mode during startup") - } else if newMode != m.currentMode { - log.Info().Msgf("Work mode changing from %s to %s during startup", m.currentMode, newMode) - m.currentMode = newMode - m.updateWorkModeMetric(newMode) - m.broadcastWorkMode(newMode) - } - - ticker := time.NewTicker(m.checkInterval) - defer ticker.Stop() - - log.Info().Msgf("Work mode monitor started with initial mode: %s", m.currentMode) - - for { - select { - case <-ctx.Done(): - log.Info().Msg("Work mode monitor shutting down") - return - case <-ticker.C: - newMode, err := m.determineWorkMode(ctx) - if err != nil { - log.Error().Err(err).Msg("Error checking work mode") - continue - } - - if newMode != m.currentMode { - log.Info().Msgf("Work mode changing from %s to %s", m.currentMode, newMode) - m.currentMode = newMode - m.updateWorkModeMetric(newMode) - m.broadcastWorkMode(newMode) - } - } - } -} - -func (m *WorkModeMonitor) broadcastWorkMode(mode WorkMode) { - m.channelsMutex.RLock() - defer m.channelsMutex.RUnlock() - - for ch := range m.workModeChannels { - select { - case ch <- mode: - log.Debug().Msg("Work mode change notification sent") - default: - if r := recover(); r != nil { - log.Warn().Msg("Work mode notification dropped - channel closed") - delete(m.workModeChannels, ch) - } - } - } -} - -func (m *WorkModeMonitor) determineWorkMode(ctx context.Context) (WorkMode, error) { - lastCommittedBlock, err := m.storage.MainStorage.GetMaxBlockNumber(m.rpc.GetChainID()) - if err != nil { - return "", err - } - - if lastCommittedBlock.Sign() == 0 { - log.Debug().Msg("No blocks committed yet, using backfill mode") - return WorkModeBackfill, nil - } - - latestBlock, err := m.rpc.GetLatestBlockNumber(ctx) - if err != nil { - return "", err - } - - blockDiff := new(big.Int).Sub(latestBlock, lastCommittedBlock) - log.Debug().Msgf("Committer is %d blocks behind the chain", blockDiff.Int64()) - if blockDiff.Cmp(m.liveModeThreshold) < 0 { - return WorkModeLive, nil - } - - return WorkModeBackfill, nil -} diff --git a/internal/rpc/rpc.go b/internal/rpc/rpc.go index d1484185..939a1a68 100644 --- a/internal/rpc/rpc.go +++ b/internal/rpc/rpc.go @@ -13,6 +13,7 @@ import ( "github.com/rs/zerolog/log" config "github.com/thirdweb-dev/indexer/configs" "github.com/thirdweb-dev/indexer/internal/common" + "github.com/thirdweb-dev/indexer/internal/metrics" ) type GetFullBlockResult struct { @@ -305,6 +306,7 @@ func (rpc *Client) GetLatestBlockNumber(ctx context.Context) (*big.Int, error) { if err != nil { return nil, fmt.Errorf("failed to get latest block number: %v", err) } + metrics.ChainHead.Set(float64(blockNumber)) return new(big.Int).SetUint64(blockNumber), nil }