Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions internal/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,11 @@ var (
Buckets: prometheus.DefBuckets,
})
)

// Work Mode Metrics
var (
CurrentWorkMode = promauto.NewGauge(prometheus.GaugeOpts{
Name: "current_work_mode",
Help: "The current work mode (0 = backfill, 1 = live)",
})
)
30 changes: 28 additions & 2 deletions internal/orchestrator/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,19 @@ type Committer struct {
rpc rpc.IRPCClient
lastCommittedBlock *big.Int
publisher *publisher.Publisher
workMode WorkMode
workModeChan chan WorkMode
}

func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage) *Committer {
type CommitterOption func(*Committer)

func WithCommitterWorkModeChan(ch chan WorkMode) CommitterOption {
return func(c *Committer) {
c.workModeChan = ch
}
}

func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage, opts ...CommitterOption) *Committer {
triggerInterval := config.Cfg.Committer.Interval
if triggerInterval == 0 {
triggerInterval = DEFAULT_COMMITTER_TRIGGER_INTERVAL
Expand All @@ -40,15 +50,22 @@ func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage) *Committer {
}

commitFromBlock := big.NewInt(int64(config.Cfg.Committer.FromBlock))
return &Committer{
committer := &Committer{
triggerIntervalMs: triggerInterval,
blocksPerCommit: blocksPerCommit,
storage: storage,
commitFromBlock: commitFromBlock,
rpc: rpc,
lastCommittedBlock: commitFromBlock,
publisher: publisher.GetInstance(),
workMode: "",
}

for _, opt := range opts {
opt(committer)
}

return committer
}

func (c *Committer) Start(ctx context.Context) {
Expand All @@ -61,8 +78,17 @@ func (c *Committer) Start(ctx context.Context) {
log.Info().Msg("Committer shutting down")
c.publisher.Close()
return
case workMode := <-c.workModeChan:
if workMode != c.workMode && workMode != "" {
log.Info().Msgf("Committer work mode changing from %s to %s", c.workMode, workMode)
c.workMode = workMode
}
default:
time.Sleep(interval)
if c.workMode == "" {
log.Debug().Msg("Committer work mode not set, skipping commit")
continue
}
blockDataToCommit, err := c.getSequentialBlockDataToCommit(ctx)
if err != nil {
log.Error().Err(err).Msg("Error getting block data to commit")
Expand Down
2 changes: 2 additions & 0 deletions internal/orchestrator/committer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ func TestStartCommitter(t *testing.T) {

committer := NewCommitter(mockRPC, mockStorage)
committer.triggerIntervalMs = 100 // Set a short interval for testing
committer.workMode = WorkModeBackfill

chainID := big.NewInt(1)
mockRPC.EXPECT().GetChainID().Return(chainID)
Expand Down Expand Up @@ -405,6 +406,7 @@ func TestCommitterRespectsSIGTERM(t *testing.T) {

committer := NewCommitter(mockRPC, mockStorage)
committer.triggerIntervalMs = 100 // Short interval for testing
committer.workMode = WorkModeBackfill

chainID := big.NewInt(1)
mockRPC.EXPECT().GetChainID().Return(chainID)
Expand Down
19 changes: 17 additions & 2 deletions internal/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,17 @@ func (o *Orchestrator) Start() {
o.cancel()
}()

// Create the work mode monitor first
workModeMonitor := NewWorkModeMonitor(o.rpc, o.storage)

if o.pollerEnabled {
wg.Add(1)
go func() {
defer wg.Done()
poller := NewPoller(o.rpc, o.storage)
pollerWorkModeChan := make(chan WorkMode, 1)
workModeMonitor.RegisterChannel(pollerWorkModeChan)
defer workModeMonitor.UnregisterChannel(pollerWorkModeChan)
poller := NewPoller(o.rpc, o.storage, WithPollerWorkModeChan(pollerWorkModeChan))
poller.Start(ctx)
}()
}
Expand All @@ -76,7 +82,10 @@ func (o *Orchestrator) Start() {
wg.Add(1)
go func() {
defer wg.Done()
committer := NewCommitter(o.rpc, o.storage)
committerWorkModeChan := make(chan WorkMode, 1)
workModeMonitor.RegisterChannel(committerWorkModeChan)
defer workModeMonitor.UnregisterChannel(committerWorkModeChan)
committer := NewCommitter(o.rpc, o.storage, WithCommitterWorkModeChan(committerWorkModeChan))
committer.Start(ctx)
}()
}
Expand All @@ -90,6 +99,12 @@ func (o *Orchestrator) Start() {
}()
}

wg.Add(1)
go func() {
defer wg.Done()
workModeMonitor.Start(ctx)
}()

// The chain tracker is always running
wg.Add(1)
go func() {
Expand Down
41 changes: 33 additions & 8 deletions internal/orchestrator/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,23 @@ type Poller struct {
pollFromBlock *big.Int
pollUntilBlock *big.Int
parallelPollers int
workModeChan chan WorkMode
}

type BlockNumberWithError struct {
BlockNumber *big.Int
Error error
}

func NewBoundlessPoller(rpc rpc.IRPCClient, storage storage.IStorage) *Poller {
type PollerOption func(*Poller)

func WithPollerWorkModeChan(ch chan WorkMode) PollerOption {
return func(p *Poller) {
p.workModeChan = ch
}
}

func NewBoundlessPoller(rpc rpc.IRPCClient, storage storage.IStorage, opts ...PollerOption) *Poller {
blocksPerPoll := config.Cfg.Poller.BlocksPerPoll
if blocksPerPoll == 0 {
blocksPerPoll = DEFAULT_BLOCKS_PER_POLL
Expand All @@ -44,19 +53,25 @@ func NewBoundlessPoller(rpc rpc.IRPCClient, storage storage.IStorage) *Poller {
if triggerInterval == 0 {
triggerInterval = DEFAULT_TRIGGER_INTERVAL
}
return &Poller{
poller := &Poller{
rpc: rpc,
triggerIntervalMs: int64(triggerInterval),
blocksPerPoll: int64(blocksPerPoll),
storage: storage,
parallelPollers: config.Cfg.Poller.ParallelPollers,
}

for _, opt := range opts {
opt(poller)
}

return poller
}

var ErrNoNewBlocks = fmt.Errorf("no new blocks to poll")

func NewPoller(rpc rpc.IRPCClient, storage storage.IStorage) *Poller {
poller := NewBoundlessPoller(rpc, storage)
func NewPoller(rpc rpc.IRPCClient, storage storage.IStorage, opts ...PollerOption) *Poller {
poller := NewBoundlessPoller(rpc, storage, opts...)
untilBlock := big.NewInt(int64(config.Cfg.Poller.UntilBlock))
pollFromBlock := big.NewInt(int64(config.Cfg.Poller.FromBlock))
lastPolledBlock := new(big.Int).Sub(pollFromBlock, big.NewInt(1)) // needs to include the first block
Expand Down Expand Up @@ -131,11 +146,14 @@ func (p *Poller) Start(ctx context.Context) {
for {
select {
case <-ctx.Done():
cancel()
close(tasks)
wg.Wait()
log.Info().Msg("Poller shutting down")
p.shutdown(cancel, tasks, &wg)
return
case workMode := <-p.workModeChan:
if workMode == WorkModeLive {
log.Info().Msg("Switching to live mode, stopping poller")
p.shutdown(cancel, tasks, &wg)
return
}
case <-ticker.C:
select {
case tasks <- struct{}{}:
Expand Down Expand Up @@ -274,3 +292,10 @@ func (p *Poller) handleBlockFailures(results []rpc.GetFullBlockResult) {
log.Error().Err(err).Msg("Error saving block failures")
}
}

func (p *Poller) shutdown(cancel context.CancelFunc, tasks chan struct{}, wg *sync.WaitGroup) {
cancel()
close(tasks)
wg.Wait()
log.Info().Msg("Poller shutting down")
}
153 changes: 153 additions & 0 deletions internal/orchestrator/work_mode_monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package orchestrator

import (
"context"
"math/big"
"sync"
"time"

"github.com/rs/zerolog/log"
"github.com/thirdweb-dev/indexer/internal/metrics"
"github.com/thirdweb-dev/indexer/internal/rpc"
"github.com/thirdweb-dev/indexer/internal/storage"
)

type WorkMode string

const (
WORK_MODE_CHECK_INTERVAL = 10 * time.Minute
WORK_MODE_BACKFILL_THRESHOLD = 500
WorkModeLive WorkMode = "live"
WorkModeBackfill WorkMode = "backfill"
)

type WorkModeMonitor struct {
rpc rpc.IRPCClient
storage storage.IStorage
workModeChannels map[chan WorkMode]struct{}
channelsMutex sync.RWMutex
currentMode WorkMode
}

func NewWorkModeMonitor(rpc rpc.IRPCClient, storage storage.IStorage) *WorkModeMonitor {
return &WorkModeMonitor{
rpc: rpc,
storage: storage,
workModeChannels: make(map[chan WorkMode]struct{}),
currentMode: "",
}
}

// RegisterChannel adds a new channel to receive work mode updates
func (m *WorkModeMonitor) RegisterChannel(ch chan WorkMode) {
m.channelsMutex.Lock()
defer m.channelsMutex.Unlock()

m.workModeChannels[ch] = struct{}{}
// Send current mode to the new channel only if it's not empty
if m.currentMode != "" {
select {
case ch <- m.currentMode:
log.Debug().Msg("Initial work mode sent to new channel")
default:
log.Warn().Msg("Failed to send initial work mode to new channel - channel full")
}
}
}

// UnregisterChannel removes a channel from receiving work mode updates
func (m *WorkModeMonitor) UnregisterChannel(ch chan WorkMode) {
m.channelsMutex.Lock()
defer m.channelsMutex.Unlock()

delete(m.workModeChannels, ch)
}

func (m *WorkModeMonitor) updateWorkModeMetric(mode WorkMode) {
var value float64
if mode == WorkModeLive {
value = 1
}
metrics.CurrentWorkMode.Set(value)
}

func (m *WorkModeMonitor) Start(ctx context.Context) {
// Perform immediate check
newMode, err := m.determineWorkMode(ctx)
if err != nil {
log.Error().Err(err).Msg("Error checking work mode during startup")
} else if newMode != m.currentMode {
log.Info().Msgf("Work mode changing from %s to %s during startup", m.currentMode, newMode)
m.currentMode = newMode
m.updateWorkModeMetric(newMode)
m.broadcastWorkMode(newMode)
}

ticker := time.NewTicker(WORK_MODE_CHECK_INTERVAL)
defer ticker.Stop()

log.Info().Msgf("Work mode monitor started with initial mode: %s", m.currentMode)

for {
select {
case <-ctx.Done():
log.Info().Msg("Work mode monitor shutting down")
return
case <-ticker.C:
newMode, err := m.determineWorkMode(ctx)
if err != nil {
log.Error().Err(err).Msg("Error checking work mode")
continue
}

if newMode != m.currentMode {
log.Info().Msgf("Work mode changing from %s to %s", m.currentMode, newMode)
m.currentMode = newMode
m.updateWorkModeMetric(newMode)
m.broadcastWorkMode(newMode)
}
}
}
}

func (m *WorkModeMonitor) broadcastWorkMode(mode WorkMode) {
m.channelsMutex.RLock()
defer m.channelsMutex.RUnlock()

for ch := range m.workModeChannels {
select {
case ch <- mode:
log.Debug().Msg("Work mode change notification sent")
default:
if r := recover(); r != nil {
log.Warn().Msg("Work mode notification dropped - channel closed")
delete(m.workModeChannels, ch)
}
}
}
}

func (m *WorkModeMonitor) determineWorkMode(ctx context.Context) (WorkMode, error) {
lastCommittedBlock, err := m.storage.MainStorage.GetMaxBlockNumber(m.rpc.GetChainID())
if err != nil {
return "", err
}

if lastCommittedBlock.Sign() == 0 {
log.Debug().Msg("No blocks committed yet, using backfill mode")
return WorkModeBackfill, nil
}

latestBlock, err := m.rpc.GetLatestBlockNumber(ctx)
if err != nil {
return "", err
}

blockDiff := new(big.Int).Sub(latestBlock, lastCommittedBlock)
log.Debug().Msgf("Committer is %d blocks behind the chain", blockDiff.Int64())
if blockDiff.Cmp(big.NewInt(WORK_MODE_BACKFILL_THRESHOLD)) < 0 {
return WorkModeLive, nil
}

return WorkModeBackfill, nil
}