Fix parallel mode worker state #272
Actionable comments posted: 0
🔭 Outside diff range comments (3)
internal/orchestrator/committer.go (3)
254-269: Potential nil dereference and logging-before-error-check when querying GetMaxBlockNumber.
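- You log latestCommittedBlockNumber.String() before checking err, so a failed query still emits a debug line with a meaningless value. (Assuming the value is a *big.Int, String() on a nil pointer returns "<nil>" rather than panicking, so this part is a logging-order problem.)
- You also call Sign() without guarding against a nil pointer, which does panic.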
Apply this diff to fix both issues:
    - latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID())
    - log.Debug().Msgf("Committer found this max block number in main storage: %s", latestCommittedBlockNumber.String())
    - if err != nil {
    -   return nil, err
    - }
    + latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID())
    + if err != nil {
    +   return nil, err
    + }
    + if latestCommittedBlockNumber != nil {
    +   log.Debug().Msgf("Committer found this max block number in main storage: %s", latestCommittedBlockNumber.String())
    + } else {
    +   log.Debug().Msg("Committer found nil max block number in main storage")
    + }

    -if latestCommittedBlockNumber.Sign() == 0 {
    +if latestCommittedBlockNumber == nil || latestCommittedBlockNumber.Sign() == 0 {
       // If no blocks have been committed yet, start from the fromBlock specified in the config
       latestCommittedBlockNumber = new(big.Int).Sub(c.commitFromBlock, big.NewInt(1))
     } else {
       lastCommitted := new(big.Int).SetUint64(c.lastCommittedBlock.Load())
       if latestCommittedBlockNumber.Cmp(lastCommitted) < 0 {
         log.Warn().Msgf("Max block in storage (%s) is less than last committed block in memory (%s).", latestCommittedBlockNumber.String(), lastCommitted.String())
         return []*big.Int{}, nil
       }
     }
293-329: Potential nil dereference and misleading variable name in getBlockNumbersToPublish.
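- You log lastestPublishedBlockNumber.String() before checking err, so a failed query still emits a debug line with a meaningless value. (Assuming the value is a *big.Int, String() on a nil pointer returns "<nil>" rather than panicking.)
- You call Sign() on a potentially nil pointer, which does panic.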
- Variable name “lastestPublishedBlockNumber” is a typo; prefer “latestPublishedBlockNumber” for clarity.
Apply this diff:
    - lastestPublishedBlockNumber, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(c.rpc.GetChainID())
    - log.Debug().Msgf("Committer found this last published block number in staging storage: %s", lastestPublishedBlockNumber.String())
    - if err != nil {
    -   return nil, err
    - }
    + latestPublishedBlockNumber, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(c.rpc.GetChainID())
    + if err != nil {
    +   return nil, err
    + }
    + if latestPublishedBlockNumber != nil {
    +   log.Debug().Msgf("Committer found this last published block number in staging storage: %s", latestPublishedBlockNumber.String())
    + } else {
    +   log.Debug().Msg("Committer found nil last published block number in staging storage")
    + }

    -if lastestPublishedBlockNumber.Sign() == 0 {
    +if latestPublishedBlockNumber == nil || latestPublishedBlockNumber.Sign() == 0 {
       // If no blocks have been committed yet, start from the fromBlock specified in the config
    -  lastestPublishedBlockNumber = new(big.Int).Sub(c.commitFromBlock, big.NewInt(1))
    +  latestPublishedBlockNumber = new(big.Int).Sub(c.commitFromBlock, big.NewInt(1))
     } else {
       lastPublished := new(big.Int).SetUint64(c.lastPublishedBlock.Load())
    -  if lastestPublishedBlockNumber.Cmp(lastPublished) < 0 {
    -    log.Warn().Msgf("Max block in storage (%s) is less than last published block in memory (%s).", lastestPublishedBlockNumber.String(), lastPublished.String())
    +  if latestPublishedBlockNumber.Cmp(lastPublished) < 0 {
    +    log.Warn().Msgf("Max block in storage (%s) is less than last published block in memory (%s).", latestPublishedBlockNumber.String(), lastPublished.String())
         return []*big.Int{}, nil
       }
     }

    -startBlock := new(big.Int).Add(lastestPublishedBlockNumber, big.NewInt(1))
    -endBlock, err := c.getBlockToCommitUntil(ctx, lastestPublishedBlockNumber)
    +startBlock := new(big.Int).Add(latestPublishedBlockNumber, big.NewInt(1))
    +endBlock, err := c.getBlockToCommitUntil(ctx, latestPublishedBlockNumber)
426-432: Metric for first missed block is incorrect.
You’re recording blocksData[0].Block.Number, which is the first present block, not the first missed one. Use expectedBlockNumber.
    - // record the first missed block number in prometheus
    - metrics.MissedBlockNumbers.Set(float64(blocksData[0].Block.Number.Int64()))
    + // record the first missed block number in prometheus
    + metrics.MissedBlockNumbers.Set(float64(expectedBlockNumber.Int64()))
🧹 Nitpick comments (5)
internal/orchestrator/committer.go (5)
105-126: Startup alignment logic is sensible; consider persisting when lastPublished is unset.
Today, when lastPublished is nil/zero, you only set the in-memory lastPublishedBlock to match lastCommitted, but you don’t persist it to staging. In parallel mode, that can cause the publisher to start again from commitFromBlock (staging still at 0) and re-publish older data.
- Either persist the aligned lastPublished to staging when lastPublished is nil/zero, or confirm that downstream deduplication makes this safe and acceptable.
Would you like me to provide a patch that also updates StagingStorage when lastPublished is nil/zero to prevent potential duplicate publishes on restart?
162-176: Only the commit loop drains workMode updates; publisher responsiveness may lag.
Since runPublishLoop doesn't read from workModeChan, mode changes are only applied when the commit loop’s select runs. During the default branch’s sleep, mode switches can take up to a full interval to reflect on the publisher. Consider:
- Moving workMode channel handling to a dedicated goroutine in Start, or
- Replacing time.Sleep with a time.Ticker and including its channel in the select to avoid blocking other cases.
This will improve responsiveness and avoid potential back-pressure on the sender (if the channel is unbuffered).
451-459: Nit: incorrect error message in getSequentialBlockDataToPublish.
The error says “commit” but this function is for publish; update for clarity.
    - return nil, fmt.Errorf("error determining blocks to commit: %v", err)
    + return nil, fmt.Errorf("error determining blocks to publish: %v", err)
531-555: Skip gap handling in live mode: LGTM; consider handling poller errors (if any).
The live-mode short-circuit is correct. If Poll returns an error, consider logging it to aid triage during backfill gap recovery.
Do you want me to add defensive logging around poller.Poll if its signature returns an error?
202-215: Replace time.Sleep with time.Ticker in both loops to avoid delaying mode updates.
Sleeping in the default branch prevents the select from serving workMode updates until the sleep ends. Using time.Ticker (and selecting on ticker.C alongside ctx.Done and the mode channel) will improve responsiveness without extra complexity.
I can provide a patch converting both loops to ticker-based selects if you’d like.
Also applies to: 162-185
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
internal/orchestrator/committer.go (7 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
internal/orchestrator/committer.go (1)
internal/orchestrator/work_mode_monitor.go (2)
WorkModeBackfill (22-22)
WorkModeLive (21-21)
🔇 Additional comments (5)
internal/orchestrator/committer.go (5)
34-34: Thread-safe workMode access: good addition.
Protecting workMode with an RWMutex is the right call to eliminate races across commit/publish loops.
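For readers unfamiliar with the pattern, a guarded mode field might be sketched as follows. `modeHolder` and `WorkMode` are hypothetical names for illustration; the committer's actual field layout is not shown in this review.

```go
package main

import (
	"fmt"
	"sync"
)

// WorkMode is a hypothetical stand-in for the orchestrator's mode type.
type WorkMode string

// modeHolder guards a mode value that one goroutine writes and several read.
type modeHolder struct {
	mu   sync.RWMutex
	mode WorkMode
}

// Set takes the write lock, excluding all readers and other writers.
func (h *modeHolder) Set(m WorkMode) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.mode = m
}

// Get takes the read lock, so concurrent readers do not block each other.
func (h *modeHolder) Get() WorkMode {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.mode
}

func main() {
	h := &modeHolder{}
	var wg sync.WaitGroup

	// One writer and several readers, as with a commit and a publish loop.
	wg.Add(1)
	go func() {
		defer wg.Done()
		h.Set("backfill")
	}()
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = h.Get() // may observe "" or "backfill", but never a torn value
		}()
	}
	wg.Wait()
	fmt.Println("final mode:", h.Get())
}
```

A read-write lock suits this access pattern because mode changes are rare while both loops read the mode on every iteration.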
179-185: Mode gating in commit loop: LGTM.
Skipping work when mode is unset is appropriate and prevents undefined behavior during initialization.
209-215: Mode gating in publish loop: LGTM.
Same rationale as commit loop; avoids work until the system is ready.
333-350: Mode-aware “until” computation: LGTM.
Backfill mode returning computed window and live mode clamping to RPC latest block is correct.
353-379: Mode-aware data source selection: LGTM.
Using staging for backfill and a poller for live mode is appropriate. The warning + handleMissingStagingData on gaps in backfill is a nice touch.
Summary by CodeRabbit
Bug Fixes
Refactor