Skip to content

Commit 43f676e

Browse files
committed
Minor updates
1 parent 72e5cab commit 43f676e

File tree

4 files changed

+27
-26
lines changed

4 files changed

+27
-26
lines changed

internal/orchestrator/chain_tracker.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99
"github.com/thirdweb-dev/indexer/internal/rpc"
1010
)
1111

12-
const DEFAULT_CHAIN_TRACKER_POLL_INTERVAL = 300000 // 5 minutes
12+
const DEFAULT_CHAIN_TRACKER_POLL_INTERVAL = 60 * 1000 // 1 minutes
1313

1414
type ChainTracker struct {
1515
rpc rpc.IRPCClient

internal/orchestrator/orchestrator.go

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,6 @@ func (o *Orchestrator) Start() {
5151
o.cancel()
5252
}()
5353

54-
// Create the work mode monitor first
55-
workModeMonitor := NewWorkModeMonitor(o.rpc, o.storage)
56-
5754
o.initializeWorkerAndPoller()
5855

5956
o.wg.Add(1)
@@ -91,14 +88,6 @@ func (o *Orchestrator) Start() {
9188
}()
9289
}
9390

94-
o.wg.Add(1)
95-
go func() {
96-
defer o.wg.Done()
97-
workModeMonitor.Start(ctx)
98-
99-
log.Info().Msg("Work mode monitor completed")
100-
}()
101-
10291
// The chain tracker is always running
10392
o.wg.Add(1)
10493
go func() {

internal/storage/badger.go

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,16 @@ type BadgerConnector struct {
2525
gcTicker *time.Ticker
2626
stopGC chan struct{}
2727

28+
// Configuration
29+
stagingDataTTL time.Duration // TTL for staging data entries
30+
gcInterval time.Duration // Interval for running garbage collection
31+
cacheRefreshInterval time.Duration // Interval for refreshing range cache
32+
cacheStalenessTimeout time.Duration // Timeout before considering cache entry stale
33+
2834
// In-memory block range cache
35+
// NOTE: Staging data has a TTL. The cache is refreshed periodically
36+
// to detect expired entries and update min/max ranges accordingly.
37+
// Badger doesn't provide expiry notifications, so we rely on periodic scanning.
2938
rangeCache map[string]*blockRange // chainId -> range
3039
rangeCacheMu sync.RWMutex
3140
rangeUpdateChan chan string // channel for triggering background updates
@@ -71,15 +80,19 @@ func NewBadgerConnector(cfg *config.BadgerConfig) (*BadgerConnector, error) {
7180
}
7281

7382
bc := &BadgerConnector{
74-
db: db,
75-
stopGC: make(chan struct{}),
76-
rangeCache: make(map[string]*blockRange),
77-
rangeUpdateChan: make(chan string, 5),
78-
stopRangeUpdate: make(chan struct{}),
83+
db: db,
84+
stopGC: make(chan struct{}),
85+
rangeCache: make(map[string]*blockRange),
86+
rangeUpdateChan: make(chan string, 5),
87+
stopRangeUpdate: make(chan struct{}),
88+
stagingDataTTL: 10 * time.Minute,
89+
gcInterval: 60 * time.Second,
90+
cacheRefreshInterval: 60 * time.Second,
91+
cacheStalenessTimeout: 120 * time.Second,
7992
}
8093

8194
// Start GC routine
82-
bc.gcTicker = time.NewTicker(time.Duration(60) * time.Second)
95+
bc.gcTicker = time.NewTicker(bc.gcInterval)
8396
go bc.runGC()
8497

8598
// Start range cache update routine
@@ -104,7 +117,7 @@ func (bc *BadgerConnector) runGC() {
104117

105118
// runRangeCacheUpdater runs in the background to validate cache entries
106119
func (bc *BadgerConnector) runRangeCacheUpdater() {
107-
ticker := time.NewTicker(time.Minute)
120+
ticker := time.NewTicker(bc.cacheRefreshInterval)
108121
defer ticker.Stop()
109122

110123
for {
@@ -184,7 +197,7 @@ func (bc *BadgerConnector) refreshStaleRanges() {
184197
staleChains := []string{}
185198
now := time.Now()
186199
for chainId, r := range bc.rangeCache {
187-
if now.Sub(r.lastUpdated) > 3*time.Minute {
200+
if now.Sub(r.lastUpdated) > bc.cacheStalenessTimeout {
188201
staleChains = append(staleChains, chainId)
189202
}
190203
}
@@ -391,7 +404,9 @@ func (bc *BadgerConnector) InsertStagingData(data []common.BlockData) error {
391404
return err
392405
}
393406

394-
if err := txn.Set(key, buf.Bytes()); err != nil {
407+
// Set with configured TTL for staging data
408+
entry := badger.NewEntry(key, buf.Bytes()).WithTTL(bc.stagingDataTTL)
409+
if err := txn.SetEntry(entry); err != nil {
395410
return err
396411
}
397412

internal/worker/worker.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -431,11 +431,8 @@ func (w *Worker) Run(ctx context.Context, blockNumbers []*big.Int) []rpc.GetFull
431431
}
432432

433433
log.Debug().
434-
Int("total", len(results)).
435-
Str("first_block", blockNumbers[0].String()).
436-
Str("last_block", blockNumbers[len(results)-1].String()).
437-
Str("first_block_result", results[0].BlockNumber.String()).
438-
Str("last_block_result", results[len(results)-1].BlockNumber.String()).
434+
Str("first_block", results[0].BlockNumber.String()).
435+
Str("last_block", results[len(results)-1].BlockNumber.String()).
439436
Int("successful", successful).
440437
Int("failed", failed).
441438
Str("source", sourceType.String()).

0 commit comments

Comments
 (0)