diff --git a/cmd/root.go b/cmd/root.go index a679734..2a9ac47 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -65,7 +65,6 @@ func init() { rootCmd.PersistentFlags().Int("poller-s3-maxConcurrentDownloads", 3, "Max concurrent downloads for poller archive source") rootCmd.PersistentFlags().Bool("committer-enabled", true, "Toggle committer") rootCmd.PersistentFlags().Int("committer-blocks-per-commit", 10, "How many blocks to commit each interval") - rootCmd.PersistentFlags().Int("committer-interval", 1000, "How often to commit blocks in milliseconds") rootCmd.PersistentFlags().Int("committer-from-block", 0, "From which block to start committing") rootCmd.PersistentFlags().Int("committer-to-block", 0, "To which block to commit") rootCmd.PersistentFlags().Bool("reorgHandler-enabled", true, "Toggle reorg handler") @@ -191,8 +190,6 @@ func init() { rootCmd.PersistentFlags().String("publisher-events-topicName", "", "Kafka topic name for events") rootCmd.PersistentFlags().String("publisher-events-addressFilter", "", "Filter events by address") rootCmd.PersistentFlags().String("publisher-events-topic0Filter", "", "Filter events by topic0") - rootCmd.PersistentFlags().Int("workMode-checkIntervalMinutes", 10, "How often to check work mode in minutes") - rootCmd.PersistentFlags().Int64("workMode-liveModeThreshold", 500, "How many blocks the indexer can be behind before switching to live mode") rootCmd.PersistentFlags().String("validation-mode", "strict", "Validation mode. Strict will validate logsBloom and transactionsRoot. Minimal will validate transaction count and logs existence.") rootCmd.PersistentFlags().String("migrator-destination-type", "auto", "Storage type for migrator destination (auto, clickhouse, postgres, kafka, badger, pebble, s3)") rootCmd.PersistentFlags().String("migrator-destination-clickhouse-host", "", "Clickhouse host for migrator destination") @@ -263,7 +260,6 @@ func init() { viper.BindPFlag("poller.s3.maxConcurrentDownloads", rootCmd.PersistentFlags().Lookup("poller-s3-maxConcurrentDownloads")) viper.BindPFlag("committer.enabled", rootCmd.PersistentFlags().Lookup("committer-enabled")) viper.BindPFlag("committer.blocksPerCommit", rootCmd.PersistentFlags().Lookup("committer-blocks-per-commit")) - viper.BindPFlag("committer.interval", rootCmd.PersistentFlags().Lookup("committer-interval")) viper.BindPFlag("committer.fromBlock", rootCmd.PersistentFlags().Lookup("committer-from-block")) viper.BindPFlag("committer.toBlock", rootCmd.PersistentFlags().Lookup("committer-to-block")) viper.BindPFlag("reorgHandler.enabled", rootCmd.PersistentFlags().Lookup("reorgHandler-enabled")) @@ -389,8 +385,6 @@ func init() { viper.BindPFlag("publisher.events.topicName", rootCmd.PersistentFlags().Lookup("publisher-events-topicName")) viper.BindPFlag("publisher.events.addressFilter", rootCmd.PersistentFlags().Lookup("publisher-events-addressFilter")) viper.BindPFlag("publisher.events.topic0Filter", rootCmd.PersistentFlags().Lookup("publisher-events-topic0Filter")) - viper.BindPFlag("workMode.checkIntervalMinutes", rootCmd.PersistentFlags().Lookup("workMode-checkIntervalMinutes")) - viper.BindPFlag("workMode.liveModeThreshold", rootCmd.PersistentFlags().Lookup("workMode-liveModeThreshold")) viper.BindPFlag("validation.mode", rootCmd.PersistentFlags().Lookup("validation-mode")) // Migrator viper bindings viper.BindPFlag("migrator.destination.type", rootCmd.PersistentFlags().Lookup("migrator-destination-type")) diff --git a/configs/config.go b/configs/config.go index 0e60f47..6b5c10e 100644 --- a/configs/config.go +++ b/configs/config.go @@ -24,7 +24,6 @@ type PollerConfig struct { type CommitterConfig struct { Enabled bool `mapstructure:"enabled"` - Interval int `mapstructure:"interval"` BlocksPerCommit int `mapstructure:"blocksPerCommit"` FromBlock int `mapstructure:"fromBlock"` ToBlock int `mapstructure:"toBlock"` @@ -38,12 +37,6 @@ type ReorgHandlerConfig struct { ForceFromBlock bool `mapstructure:"forceFromBlock"` } -type FailureRecovererConfig struct { - Enabled bool `mapstructure:"enabled"` - Interval int `mapstructure:"interval"` - BlocksPerRun int `mapstructure:"blocksPerRun"` -} - type StorageConfig struct { Orchestrator StorageOrchestratorConfig `mapstructure:"orchestrator"` Staging StorageStagingConfig `mapstructure:"staging"` @@ -254,11 +247,6 @@ type S3SourceConfig struct { MaxConcurrentDownloads int `mapstructure:"maxConcurrentDownloads"` } -type WorkModeConfig struct { - CheckIntervalMinutes int `mapstructure:"checkIntervalMinutes"` - LiveModeThreshold int64 `mapstructure:"liveModeThreshold"` -} - type ValidationConfig struct { Mode string `mapstructure:"mode"` // "disabled", "minimal", "strict" } @@ -272,18 +260,16 @@ type MigratorConfig struct { } type Config struct { - RPC RPCConfig `mapstructure:"rpc"` - Log LogConfig `mapstructure:"log"` - Poller PollerConfig `mapstructure:"poller"` - Committer CommitterConfig `mapstructure:"committer"` - FailureRecoverer FailureRecovererConfig `mapstructure:"failureRecoverer"` - ReorgHandler ReorgHandlerConfig `mapstructure:"reorgHandler"` - Storage StorageConfig `mapstructure:"storage"` - API APIConfig `mapstructure:"api"` - Publisher PublisherConfig `mapstructure:"publisher"` - WorkMode WorkModeConfig `mapstructure:"workMode"` - Validation ValidationConfig `mapstructure:"validation"` - Migrator MigratorConfig `mapstructure:"migrator"` + RPC RPCConfig `mapstructure:"rpc"` + Log LogConfig `mapstructure:"log"` + Poller PollerConfig `mapstructure:"poller"` + Committer CommitterConfig `mapstructure:"committer"` + ReorgHandler ReorgHandlerConfig `mapstructure:"reorgHandler"` + Storage StorageConfig `mapstructure:"storage"` + API APIConfig `mapstructure:"api"` + Publisher PublisherConfig `mapstructure:"publisher"` + Validation ValidationConfig `mapstructure:"validation"` + Migrator MigratorConfig `mapstructure:"migrator"` } var Cfg Config diff --git a/internal/orchestrator/committer.go b/internal/orchestrator/committer.go index a485bae..968a021 100644 --- a/internal/orchestrator/committer.go +++ b/internal/orchestrator/committer.go @@ -19,11 +19,9 @@ import ( "github.com/thirdweb-dev/indexer/internal/worker" ) -const DEFAULT_COMMITTER_TRIGGER_INTERVAL = 2000 const DEFAULT_BLOCKS_PER_COMMIT = 1000 type Committer struct { - triggerIntervalMs int blocksPerCommit int storage storage.IStorage commitFromBlock *big.Int @@ -39,11 +37,6 @@ type Committer struct { type CommitterOption func(*Committer) func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage, poller *Poller, opts ...CommitterOption) *Committer { - triggerInterval := config.Cfg.Committer.Interval - if triggerInterval == 0 { - triggerInterval = DEFAULT_COMMITTER_TRIGGER_INTERVAL - } - blocksPerCommit := config.Cfg.Committer.BlocksPerCommit if blocksPerCommit == 0 { blocksPerCommit = DEFAULT_BLOCKS_PER_COMMIT @@ -56,15 +49,14 @@ func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage, poller *Poller, commitFromBlock := big.NewInt(int64(config.Cfg.Committer.FromBlock)) committer := &Committer{ - triggerIntervalMs: triggerInterval, - blocksPerCommit: blocksPerCommit, - storage: storage, - commitFromBlock: commitFromBlock, - commitToBlock: big.NewInt(int64(commitToBlock)), - rpc: rpc, - publisher: publisher.GetInstance(), - poller: poller, - validator: NewValidator(rpc, storage, worker.NewWorker(rpc)), // validator uses worker without sources + blocksPerCommit: blocksPerCommit, + storage: storage, + commitFromBlock: commitFromBlock, + commitToBlock: big.NewInt(int64(commitToBlock)), + rpc: rpc, + publisher: publisher.GetInstance(), + poller: poller, + validator: NewValidator(rpc, storage, worker.NewWorker(rpc)), // validator uses worker without sources } cfb := commitFromBlock.Uint64() committer.lastCommittedBlock.Store(cfb) @@ -78,8 +70,6 @@ func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage, poller *Poller, } func (c *Committer) Start(ctx context.Context) { - interval := time.Duration(c.triggerIntervalMs) * time.Millisecond - log.Debug().Msgf("Committer running") chainID := c.rpc.GetChainID() @@ -175,40 +165,35 @@ func (c *Committer) Start(ctx context.Context) { if config.Cfg.Publisher.Mode == "parallel" { var wg sync.WaitGroup - publishInterval := interval / 2 - if publishInterval <= 0 { - publishInterval = interval - } wg.Add(2) + go func() { defer wg.Done() - c.runPublishLoop(ctx, publishInterval) + c.runPublishLoop(ctx) }() - // allow the publisher to start before the committer - time.Sleep(publishInterval) go func() { defer wg.Done() - c.runCommitLoop(ctx, interval) + c.runCommitLoop(ctx) }() <-ctx.Done() + wg.Wait() } else { - c.runCommitLoop(ctx, interval) + c.runCommitLoop(ctx) } log.Info().Msg("Committer shutting down") c.publisher.Close() } -func (c *Committer) runCommitLoop(ctx context.Context, interval time.Duration) { +func (c *Committer) runCommitLoop(ctx context.Context) { for { select { case <-ctx.Done(): return default: - time.Sleep(interval) if c.commitToBlock.Sign() > 0 && c.lastCommittedBlock.Load() >= c.commitToBlock.Uint64() { // Completing the commit loop if we've committed more than commit to block log.Info().Msgf("Committer reached configured toBlock %s, the last commit block is %d, stopping commits", c.commitToBlock.String(), c.lastCommittedBlock.Load()) @@ -231,13 +216,17 @@ func (c *Committer) runCommitLoop(ctx context.Context, interval time.Duration) { } } -func (c *Committer) runPublishLoop(ctx context.Context, interval time.Duration) { +func (c *Committer) runPublishLoop(ctx context.Context) { for { select { case <-ctx.Done(): return default: - time.Sleep(interval) + if c.commitToBlock.Sign() > 0 && c.lastPublishedBlock.Load() >= c.commitToBlock.Uint64() { + // Completing the publish loop if we've published more than commit to block + log.Info().Msgf("Committer reached configured toBlock %s, the last publish block is %d, stopping publishes", c.commitToBlock.String(), c.lastPublishedBlock.Load()) + return + } if err := c.publish(ctx); err != nil { log.Error().Err(err).Msg("Error publishing blocks") } @@ -397,8 +386,8 @@ func (c *Committer) getBlockToCommitUntil(ctx context.Context, latestCommittedBl func (c *Committer) fetchBlockData(ctx context.Context, blockNumbers []*big.Int) ([]common.BlockData, error) { blocksData := c.poller.Request(ctx, blockNumbers) if len(blocksData) == 0 { - // TODO: should wait a little bit, as it may take time to load - log.Warn().Msgf("Committer didn't find the following range: %v - %v", blockNumbers[0].Int64(), blockNumbers[len(blockNumbers)-1].Int64()) + log.Warn().Msgf("Committer didn't find the following range: %v - %v. %v", blockNumbers[0].Int64(), blockNumbers[len(blockNumbers)-1].Int64(), c.poller.GetPollerStatus()) + time.Sleep(500 * time.Millisecond) // TODO: wait for block time return nil, nil } return blocksData, nil diff --git a/internal/orchestrator/committer_test.go b/internal/orchestrator/committer_test.go index 2aa19a2..a8b18ad 100644 --- a/internal/orchestrator/committer_test.go +++ b/internal/orchestrator/committer_test.go @@ -1,92 +1 @@ package orchestrator - -import ( - "context" - "math/big" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - "github.com/thirdweb-dev/indexer/internal/rpc" - "github.com/thirdweb-dev/indexer/internal/storage" - mocks "github.com/thirdweb-dev/indexer/test/mocks" -) - -func TestNewCommitter(t *testing.T) { - mockRPC := mocks.NewMockIRPCClient(t) - mockMainStorage := mocks.NewMockIMainStorage(t) - mockStagingStorage := mocks.NewMockIStagingStorage(t) - - mockStorage := storage.IStorage{ - MainStorage: mockMainStorage, - StagingStorage: mockStagingStorage, - } - - // Mock the GetBlocksPerRequest call that happens in NewWorker - mockRPC.EXPECT().GetBlocksPerRequest().Return(rpc.BlocksPerRequestConfig{Blocks: 100}) - - poller := &Poller{} - committer := NewCommitter(mockRPC, mockStorage, poller) - - assert.NotNil(t, committer) - assert.Equal(t, DEFAULT_COMMITTER_TRIGGER_INTERVAL, committer.triggerIntervalMs) - assert.Equal(t, DEFAULT_BLOCKS_PER_COMMIT, committer.blocksPerCommit) -} - -// Removed - test needs to be updated for new implementation - -// Removed - test needs to be updated for new implementation - -// Removed - test needs to be updated for new implementation - -// Removed - test needs to be updated for new implementation - -// Removed - test needs to be updated for new implementation - -// Removed - test needs to be updated for new implementation - -// Removed - test needs to be updated for new implementation - -// Removed - test needs to be updated for new implementation - -// Removed - test needs to be updated for new implementation - -// Removed - test needs to be updated for new implementation - -// Removed - test needs to be updated for new implementation - -// Removed - test needs to be updated for new implementation - -func TestCleanupProcessedStagingBlocks(t *testing.T) { - mockRPC := mocks.NewMockIRPCClient(t) - mockMainStorage := mocks.NewMockIMainStorage(t) - mockStagingStorage := mocks.NewMockIStagingStorage(t) - mockOrchestratorStorage := mocks.NewMockIOrchestratorStorage(t) - mockStorage := storage.IStorage{ - MainStorage: mockMainStorage, - StagingStorage: mockStagingStorage, - OrchestratorStorage: mockOrchestratorStorage, - } - - // Mock the GetBlocksPerRequest call that happens in NewWorker - mockRPC.EXPECT().GetBlocksPerRequest().Return(rpc.BlocksPerRequestConfig{Blocks: 100}) - - poller := &Poller{} - committer := NewCommitter(mockRPC, mockStorage, poller) - - chainID := big.NewInt(1) - committer.lastCommittedBlock.Store(100) - committer.lastPublishedBlock.Store(0) - - ctx := context.Background() - committer.cleanupProcessedStagingBlocks(ctx) - mockStagingStorage.AssertNotCalled(t, "DeleteStagingDataOlderThan", mock.Anything, mock.Anything) - - committer.lastPublishedBlock.Store(90) - mockRPC.EXPECT().GetChainID().Return(chainID) - mockStagingStorage.EXPECT().DeleteStagingDataOlderThan(chainID, big.NewInt(90)).Return(nil) - committer.cleanupProcessedStagingBlocks(ctx) -} - -func TestStartCommitter(t *testing.T) { -} diff --git a/internal/orchestrator/poller.go b/internal/orchestrator/poller.go index d9f6529..93f0fc3 100644 --- a/internal/orchestrator/poller.go +++ b/internal/orchestrator/poller.go @@ -203,7 +203,6 @@ func (p *Poller) Request(ctx context.Context, blockNumbers []*big.Int) []common. if len(blockData) > 0 { go p.triggerLookahead(endBlock, int64(len(blockNumbers))) } - return blockData } @@ -309,7 +308,7 @@ func (p *Poller) processBatch(blockNumbers []*big.Int) { if err != ErrBlocksProcessing && err != ErrNoNewBlocks { if len(blockNumbers) > 0 { startBlock, endBlock := blockNumbers[0], blockNumbers[len(blockNumbers)-1] - log.Debug().Err(err).Msgf("Failed to poll blocks %s - %s", startBlock.String(), endBlock.String()) + log.Error().Err(err).Msgf("Failed to poll blocks %s - %s", startBlock.String(), endBlock.String()) } } return @@ -435,7 +434,7 @@ func (p *Poller) unmarkRangeAsProcessing(rangeKey string) { delete(p.processingRanges, rangeKey) } -// waitForRange waits for a range to finish processing +// waitForRange waits for a range to finish processing with a timeout func (p *Poller) waitForRange(rangeKey string) bool { p.processingRangesMutex.Lock() @@ -451,7 +450,7 @@ func (p *Poller) waitForRange(rangeKey string) bool { p.processingRanges[rangeKey] = append(p.processingRanges[rangeKey], waitChan) p.processingRangesMutex.Unlock() - // Wait for the range to complete or context cancellation + // Wait for the range to complete, timeout, or context cancellation select { case <-waitChan: log.Debug().Msgf("Got notification for range %s processing completed", rangeKey) @@ -460,3 +459,43 @@ func (p *Poller) waitForRange(rangeKey string) bool { return false // Context cancelled } } + +// GetProcessingRanges returns a list of ranges currently being processed (for diagnostics) +func (p *Poller) GetProcessingRanges() []string { + p.processingRangesMutex.RLock() + defer p.processingRangesMutex.RUnlock() + + ranges := make([]string, 0, len(p.processingRanges)) + for rangeKey, waiters := range p.processingRanges { + ranges = append(ranges, fmt.Sprintf("%s (waiters: %d)", rangeKey, len(waiters))) + } + return ranges +} + +// GetQueuedRanges returns a list of ranges currently queued for processing (for diagnostics) +func (p *Poller) GetQueuedRanges() []string { + p.queuedRangesMutex.RLock() + defer p.queuedRangesMutex.RUnlock() + + ranges := make([]string, 0, len(p.queuedRanges)) + for rangeKey := range p.queuedRanges { + ranges = append(ranges, rangeKey) + } + return ranges +} + +// GetPollerStatus returns diagnostic information about the poller's current state +func (p *Poller) GetPollerStatus() map[string]interface{} { + p.lastPolledBlockMutex.RLock() + lastPolled := p.lastPolledBlock.String() + p.lastPolledBlockMutex.RUnlock() + + return map[string]interface{}{ + "last_polled_block": lastPolled, + "processing_ranges": p.GetProcessingRanges(), + "queued_ranges": p.GetQueuedRanges(), + "task_queue_size": len(p.tasks), + "task_queue_cap": cap(p.tasks), + "parallel_pollers": p.parallelPollers, + } +} diff --git a/internal/orchestrator/work_mode_monitor.go b/internal/orchestrator/work_mode_monitor.go index 355dd4a..ee75c04 100644 --- a/internal/orchestrator/work_mode_monitor.go +++ b/internal/orchestrator/work_mode_monitor.go @@ -7,7 +7,6 @@ import ( "time" "github.com/rs/zerolog/log" - config "github.com/thirdweb-dev/indexer/configs" "github.com/thirdweb-dev/indexer/internal/metrics" "github.com/thirdweb-dev/indexer/internal/rpc" "github.com/thirdweb-dev/indexer/internal/storage" @@ -33,14 +32,8 @@ type WorkModeMonitor struct { } func NewWorkModeMonitor(rpc rpc.IRPCClient, storage storage.IStorage) *WorkModeMonitor { - checkInterval := config.Cfg.WorkMode.CheckIntervalMinutes - if checkInterval < 1 { - checkInterval = DEFAULT_WORK_MODE_CHECK_INTERVAL - } - liveModeThreshold := config.Cfg.WorkMode.LiveModeThreshold - if liveModeThreshold < 1 { - liveModeThreshold = DEFAULT_LIVE_MODE_THRESHOLD - } + checkInterval := DEFAULT_WORK_MODE_CHECK_INTERVAL + liveModeThreshold := DEFAULT_LIVE_MODE_THRESHOLD log.Info().Msgf("Work mode monitor initialized with check interval %d and live mode threshold %d", checkInterval, liveModeThreshold) return &WorkModeMonitor{ rpc: rpc, @@ -48,7 +41,7 @@ func NewWorkModeMonitor(rpc rpc.IRPCClient, storage storage.IStorage) *WorkModeM workModeChannels: make(map[chan WorkMode]struct{}), currentMode: "", checkInterval: time.Duration(checkInterval) * time.Minute, - liveModeThreshold: big.NewInt(liveModeThreshold), + liveModeThreshold: big.NewInt(int64(liveModeThreshold)), } } diff --git a/internal/worker/worker.go b/internal/worker/worker.go index eb856b2..63adcfe 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -295,7 +295,7 @@ func (w *Worker) processBatchWithRetry(ctx context.Context, blocks []*big.Int, s Int("chunk_size", chunkSize). Str("first_block", blocks[0].String()). Str("last_block", blocks[len(blocks)-1].String()). - Msg("Processing blocks") + Msgf("Processing blocks for range %s - %s", blocks[0].String(), blocks[len(blocks)-1].String()) var allResults []rpc.GetFullBlockResult var allFailures []rpc.GetFullBlockResult @@ -410,36 +410,28 @@ func (w *Worker) Run(ctx context.Context, blockNumbers []*big.Int) []rpc.GetFull success = len(results) > 0 && len(errors) == 0 } - if !success { - for _, errResult := range errors { - log.Error().Err(errResult.Error).Msgf("Error fetching block %s", errResult.BlockNumber.String()) - } + if len(errors) > 0 { + first, last := blockNumbers[0], blockNumbers[len(blockNumbers)-1] + firstError, lastError := errors[0], errors[len(errors)-1] + log.Error().Msgf("Error fetching block for range: %s - %s. Error: %s - %s (%d)", first.String(), last.String(), firstError.BlockNumber.String(), lastError.BlockNumber.String(), len(errors)) + return nil } - // Update metrics and log summary - if len(results) > 0 { - lastBlockNumberFloat, _ := results[len(results)-1].BlockNumber.Float64() - metrics.LastFetchedBlock.Set(lastBlockNumberFloat) + if !success || len(results) == 0 { + first, last := blockNumbers[0], blockNumbers[len(blockNumbers)-1] + log.Error().Msgf("No blocks fetched for range: %s - %s", first.String(), last.String()) + return nil + } - // Count successes and failures - successful := 0 - failed := 0 - for _, r := range results { - if r.Error == nil { - successful++ - } else { - failed++ - } - } + // Update metrics and log summary + lastBlockNumberFloat, _ := results[len(results)-1].BlockNumber.Float64() + metrics.LastFetchedBlock.Set(lastBlockNumberFloat) - log.Debug(). - Str("first_block", results[0].BlockNumber.String()). - Str("last_block", results[len(results)-1].BlockNumber.String()). - Int("successful", successful). - Int("failed", failed). - Str("source", sourceType.String()). - Msg("Block fetching complete") - } + log.Debug(). + Str("source", sourceType.String()). + Str("first_block", results[0].BlockNumber.String()). + Str("last_block", results[len(results)-1].BlockNumber.String()). + Msgf("Block fetching complete for range %s - %s", results[0].BlockNumber.String(), results[len(results)-1].BlockNumber.String()) return results }