Skip to content

Commit 84624f5

Browse files
authored
setup workmode determination and update (#224)
### TL;DR

Added a work mode monitor to dynamically switch between backfill and live processing modes.

### What changed?

- Created a new `WorkModeMonitor` component that determines whether the system should operate in "backfill" or "live" mode based on how far behind the indexer is from the chain head
- Added communication channels between the monitor and other components (Poller and Committer)
- Modified the Poller to shut down when switching to live mode
- Added Prometheus metrics to track the current work mode
- Implemented functional options pattern for Poller and Committer initialization

### How to test?

1. Run the indexer and observe the logs for work mode changes
2. Check the Prometheus metrics for `current_work_mode` (0 = backfill, 1 = live)
3. Verify that the Poller shuts down when the system transitions to live mode
4. Test with different block heights to ensure the system correctly switches between modes based on the configured threshold (500 blocks)

### Why make this change?

This change improves resource utilization by dynamically adapting the system's behavior based on its current state:

- In backfill mode, the system uses parallel pollers to catch up with historical data
- In live mode, the system stops the resource-intensive polling process since it's already caught up with the chain
- This approach prevents unnecessary resource consumption once the indexer is up-to-date with the blockchain

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->

## Summary by CodeRabbit

- **New Features**
  - Introduced automatic detection and switching between "live" and "backfill" operational modes based on blockchain synchronization status.
  - Added real-time metrics to display the current operational mode.
  - Components now dynamically respond to changes in work mode, improving system adaptability and monitoring.
- **Bug Fixes**
  - Improved logging and shutdown handling during mode transitions for better reliability and transparency.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
2 parents df8a106 + 3fb9870 commit 84624f5

File tree

6 files changed

+241
-12
lines changed

6 files changed

+241
-12
lines changed

internal/metrics/metrics.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,3 +144,11 @@ var (
144144
Buckets: prometheus.DefBuckets,
145145
})
146146
)
147+
148+
// Work Mode Metrics
149+
var (
150+
CurrentWorkMode = promauto.NewGauge(prometheus.GaugeOpts{
151+
Name: "current_work_mode",
152+
Help: "The current work mode (0 = backfill, 1 = live)",
153+
})
154+
)

internal/orchestrator/committer.go

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,19 @@ type Committer struct {
2727
rpc rpc.IRPCClient
2828
lastCommittedBlock *big.Int
2929
publisher *publisher.Publisher
30+
workMode WorkMode
31+
workModeChan chan WorkMode
3032
}
3133

