diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 4a08854..eb20d79 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -144,3 +144,11 @@ var ( Buckets: prometheus.DefBuckets, }) ) + +// Work Mode Metrics +var ( + CurrentWorkMode = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "current_work_mode", + Help: "The current work mode (0 = backfill, 1 = live)", + }) +) diff --git a/internal/orchestrator/committer.go b/internal/orchestrator/committer.go index c24caed..e8487f1 100644 --- a/internal/orchestrator/committer.go +++ b/internal/orchestrator/committer.go @@ -27,9 +27,19 @@ type Committer struct { rpc rpc.IRPCClient lastCommittedBlock *big.Int publisher *publisher.Publisher + workMode WorkMode + workModeChan chan WorkMode } -func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage) *Committer { +type CommitterOption func(*Committer) + +func WithCommitterWorkModeChan(ch chan WorkMode) CommitterOption { + return func(c *Committer) { + c.workModeChan = ch + } +} + +func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage, opts ...CommitterOption) *Committer { triggerInterval := config.Cfg.Committer.Interval if triggerInterval == 0 { triggerInterval = DEFAULT_COMMITTER_TRIGGER_INTERVAL @@ -40,7 +50,7 @@ func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage) *Committer { } commitFromBlock := big.NewInt(int64(config.Cfg.Committer.FromBlock)) - return &Committer{ + committer := &Committer{ triggerIntervalMs: triggerInterval, blocksPerCommit: blocksPerCommit, storage: storage, @@ -48,7 +58,14 @@ func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage) *Committer { rpc: rpc, lastCommittedBlock: commitFromBlock, publisher: publisher.GetInstance(), + workMode: "", + } + + for _, opt := range opts { + opt(committer) } + + return committer } func (c *Committer) Start(ctx context.Context) { @@ -61,8 +78,17 @@ func (c *Committer) Start(ctx context.Context) { log.Info().Msg("Committer shutting down") c.publisher.Close() return + case workMode := <-c.workModeChan: + if workMode != c.workMode && workMode != "" { + log.Info().Msgf("Committer work mode changing from %s to %s", c.workMode, workMode) + c.workMode = workMode + } default: time.Sleep(interval) + if c.workMode == "" { + log.Debug().Msg("Committer work mode not set, skipping commit") + continue + } blockDataToCommit, err := c.getSequentialBlockDataToCommit(ctx) if err != nil { log.Error().Err(err).Msg("Error getting block data to commit") diff --git a/internal/orchestrator/committer_test.go b/internal/orchestrator/committer_test.go index 25b0ca3..5c1bf60 100644 --- a/internal/orchestrator/committer_test.go +++ b/internal/orchestrator/committer_test.go @@ -372,6 +372,7 @@ func TestStartCommitter(t *testing.T) { committer := NewCommitter(mockRPC, mockStorage) committer.triggerIntervalMs = 100 // Set a short interval for testing + committer.workMode = WorkModeBackfill chainID := big.NewInt(1) mockRPC.EXPECT().GetChainID().Return(chainID) @@ -405,6 +406,7 @@ func TestCommitterRespectsSIGTERM(t *testing.T) { committer := NewCommitter(mockRPC, mockStorage) committer.triggerIntervalMs = 100 // Short interval for testing + committer.workMode = WorkModeBackfill chainID := big.NewInt(1) mockRPC.EXPECT().GetChainID().Return(chainID) diff --git a/internal/orchestrator/orchestrator.go b/internal/orchestrator/orchestrator.go index 2bf2faa..d2e4bb4 100644 --- a/internal/orchestrator/orchestrator.go +++ b/internal/orchestrator/orchestrator.go @@ -54,11 +54,17 @@ func (o *Orchestrator) Start() { o.cancel() }() + // Create the work mode monitor first + workModeMonitor := NewWorkModeMonitor(o.rpc, o.storage) + if o.pollerEnabled { wg.Add(1) go func() { defer wg.Done() - poller := NewPoller(o.rpc, o.storage) + pollerWorkModeChan := make(chan WorkMode, 1) + workModeMonitor.RegisterChannel(pollerWorkModeChan) + defer workModeMonitor.UnregisterChannel(pollerWorkModeChan) + poller := NewPoller(o.rpc, o.storage, WithPollerWorkModeChan(pollerWorkModeChan)) poller.Start(ctx) }() } @@ -76,7 +82,10 @@ func (o *Orchestrator) Start() { wg.Add(1) go func() { defer wg.Done() - committer := NewCommitter(o.rpc, o.storage) + committerWorkModeChan := make(chan WorkMode, 1) + workModeMonitor.RegisterChannel(committerWorkModeChan) + defer workModeMonitor.UnregisterChannel(committerWorkModeChan) + committer := NewCommitter(o.rpc, o.storage, WithCommitterWorkModeChan(committerWorkModeChan)) committer.Start(ctx) }() } @@ -90,6 +99,12 @@ func (o *Orchestrator) Start() { }() } + wg.Add(1) + go func() { + defer wg.Done() + workModeMonitor.Start(ctx) + }() + // The chain tracker is always running wg.Add(1) go func() { diff --git a/internal/orchestrator/poller.go b/internal/orchestrator/poller.go index 325de7c..b660a8c 100644 --- a/internal/orchestrator/poller.go +++ b/internal/orchestrator/poller.go @@ -28,6 +28,7 @@ type Poller struct { pollFromBlock *big.Int pollUntilBlock *big.Int parallelPollers int + workModeChan chan WorkMode } type BlockNumberWithError struct { @@ -35,7 +36,15 @@ type BlockNumberWithError struct { Error error } -func NewBoundlessPoller(rpc rpc.IRPCClient, storage storage.IStorage) *Poller { +type PollerOption func(*Poller) + +func WithPollerWorkModeChan(ch chan WorkMode) PollerOption { + return func(p *Poller) { + p.workModeChan = ch + } +} + +func NewBoundlessPoller(rpc rpc.IRPCClient, storage storage.IStorage, opts ...PollerOption) *Poller { blocksPerPoll := config.Cfg.Poller.BlocksPerPoll if blocksPerPoll == 0 { blocksPerPoll = DEFAULT_BLOCKS_PER_POLL @@ -44,19 +53,25 @@ func NewBoundlessPoller(rpc rpc.IRPCClient, storage storage.IStorage) *Poller { if triggerInterval == 0 { triggerInterval = DEFAULT_TRIGGER_INTERVAL } - return &Poller{ + poller := &Poller{ rpc: rpc, triggerIntervalMs: int64(triggerInterval), blocksPerPoll: int64(blocksPerPoll), storage: storage, parallelPollers: config.Cfg.Poller.ParallelPollers, } + + for _, opt := range opts { + opt(poller) + } + + return poller } var ErrNoNewBlocks = fmt.Errorf("no new blocks to poll") -func NewPoller(rpc rpc.IRPCClient, storage storage.IStorage) *Poller { - poller := NewBoundlessPoller(rpc, storage) +func NewPoller(rpc rpc.IRPCClient, storage storage.IStorage, opts ...PollerOption) *Poller { + poller := NewBoundlessPoller(rpc, storage, opts...) untilBlock := big.NewInt(int64(config.Cfg.Poller.UntilBlock)) pollFromBlock := big.NewInt(int64(config.Cfg.Poller.FromBlock)) lastPolledBlock := new(big.Int).Sub(pollFromBlock, big.NewInt(1)) // needs to include the first block @@ -131,11 +146,14 @@ func (p *Poller) Start(ctx context.Context) { for { select { case <-ctx.Done(): - cancel() - close(tasks) - wg.Wait() - log.Info().Msg("Poller shutting down") + p.shutdown(cancel, tasks, &wg) return + case workMode := <-p.workModeChan: + if workMode == WorkModeLive { + log.Info().Msg("Switching to live mode, stopping poller") + p.shutdown(cancel, tasks, &wg) + return + } case <-ticker.C: select { case tasks <- struct{}{}: @@ -274,3 +292,10 @@ func (p *Poller) handleBlockFailures(results []rpc.GetFullBlockResult) { log.Error().Err(err).Msg("Error saving block failures") } } + +func (p *Poller) shutdown(cancel context.CancelFunc, tasks chan struct{}, wg *sync.WaitGroup) { + cancel() + close(tasks) + wg.Wait() + log.Info().Msg("Poller shutting down") +} diff --git a/internal/orchestrator/work_mode_monitor.go b/internal/orchestrator/work_mode_monitor.go new file mode 100644 index 0000000..298eb89 --- /dev/null +++ b/internal/orchestrator/work_mode_monitor.go @@ -0,0 +1,153 @@ +package orchestrator + +import ( + "context" + "math/big" + "sync" + "time" + + "github.com/rs/zerolog/log" + "github.com/thirdweb-dev/indexer/internal/metrics" + "github.com/thirdweb-dev/indexer/internal/rpc" + "github.com/thirdweb-dev/indexer/internal/storage" +) + +type WorkMode string + +const ( + WORK_MODE_CHECK_INTERVAL = 10 * time.Minute + WORK_MODE_BACKFILL_THRESHOLD = 500 + WorkModeLive WorkMode = "live" + WorkModeBackfill WorkMode = "backfill" +) + +type WorkModeMonitor struct { + rpc rpc.IRPCClient + storage storage.IStorage + workModeChannels map[chan WorkMode]struct{} + channelsMutex sync.RWMutex + currentMode WorkMode +} + +func NewWorkModeMonitor(rpc rpc.IRPCClient, storage storage.IStorage) *WorkModeMonitor { + return &WorkModeMonitor{ + rpc: rpc, + storage: storage, + workModeChannels: make(map[chan WorkMode]struct{}), + currentMode: "", + } +} + +// RegisterChannel adds a new channel to receive work mode updates +func (m *WorkModeMonitor) RegisterChannel(ch chan WorkMode) { + m.channelsMutex.Lock() + defer m.channelsMutex.Unlock() + + m.workModeChannels[ch] = struct{}{} + // Send current mode to the new channel only if it's not empty + if m.currentMode != "" { + select { + case ch <- m.currentMode: + log.Debug().Msg("Initial work mode sent to new channel") + default: + log.Warn().Msg("Failed to send initial work mode to new channel - channel full") + } + } +} + +// UnregisterChannel removes a channel from receiving work mode updates +func (m *WorkModeMonitor) UnregisterChannel(ch chan WorkMode) { + m.channelsMutex.Lock() + defer m.channelsMutex.Unlock() + + delete(m.workModeChannels, ch) +} + +func (m *WorkModeMonitor) updateWorkModeMetric(mode WorkMode) { + var value float64 + if mode == WorkModeLive { + value = 1 + } + metrics.CurrentWorkMode.Set(value) +} + +func (m *WorkModeMonitor) Start(ctx context.Context) { + // Perform immediate check + newMode, err := m.determineWorkMode(ctx) + if err != nil { + log.Error().Err(err).Msg("Error checking work mode during startup") + } else if newMode != m.currentMode { + log.Info().Msgf("Work mode changing from %s to %s during startup", m.currentMode, newMode) + m.currentMode = newMode + m.updateWorkModeMetric(newMode) + m.broadcastWorkMode(newMode) + } + + ticker := time.NewTicker(WORK_MODE_CHECK_INTERVAL) + defer ticker.Stop() + + log.Info().Msgf("Work mode monitor started with initial mode: %s", m.currentMode) + + for { + select { + case <-ctx.Done(): + log.Info().Msg("Work mode monitor shutting down") + return + case <-ticker.C: + newMode, err := m.determineWorkMode(ctx) + if err != nil { + log.Error().Err(err).Msg("Error checking work mode") + continue + } + + if newMode != m.currentMode { + log.Info().Msgf("Work mode changing from %s to %s", m.currentMode, newMode) + m.currentMode = newMode + m.updateWorkModeMetric(newMode) + m.broadcastWorkMode(newMode) + } + } + } +} + +func (m *WorkModeMonitor) broadcastWorkMode(mode WorkMode) { + m.channelsMutex.RLock() + defer m.channelsMutex.RUnlock() + + for ch := range m.workModeChannels { + select { + case ch <- mode: + log.Debug().Msg("Work mode change notification sent") + default: + if r := recover(); r != nil { + log.Warn().Msg("Work mode notification dropped - channel closed") + delete(m.workModeChannels, ch) + } + } + } +} + +func (m *WorkModeMonitor) determineWorkMode(ctx context.Context) (WorkMode, error) { + lastCommittedBlock, err := m.storage.MainStorage.GetMaxBlockNumber(m.rpc.GetChainID()) + if err != nil { + return "", err + } + + if lastCommittedBlock.Sign() == 0 { + log.Debug().Msg("No blocks committed yet, using backfill mode") + return WorkModeBackfill, nil + } + + latestBlock, err := m.rpc.GetLatestBlockNumber(ctx) + if err != nil { + return "", err + } + + blockDiff := new(big.Int).Sub(latestBlock, lastCommittedBlock) + log.Debug().Msgf("Committer is %d blocks behind the chain", blockDiff.Int64()) + if blockDiff.Cmp(big.NewInt(WORK_MODE_BACKFILL_THRESHOLD)) < 0 { + return WorkModeLive, nil + } + + return WorkModeBackfill, nil +}