Fix Poller Behavior #279
Conversation
Walkthrough

Updates span the orchestrator polling architecture, storage TTL and maintenance intervals, and observability/logging. The Poller is redesigned for concurrent range processing with lookahead and an explicit lifecycle. The Orchestrator removes WorkModeMonitor. The ChainTracker poll interval is reduced to 1 minute. Storage adds TTL-backed staging and configurable maintenance tickers. Several logs and metrics are refined.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    actor Orchestrator
    participant Poller
    participant "Worker Goroutines" as Workers
    participant RPC
    participant Storage
    Orchestrator->>Poller: Start(ctx)
    activate Poller
    note right of Poller: init ctx, tasks chan, processingRanges<br/>spawn parallel workers
    Poller->>Workers: start workerLoop()
    activate Workers
    loop request or lookahead
        Poller->>Poller: mark range processing
        Poller->>Workers: enqueue block range (tasks)
        Workers->>RPC: fetch blocks
        RPC-->>Workers: block data
        Workers->>Storage: stageResults(block data) (With TTL)
        Workers->>Poller: updateLastPolledBlock
        Poller->>Poller: unmark range processing
    end
    Orchestrator-->>Poller: ctx canceled
    Poller->>Workers: close(tasks) & wait wg
    deactivate Workers
    Poller-->>Orchestrator: shutdown complete
    deactivate Poller
    note over Poller,Workers: concurrent range processing + lookahead (no ticker)
```
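Read alongside the diagram, the new lifecycle is essentially a bounded worker pool fed by a task channel, with a set guarding in-flight ranges. The following is a minimal, hypothetical sketch of that pattern; the type and function names (`pollerSketch`, `fetchRange`, the range-key format) are assumptions for illustration, not the repository's actual code.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

type rangeTask struct{ start, end int64 }

type pollerSketch struct {
	workers  int
	tasks    chan rangeTask
	wg       sync.WaitGroup
	mu       sync.Mutex
	inFlight map[string]bool
}

func newPollerSketch(workers int) *pollerSketch {
	return &pollerSketch{
		workers:  workers,
		tasks:    make(chan rangeTask, workers),
		inFlight: make(map[string]bool),
	}
}

// Start launches the worker pool; cancelling ctx or closing tasks stops it.
func (p *pollerSketch) Start(ctx context.Context) {
	for i := 0; i < p.workers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case t, ok := <-p.tasks:
					if !ok {
						return
					}
					key := fmt.Sprintf("%d-%d", t.start, t.end)
					if !p.tryMark(key) {
						continue // another worker already owns this range
					}
					fetchRange(t) // assumed stand-in for the RPC fetch + staging write
					p.unmark(key)
				}
			}
		}()
	}
}

// Shutdown closes the task channel and waits for workers to drain.
func (p *pollerSketch) Shutdown() {
	close(p.tasks)
	p.wg.Wait()
}

func (p *pollerSketch) tryMark(key string) bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.inFlight[key] {
		return false
	}
	p.inFlight[key] = true
	return true
}

func (p *pollerSketch) unmark(key string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	delete(p.inFlight, key)
}

func fetchRange(t rangeTask) {
	fmt.Printf("polled blocks %d-%d\n", t.start, t.end)
}

func main() {
	p := newPollerSketch(4)
	p.Start(context.Background())
	p.tasks <- rangeTask{start: 1, end: 100}
	p.tasks <- rangeTask{start: 101, end: 200}
	p.Shutdown()
}
```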
```mermaid
sequenceDiagram
    autonumber
    actor Orchestrator
    participant Poller
    participant Committer
    participant ReorgHandler
    participant ChainTracker
    Orchestrator->>Orchestrator: initializeWorkerAndPoller()
    Orchestrator->>Poller: Start(ctx)
    par
        Orchestrator->>Committer: Start(ctx)
    and
        opt if enabled
            Orchestrator->>ReorgHandler: Start(ctx)
        end
    and
        Orchestrator->>ChainTracker: Start(ctx)
    end
    note right of Orchestrator: WorkModeMonitor creation/start removed
```
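For orientation, the parallel startup shown above can be expressed as goroutines joined by a WaitGroup; the `component` interface and `reorgEnabled` flag below are illustrative assumptions rather than the project's real types.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// component is a hypothetical stand-in for Poller, Committer, ReorgHandler, ChainTracker.
type component interface {
	Start(ctx context.Context)
}

type named struct{ name string }

func (n named) Start(ctx context.Context) { fmt.Println(n.name, "started") }

func startAll(ctx context.Context, reorgEnabled bool) {
	components := []component{named{"poller"}, named{"committer"}, named{"chain-tracker"}}
	if reorgEnabled { // mirrors the "opt if enabled" branch in the diagram
		components = append(components, named{"reorg-handler"})
	}

	var wg sync.WaitGroup
	for _, c := range components {
		wg.Add(1)
		go func(c component) {
			defer wg.Done()
			c.Start(ctx) // in the real system each Start blocks until ctx is cancelled
		}(c)
	}
	wg.Wait()
}

func main() {
	startAll(context.Background(), true)
}
```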
```mermaid
sequenceDiagram
    autonumber
    participant BadgerConnector as Storage
    participant BadgerDB as Badger
    participant GC as GC Ticker
    participant Cache as Cache Refresher
    Storage->>BadgerDB: InsertStagingData(SetEntry.WithTTL(stagingDataTTL))
    GC->>Storage: every gcInterval
    Storage->>BadgerDB: run value log GC
    Cache->>Storage: every cacheRefreshInterval
    Storage->>Storage: refresh range cache
    Cache->>Storage: staleness check (cacheStalenessTimeout)
    Storage->>Storage: prune stale ranges
```
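For context, a TTL-backed staging write plus a periodic value-log GC looks roughly like this with Badger; the key format, TTL, and GC interval below are illustrative assumptions, not the values this PR uses.

```go
package main

import (
	"log"
	"time"

	badger "github.com/dgraph-io/badger/v4"
)

func main() {
	db, err := badger.Open(badger.DefaultOptions("/tmp/staging-example"))
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// TTL-backed staging write: the entry self-evicts after the TTL elapses.
	err = db.Update(func(txn *badger.Txn) error {
		entry := badger.NewEntry([]byte("staging:1:12345"), []byte("block payload")).
			WithTTL(2 * time.Hour) // assumed TTL; should exceed worst-case commit/publish lag
		return txn.SetEntry(entry)
	})
	if err != nil {
		log.Fatal(err)
	}

	// Periodic value-log GC, as in the "GC Ticker" lane of the diagram.
	ticker := time.NewTicker(5 * time.Minute) // assumed interval
	defer ticker.Stop()
	go func() {
		for range ticker.C {
			// ErrNoRewrite just means there was nothing worth reclaiming.
			if err := db.RunValueLogGC(0.5); err != nil && err != badger.ErrNoRewrite {
				log.Printf("badger GC: %v", err)
			}
		}
	}()

	time.Sleep(100 * time.Millisecond) // keep the example short-lived
}
```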
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
internal/orchestrator/chain_tracker.go (1)
51-53: Compile-time bug: big.Int has no Float64(). Convert via big.Float (or cast Int64 safely) before setting the metric.
```diff
- latestBlockNumberFloat, _ := latestBlockNumber.Float64()
- metrics.ChainHead.Set(latestBlockNumberFloat)
+ f := new(big.Float).SetInt(latestBlockNumber)
+ v, _ := f.Float64()
+ metrics.ChainHead.Set(v)
```
internal/orchestrator/committer.go (1)
293-297: Nil deref risk before error check.
`latestCommittedBlockNumber.String()` is called before the error check; on a non-nil error the value can be nil, which panics.
```diff
- latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID())
- log.Debug().Msgf("Committer found this max block number in main storage: %s", latestCommittedBlockNumber.String())
- if err != nil {
-     return nil, err
- }
+ latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID())
+ if err != nil {
+     return nil, err
+ }
+ if latestCommittedBlockNumber != nil {
+     log.Debug().Msgf("Committer found this max block number in main storage: %s", latestCommittedBlockNumber.String())
+ }
```
🧹 Nitpick comments (8)
internal/orchestrator/chain_tracker.go (1)
12-12: Nit: fix comment grammar. "1 minutes" → "1 minute".
```diff
-const DEFAULT_CHAIN_TRACKER_POLL_INTERVAL = 60 * 1000 // 1 minutes
+const DEFAULT_CHAIN_TRACKER_POLL_INTERVAL = 60 * 1000 // 1 minute
```
internal/storage/badger.go (2)
82-92: Make TTL/intervals configurable from BadgerConfig. Expose `stagingDataTTL`, `gcInterval`, `cacheRefreshInterval`, and `cacheStalenessTimeout` via config and only fall back to these defaults when they are zero.
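A minimal sketch of the zero-value fallback this comment asks for, assuming hypothetical `BadgerConfig` fields and default values that may not match the repository's actual config:

```go
package main

import (
	"fmt"
	"time"
)

// Hypothetical config fields; the real BadgerConfig in this repo may differ.
type BadgerConfig struct {
	StagingDataTTL        time.Duration
	GCInterval            time.Duration
	CacheRefreshInterval  time.Duration
	CacheStalenessTimeout time.Duration
}

// Assumed defaults standing in for the hard-coded values the review wants replaced.
const (
	defaultStagingDataTTL        = 2 * time.Hour
	defaultGCInterval            = 5 * time.Minute
	defaultCacheRefreshInterval  = 30 * time.Second
	defaultCacheStalenessTimeout = 10 * time.Minute
)

// withDefaults falls back to a default only when the configured value is zero.
func withDefaults(cfg BadgerConfig) BadgerConfig {
	if cfg.StagingDataTTL == 0 {
		cfg.StagingDataTTL = defaultStagingDataTTL
	}
	if cfg.GCInterval == 0 {
		cfg.GCInterval = defaultGCInterval
	}
	if cfg.CacheRefreshInterval == 0 {
		cfg.CacheRefreshInterval = defaultCacheRefreshInterval
	}
	if cfg.CacheStalenessTimeout == 0 {
		cfg.CacheStalenessTimeout = defaultCacheStalenessTimeout
	}
	return cfg
}

func main() {
	cfg := withDefaults(BadgerConfig{GCInterval: time.Minute}) // only GC interval overridden
	fmt.Println(cfg.StagingDataTTL, cfg.GCInterval)            // 2h0m0s 1m0s
}
```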
653-667: Micro: avoid string allocations when parsing keys. Using byte ops (e.g., `bytes.IndexByte`/`bytes.SplitN`) avoids converting keys to strings in tight loops.
Also applies to: 659-667
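A hedged illustration of the byte-level parsing this nit suggests, assuming a `prefix:chainID:blockNumber` key layout purely for the example:

```go
package main

import (
	"bytes"
	"fmt"
	"strconv"
)

// parseBlockNumberFromKey extracts the trailing block number from a key shaped
// like "prefix:chainID:blockNumber" without converting the whole key to a string.
// The key layout here is an assumption for illustration.
func parseBlockNumberFromKey(key []byte) (uint64, bool) {
	i := bytes.LastIndexByte(key, ':')
	if i < 0 || i+1 >= len(key) {
		return 0, false
	}
	// Only the short numeric suffix is converted, not the whole key,
	// which keeps allocations small in tight iteration loops.
	n, err := strconv.ParseUint(string(key[i+1:]), 10, 64)
	if err != nil {
		return 0, false
	}
	return n, true
}

func main() {
	n, ok := parseBlockNumberFromKey([]byte("staging:1:12345"))
	fmt.Println(n, ok) // 12345 true
}
```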
internal/orchestrator/poller.go (5)
19-22: Expose lookahead and parallelism via config.
`DEFAULT_LOOKAHEAD_BATCHES` is fixed; consider reading a `Cfg.Poller.LookaheadBatches` with a sane default.
58-74: Initialize lookahead from config (not the constant). Minor parity gap vs. `ParallelPollers`.
```diff
- lookaheadBatches := DEFAULT_LOOKAHEAD_BATCHES
+ lookaheadBatches := config.Cfg.Poller.LookaheadBatches
+ if lookaheadBatches == 0 {
+     lookaheadBatches = DEFAULT_LOOKAHEAD_BATCHES
+ }
```
Also applies to: 64-65
276-326: Reduce RPC fan-out in lookahead. Fetch the latest block once per lookahead invocation and reuse it inside the loop.
```diff
-func (p *Poller) triggerLookahead(currentEndBlock *big.Int, batchSize int64) {
-	// Use configurable lookahead batches
-	for i := 0; i < p.lookaheadBatches; i++ {
+func (p *Poller) triggerLookahead(currentEndBlock *big.Int, batchSize int64) {
+	latestBlock, err := p.rpc.GetLatestBlockNumber(p.ctx)
+	if err != nil {
+		log.Error().Err(err).Msg("Failed to get latest block")
+		return
+	}
+	for i := 0; i < p.lookaheadBatches; i++ {
 		startBlock := new(big.Int).Add(currentEndBlock, big.NewInt(int64(i)*batchSize+1))
 		endBlock := new(big.Int).Add(startBlock, big.NewInt(batchSize-1))
@@
-		// Get latest block to ensure we don't exceed chain head
-		latestBlock, err := p.rpc.GetLatestBlockNumber(p.ctx)
-		if err != nil {
-			log.Error().Err(err).Msg("Failed to get latest block")
-			break
-		}
```
Also applies to: 300-306
89-89: Remove unused error sentinel.
`ErrNoNewBlocks` is declared but never used.
344-350: Unused helper.
`updateLastPolledBlock` isn't referenced; either call it in `poll` or remove it.
```diff
- p.lastPolledBlock = new(big.Int).Set(highestBlockNumber)
+ p.updateLastPolledBlock(highestBlockNumber)
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (6)
- internal/orchestrator/chain_tracker.go (1 hunks)
- internal/orchestrator/committer.go (3 hunks)
- internal/orchestrator/orchestrator.go (0 hunks)
- internal/orchestrator/poller.go (5 hunks)
- internal/storage/badger.go (6 hunks)
- internal/worker/worker.go (1 hunks)
💤 Files with no reviewable changes (1)
- internal/orchestrator/orchestrator.go
🧰 Additional context used
🧬 Code graph analysis (1)
internal/orchestrator/poller.go (3)
internal/rpc/rpc.go (1)
- IRPCClient (42-55)
internal/storage/connector.go (1)
- IStorage (69-73)
internal/common/block.go (1)
- BlockData (61-66)
🔇 Additional comments (13)
internal/orchestrator/committer.go (3)
277-283: Good: richer staging cleanup log with context + metric. Structured fields make on-call triage easier.
474-478: Good: corrected error string for the publish path. Improves the accuracy of surfaced errors.
516-517: Good: range summary in the commit log. Range logging simplifies correlating batches end-to-end.
internal/storage/badger.go (7)
53-54: Sensible default path. LGTM; the temp dir fallback is fine for local/dev.
95-101: GC ticker wiring looks good. It runs on the configured interval; downgrading GC errors to debug is appropriate.
120-135: Background range-cache refresher: LGTM. Clear separation of explicit updates vs. periodic refresh.
200-203: Staleness gate is straightforward. Timeout-based invalidation keeps the cache bounded.
407-410: TTL-backed staging writes: solid. Matches the PR's intent to let stale staging self-evict. Ensure the TTL comfortably exceeds worst-case commit/publish lag to avoid data churn.
Would you like a quick repo scan to surface current TTL vs. commit/publish interval configs to confirm safe headroom?
641-712: Older-than delete with cache maintenance: LGTM. Cache min adjustment and background resync keep range metadata accurate.
716-745: Range query with cache-miss fallback: LGTM. Defensive cache population on miss is a good UX for callers.
internal/worker/worker.go (1)
434-436: Good: log first/last from actual results. Prevents misleading ranges when inputs differ from outputs.
internal/orchestrator/poller.go (2)
91-103: Startup lifecycle: LGTM. Worker pool + graceful shutdown via the parent ctx is clean.
146-177: Ignore stale cache path concern. The Poller's worker is constructed with the provided staging source via `NewWorkerWithSources(o.rpc, s3, staging)`, so `pollBlockData` correctly reads from staging rather than re-fetching over RPC. Likely an incorrect or invalid review comment.
```
// Check if already processing
p.processingRangesMutex.RLock()
isProcessing := p.processingRanges[rangeKey]
p.processingRangesMutex.RUnlock()

for i := 0; i < p.parallelPollers; i++ {
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case <-pollCtx.Done():
				return
			case _, ok := <-tasks:
				if !ok {
					return
				}

				blockNumbers, err := p.getNextBlockRange(pollCtx)

				if err != nil {
					if err != ErrNoNewBlocks {
						log.Error().Err(err).Msg("Failed to get block range to poll")
					}
					continue
				}

				if pollCtx.Err() != nil {
					return
				}

				lastPolledBlock, err := p.poll(pollCtx, blockNumbers)
				if err != nil {
					log.Error().Err(err).Msg("Failed to poll blocks")
					continue
				}

				if p.reachedPollLimit(lastPolledBlock) {
					log.Info().Msgf("Reached poll limit at block %s, completing poller", lastPolledBlock.String())
					return
				}
			}
		}
	}()

if isProcessing {
	return nil, fmt.Errorf("range %s already being processed", rangeKey)
}

	for {
		select {
		case <-ctx.Done():
			p.shutdown(cancel, tasks, &wg)
			return
		case <-ticker.C:
			select {
			case tasks <- struct{}{}:
			default:
				// Channel full, skip this tick
			}
		}
	}
}

p.markRangeAsProcessing(rangeKey)
defer p.unmarkRangeAsProcessing(rangeKey)
```
🛠️ Refactor suggestion
Eliminate check-then-mark race for processing ranges.
Two-step read then write can double-queue under contention.
```diff
- // Check if already processing
- p.processingRangesMutex.RLock()
- isProcessing := p.processingRanges[rangeKey]
- p.processingRangesMutex.RUnlock()
-
- if isProcessing {
-     return nil, fmt.Errorf("range %s already being processed", rangeKey)
- }
-
- p.markRangeAsProcessing(rangeKey)
+ // Atomically try to mark as processing
+ if !p.tryMarkRange(rangeKey) {
+     return nil, fmt.Errorf("range %s already being processed", rangeKey)
+ }
  defer p.unmarkRangeAsProcessing(rangeKey)
```
Add helper:
```diff
+func (p *Poller) tryMarkRange(rangeKey string) bool {
+	p.processingRangesMutex.Lock()
+	defer p.processingRangesMutex.Unlock()
+	if p.processingRanges[rangeKey] {
+		return false
+	}
+	p.processingRanges[rangeKey] = true
+	return true
+}
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```
// Check if already processing
p.processingRangesMutex.RLock()
isProcessing := p.processingRanges[rangeKey]
p.processingRangesMutex.RUnlock()
for i := 0; i < p.parallelPollers; i++ {
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case <-pollCtx.Done():
				return
			case _, ok := <-tasks:
				if !ok {
					return
				}
				blockNumbers, err := p.getNextBlockRange(pollCtx)
				if err != nil {
					if err != ErrNoNewBlocks {
						log.Error().Err(err).Msg("Failed to get block range to poll")
					}
					continue
				}
				if pollCtx.Err() != nil {
					return
				}
				lastPolledBlock, err := p.poll(pollCtx, blockNumbers)
				if err != nil {
					log.Error().Err(err).Msg("Failed to poll blocks")
					continue
				}
				if p.reachedPollLimit(lastPolledBlock) {
					log.Info().Msgf("Reached poll limit at block %s, completing poller", lastPolledBlock.String())
					return
				}
			}
		}
	}()
if isProcessing {
	return nil, fmt.Errorf("range %s already being processed", rangeKey)
}
for {
	select {
	case <-ctx.Done():
		p.shutdown(cancel, tasks, &wg)
		return
	case <-ticker.C:
		select {
		case tasks <- struct{}{}:
		default:
			// Channel full, skip this tick
		}
	}
}
}
p.markRangeAsProcessing(rangeKey)
defer p.unmarkRangeAsProcessing(rangeKey)
// Atomically try to mark as processing
if !p.tryMarkRange(rangeKey) {
	return nil, fmt.Errorf("range %s already being processed", rangeKey)
}
defer p.unmarkRangeAsProcessing(rangeKey)
```
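As a design note on the suggestion above: if the team prefers to avoid an explicit mutex, `sync.Map`'s `LoadOrStore` provides the same atomic check-and-mark in one call. A minimal sketch of that alternative (not the code CodeRabbit proposed):

```go
package main

import (
	"fmt"
	"sync"
)

type rangeTracker struct {
	processing sync.Map // rangeKey -> struct{}
}

// tryMark atomically claims a range; it returns false if another worker already holds it.
func (t *rangeTracker) tryMark(rangeKey string) bool {
	_, loaded := t.processing.LoadOrStore(rangeKey, struct{}{})
	return !loaded
}

func (t *rangeTracker) unmark(rangeKey string) {
	t.processing.Delete(rangeKey)
}

func main() {
	var t rangeTracker
	fmt.Println(t.tryMark("100-199")) // true: first claim wins
	fmt.Println(t.tryMark("100-199")) // false: already being processed
	t.unmark("100-199")
	fmt.Println(t.tryMark("100-199")) // true again after unmark
}
```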
Summary by CodeRabbit
New Features
Chores