32-
func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage) *Committer {
34+
type CommitterOption func(*Committer)
35+
36+
func WithCommitterWorkModeChan(ch chan WorkMode) CommitterOption {
37+
return func(c *Committer) {
38+
c.workModeChan = ch
39+
}
40+
}
41+
42+
func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage, opts ...CommitterOption) *Committer {
3343
triggerInterval := config.Cfg.Committer.Interval
3444
if triggerInterval == 0 {
3545
triggerInterval = DEFAULT_COMMITTER_TRIGGER_INTERVAL
@@ -40,15 +50,22 @@ func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage) *Committer {
4050
}
4151

4252
commitFromBlock := big.NewInt(int64(config.Cfg.Committer.FromBlock))
43-
return &Committer{
53+
committer := &Committer{
4454
triggerIntervalMs: triggerInterval,
4555
blocksPerCommit: blocksPerCommit,
4656
storage: storage,
4757
commitFromBlock: commitFromBlock,
4858
rpc: rpc,
4959
lastCommittedBlock: commitFromBlock,
5060
publisher: publisher.GetInstance(),
61+
workMode: "",
62+
}
63+
64+
for _, opt := range opts {
65+
opt(committer)
5166
}
67+
68+
return committer
5269
}
5370

5471
func (c *Committer) Start(ctx context.Context) {
@@ -61,8 +78,17 @@ func (c *Committer) Start(ctx context.Context) {
6178
log.Info().Msg("Committer shutting down")
6279
c.publisher.Close()
6380
return
81+
case workMode := <-c.workModeChan:
82+
if workMode != c.workMode && workMode != "" {
83+
log.Info().Msgf("Committer work mode changing from %s to %s", c.workMode, workMode)
84+
c.workMode = workMode
85+
}
6486
default:
6587
time.Sleep(interval)
88+
if c.workMode == "" {
89+
log.Debug().Msg("Committer work mode not set, skipping commit")
90+
continue
91+
}
6692
blockDataToCommit, err := c.getSequentialBlockDataToCommit(ctx)
6793
if err != nil {
6894
log.Error().Err(err).Msg("Error getting block data to commit")

internal/orchestrator/committer_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -372,6 +372,7 @@ func TestStartCommitter(t *testing.T) {
372372

373373
committer := NewCommitter(mockRPC, mockStorage)
374374
committer.triggerIntervalMs = 100 // Set a short interval for testing
375+
committer.workMode = WorkModeBackfill
375376

376377
chainID := big.NewInt(1)
377378
mockRPC.EXPECT().GetChainID().Return(chainID)
@@ -405,6 +406,7 @@ func TestCommitterRespectsSIGTERM(t *testing.T) {
405406

406407
committer := NewCommitter(mockRPC, mockStorage)
407408
committer.triggerIntervalMs = 100 // Short interval for testing
409+
committer.workMode = WorkModeBackfill
408410

409411
chainID := big.NewInt(1)
410412
mockRPC.EXPECT().GetChainID().Return(chainID)

internal/orchestrator/orchestrator.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,17 @@ func (o *Orchestrator) Start() {
5454
o.cancel()
5555
}()
5656

57+
// Create the work mode monitor first
58+
workModeMonitor := NewWorkModeMonitor(o.rpc, o.storage)
59+
5760
if o.pollerEnabled {
5861
wg.Add(1)
5962
go func() {
6063
defer wg.Done()
61-
poller := NewPoller(o.rpc, o.storage)
64+
pollerWorkModeChan := make(chan WorkMode, 1)
65+
workModeMonitor.RegisterChannel(pollerWorkModeChan)
66+
defer workModeMonitor.UnregisterChannel(pollerWorkModeChan)
67+
poller := NewPoller(o.rpc, o.storage, WithPollerWorkModeChan(pollerWorkModeChan))
6268
poller.Start(ctx)
6369
}()
6470
}
@@ -76,7 +82,10 @@ func (o *Orchestrator) Start() {
7682
wg.Add(1)
7783
go func() {
7884
defer wg.Done()
79-
committer := NewCommitter(o.rpc, o.storage)
85+
committerWorkModeChan := make(chan WorkMode, 1)
86+
workModeMonitor.RegisterChannel(committerWorkModeChan)
87+
defer workModeMonitor.UnregisterChannel(committerWorkModeChan)
88+
committer := NewCommitter(o.rpc, o.storage, WithCommitterWorkModeChan(committerWorkModeChan))
8089
committer.Start(ctx)
8190
}()
8291
}
@@ -90,6 +99,12 @@ func (o *Orchestrator) Start() {
9099
}()
91100
}
92101

102+
wg.Add(1)
103+
go func() {
104+
defer wg.Done()
105+
workModeMonitor.Start(ctx)
106+
}()
107+
93108
// The chain tracker is always running
94109
wg.Add(1)
95110
go func() {

internal/orchestrator/poller.go

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,23 @@ type Poller struct {
2828
pollFromBlock *big.Int
2929
pollUntilBlock *big.Int
3030
parallelPollers int
31+
workModeChan chan WorkMode
3132
}
3233

3334
// BlockNumberWithError associates a block number with an error value.
type BlockNumberWithError struct {
	// BlockNumber identifies the block this entry refers to.
	BlockNumber *big.Int
	// Error is the error recorded for that block.
	// NOTE(review): presumably the failure hit while polling the block —
	// confirm against the code that produces these values.
	Error error
}
3738

38-
func NewBoundlessPoller(rpc rpc.IRPCClient, storage storage.IStorage) *Poller {
39+
type PollerOption func(*Poller)
40+
41+
func WithPollerWorkModeChan(ch chan WorkMode) PollerOption {
42+
return func(p *Poller) {
43+
p.workModeChan = ch
44+
}
45+
}
46+
47+
func NewBoundlessPoller(rpc rpc.IRPCClient, storage storage.IStorage, opts ...PollerOption) *Poller {
3948
blocksPerPoll := config.Cfg.Poller.BlocksPerPoll
4049
if blocksPerPoll == 0 {
4150
blocksPerPoll = DEFAULT_BLOCKS_PER_POLL
@@ -44,19 +53,25 @@ func NewBoundlessPoller(rpc rpc.IRPCClient, storage storage.IStorage) *Poller {
4453
if triggerInterval == 0 {
4554
triggerInterval = DEFAULT_TRIGGER_INTERVAL
4655
}
47-
return &Poller{
56+
poller := &Poller{
4857
rpc: rpc,
4958
triggerIntervalMs: int64(triggerInterval),
5059
blocksPerPoll: int64(blocksPerPoll),
5160
storage: storage,
5261
parallelPollers: config.Cfg.Poller.ParallelPollers,
5362
}
63+
64+
for _, opt := range opts {
65+
opt(poller)
66+
}
67+
68+
return poller
5469
}
5570

5671
var ErrNoNewBlocks = fmt.Errorf("no new blocks to poll")
5772

58-
func NewPoller(rpc rpc.IRPCClient, storage storage.IStorage) *Poller {
59-
poller := NewBoundlessPoller(rpc, storage)
73+
func NewPoller(rpc rpc.IRPCClient, storage storage.IStorage, opts ...PollerOption) *Poller {
74+
poller := NewBoundlessPoller(rpc, storage, opts...)
6075
untilBlock := big.NewInt(int64(config.Cfg.Poller.UntilBlock))
6176
pollFromBlock := big.NewInt(int64(config.Cfg.Poller.FromBlock))
6277
lastPolledBlock := new(big.Int).Sub(pollFromBlock, big.NewInt(1)) // needs to include the first block
@@ -131,11 +146,14 @@ func (p *Poller) Start(ctx context.Context) {
131146
for {
132147
select {
133148
case <-ctx.Done():
134-
cancel()
135-
close(tasks)
136-
wg.Wait()
137-
log.Info().Msg("Poller shutting down")
149+
p.shutdown(cancel, tasks, &wg)
138150
return
151+
case workMode := <-p.workModeChan:
152+
if workMode == WorkModeLive {
153+
log.Info().Msg("Switching to live mode, stopping poller")
154+
p.shutdown(cancel, tasks, &wg)
155+
return
156+
}
139157
case <-ticker.C:
140158
select {
141159
case tasks <- struct{}{}:
@@ -274,3 +292,10 @@ func (p *Poller) handleBlockFailures(results []rpc.GetFullBlockResult) {
274292
log.Error().Err(err).Msg("Error saving block failures")
275293
}
276294
}
295+
296+
// shutdown stops the poller's background work in a fixed order: it invokes
// cancel, closes the tasks channel, blocks until wg reports completion, and
// finally logs that the poller is shutting down.
//
// NOTE(review): presumably cancel belongs to the context the workers run under
// and wg tracks those workers — confirm against Start.
func (p *Poller) shutdown(cancel context.CancelFunc, tasks chan struct{}, wg *sync.WaitGroup) {
297+
cancel()
298+
// After this close, any further send on tasks would panic, so the caller
// must return instead of handling another tick.
close(tasks)
299+
wg.Wait()
300+
log.Info().Msg("Poller shutting down")
301+
}
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
package orchestrator
2+
3+
import (
4+
"context"
5+
"math/big"
6+
"sync"
7+
"time"
8+
9+
"github.com/rs/zerolog/log"
10+
"github.com/thirdweb-dev/indexer/internal/metrics"
11+
"github.com/thirdweb-dev/indexer/internal/rpc"
12+
"github.com/thirdweb-dev/indexer/internal/storage"
13+
)
14+
15+
// WorkMode names the operating mode of the indexer.
type WorkMode string

// Tuning values for the work mode monitor.
const (
	// WORK_MODE_CHECK_INTERVAL is the period between two work mode checks.
	WORK_MODE_CHECK_INTERVAL = 10 * time.Minute
	// WORK_MODE_BACKFILL_THRESHOLD is the number of blocks behind the chain
	// head at which the mode is considered backfill rather than live.
	WORK_MODE_BACKFILL_THRESHOLD = 500
)

// Known work modes.
const (
	// WorkModeLive is used when the committed data is close to the chain head.
	WorkModeLive WorkMode = "live"
	// WorkModeBackfill is used when the committed data is far behind the
	// chain head or nothing has been committed yet.
	WorkModeBackfill WorkMode = "backfill"
)
23+
24+
type WorkModeMonitor struct {
25+
rpc rpc.IRPCClient
26+
storage storage.IStorage
27+
workModeChannels map[chan WorkMode]struct{}
28+
channelsMutex sync.RWMutex
29+
currentMode WorkMode
30+
}
31+
32+
func NewWorkModeMonitor(rpc rpc.IRPCClient, storage storage.IStorage) *WorkModeMonitor {
33+
return &WorkModeMonitor{
34+
rpc: rpc,
35+
storage: storage,
36+
workModeChannels: make(map[chan WorkMode]struct{}),
37+
currentMode: "",
38+
}
39+
}
40+
41+
// RegisterChannel adds a new channel to receive work mode updates
42+
func (m *WorkModeMonitor) RegisterChannel(ch chan WorkMode) {
43+
m.channelsMutex.Lock()
44+
defer m.channelsMutex.Unlock()
45+
46+
m.workModeChannels[ch] = struct{}{}
47+
// Send current mode to the new channel only if it's not empty
48+
if m.currentMode != "" {
49+
select {
50+
case ch <- m.currentMode:
51+
log.Debug().Msg("Initial work mode sent to new channel")
52+
default:
53+
log.Warn().Msg("Failed to send initial work mode to new channel - channel full")
54+
}
55+
}
56+
}
57+
58+
// UnregisterChannel removes a channel from receiving work mode updates
59+
func (m *WorkModeMonitor) UnregisterChannel(ch chan WorkMode) {
60+
m.channelsMutex.Lock()
61+
defer m.channelsMutex.Unlock()
62+
63+
delete(m.workModeChannels, ch)
64+
}
65+
66+
func (m *WorkModeMonitor) updateWorkModeMetric(mode WorkMode) {
67+
var value float64
68+
if mode == WorkModeLive {
69+
value = 1
70+
}
71+
metrics.CurrentWorkMode.Set(value)
72+
}
73+
74+
func (m *WorkModeMonitor) Start(ctx context.Context) {
75+
// Perform immediate check
76+
newMode, err := m.determineWorkMode(ctx)
77+
if err != nil {
78+
log.Error().Err(err).Msg("Error checking work mode during startup")
79+
} else if newMode != m.currentMode {
80+
log.Info().Msgf("Work mode changing from %s to %s during startup", m.currentMode, newMode)
81+
m.currentMode = newMode
82+
m.updateWorkModeMetric(newMode)
83+
m.broadcastWorkMode(newMode)
84+
}
85+
86+
ticker := time.NewTicker(WORK_MODE_CHECK_INTERVAL)
87+
defer ticker.Stop()
88+
89+
log.Info().Msgf("Work mode monitor started with initial mode: %s", m.currentMode)
90+
91+
for {
92+
select {
93+
case <-ctx.Done():
94+
log.Info().Msg("Work mode monitor shutting down")
95+
return
96+
case <-ticker.C:
97+
newMode, err := m.determineWorkMode(ctx)
98+
if err != nil {
99+
log.Error().Err(err).Msg("Error checking work mode")
100+
continue
101+
}
102+
103+
if newMode != m.currentMode {
104+
log.Info().Msgf("Work mode changing from %s to %s", m.currentMode, newMode)
105+
m.currentMode = newMode
106+
m.updateWorkModeMetric(newMode)
107+
m.broadcastWorkMode(newMode)
108+
}
109+
}
110+
}
111+
}
112+
113+
func (m *WorkModeMonitor) broadcastWorkMode(mode WorkMode) {
114+
m.channelsMutex.RLock()
115+
defer m.channelsMutex.RUnlock()
116+
117+
for ch := range m.workModeChannels {
118+
select {
119+
case ch <- mode:
120+
log.Debug().Msg("Work mode change notification sent")
121+
default:
122+
if r := recover(); r != nil {
123+
log.Warn().Msg("Work mode notification dropped - channel closed")
124+
delete(m.workModeChannels, ch)
125+
}
126+
}
127+
}
128+
}
129+
130+
func (m *WorkModeMonitor) determineWorkMode(ctx context.Context) (WorkMode, error) {
131+
lastCommittedBlock, err := m.storage.MainStorage.GetMaxBlockNumber(m.rpc.GetChainID())
132+
if err != nil {
133+
return "", err
134+
}
135+
136+
if lastCommittedBlock.Sign() == 0 {
137+
log.Debug().Msg("No blocks committed yet, using backfill mode")
138+
return WorkModeBackfill, nil
139+
}
140+
141+
latestBlock, err := m.rpc.GetLatestBlockNumber(ctx)
142+
if err != nil {
143+
return "", err
144+
}
145+
146+
blockDiff := new(big.Int).Sub(latestBlock, lastCommittedBlock)
147+
log.Debug().Msgf("Committer is %d blocks behind the chain", blockDiff.Int64())
148+
if blockDiff.Cmp(big.NewInt(WORK_MODE_BACKFILL_THRESHOLD)) < 0 {
149+
return WorkModeLive, nil
150+
}
151+
152+
return WorkModeBackfill, nil
153+
}

0 commit comments

Comments
 (0)