Init publisher value on start #273
Walkthrough
Initializes and advances the publisher cursor using max(lastPublished, latestCommitted) with safe fallbacks on storage read/write failures; adds defensive handling and logging in getBlockNumbersToPublish; retains staging cleanup and does not change any public APIs.
Sequence Diagram(s)
sequenceDiagram
participant C as Committer
participant S as Storage
Note over C: Start()
C->>S: Read lastPublished
alt read fails
C->>C: Treat as no previous position
C->>C: Choose targetPublishBlock from: latestCommitted (if >0) OR commitFromBlock-1 (if set) OR 0
else read ok
C->>C: targetPublishBlock = max(stored lastPublished, latestCommitted)
end
C->>C: Emit "Publisher initialized" (publish_from, committed_at)
alt target differs from in-memory
C->>S: Write updated lastPublished
alt write fails
C->>C: Revert to previous in-memory lastPublished
end
end
C->>C: Set in-memory lastPublished and cleanup staging
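The cursor selection in the Start() diagram above can be sketched as a pure function. This is a minimal sketch, not the code from committer.go: the names initialPublishTarget and blk are hypothetical, and a nil or zero lastPublished is assumed to stand for both "read failed" and "no stored position".

```go
package main

import (
	"fmt"
	"math/big"
)

// blk builds a block number for the examples below (hypothetical helper).
func blk(v int64) *big.Int { return big.NewInt(v) }

// initialPublishTarget picks the publisher cursor at startup.
// Assumption: nil or 0 for lastPublished means there is no usable stored position.
func initialPublishTarget(lastPublished, latestCommitted, commitFromBlock *big.Int) *big.Int {
	positive := func(v *big.Int) bool { return v != nil && v.Sign() > 0 }

	if !positive(lastPublished) {
		switch {
		case positive(latestCommitted):
			// Start from the committed position.
			return new(big.Int).Set(latestCommitted)
		case positive(commitFromBlock):
			// Publishing resumes at cursor+1, so step back by one.
			return new(big.Int).Sub(commitFromBlock, big.NewInt(1))
		default:
			return big.NewInt(0)
		}
	}

	// A previous position exists: take max(lastPublished, latestCommitted).
	if positive(latestCommitted) && latestCommitted.Cmp(lastPublished) > 0 {
		return new(big.Int).Set(latestCommitted)
	}
	return new(big.Int).Set(lastPublished)
}

func main() {
	fmt.Println(initialPublishTarget(nil, blk(120), blk(50)))     // 120
	fmt.Println(initialPublishTarget(nil, nil, blk(50)))          // 49
	fmt.Println(initialPublishTarget(blk(100), blk(120), blk(0))) // 120
	fmt.Println(initialPublishTarget(blk(130), blk(120), blk(0))) // 130
}
```

Keeping the selection free of storage and logging calls makes each branch of the diagram easy to cover with a table-driven test.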
sequenceDiagram
participant C as Committer
participant S as Storage
Note over C: getBlockNumbersToPublish()
C->>S: Read lastPublished
alt read error or returned nil/0
C->>C: Log warning and fallback to in-memory lastPublished
else ok
C->>C: Use stored lastPublished
end
C->>C: Compute startBlock = latestPublished + 1
C->>C: Compute endBlock = getBlockToCommitUntil(ctx, latestPublished)
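The fallback in the getBlockNumbersToPublish() diagram above can be sketched as follows. This is an illustration only: resolveLastPublished, nextStartBlock, blk, and errStorageDown are hypothetical names, and the sketch follows the diagram in treating a read error the same as a nil/0 result. The endBlock computation is left out because it depends on getBlockToCommitUntil, which is not shown in this thread.

```go
package main

import (
	"errors"
	"fmt"
	"math/big"
)

// blk builds a block number for the examples below (hypothetical helper).
func blk(v int64) *big.Int { return big.NewInt(v) }

// errStorageDown simulates a failed storage read (hypothetical).
var errStorageDown = errors.New("staging storage unavailable")

// resolveLastPublished returns the cursor to publish from and whether it came
// from storage. A read error, nil, or 0 falls back to the in-memory value.
func resolveLastPublished(stored *big.Int, readErr error, inMemory uint64) (*big.Int, bool) {
	if readErr != nil || stored == nil || stored.Sign() == 0 {
		return new(big.Int).SetUint64(inMemory), false
	}
	return stored, true
}

// nextStartBlock computes startBlock = lastPublished + 1.
func nextStartBlock(lastPublished *big.Int) *big.Int {
	return new(big.Int).Add(lastPublished, big.NewInt(1))
}

func main() {
	cur, fromStorage := resolveLastPublished(nil, errStorageDown, 200)
	fmt.Println(cur, fromStorage, nextStartBlock(cur)) // 200 false 201

	cur, fromStorage = resolveLastPublished(blk(250), nil, 200)
	fmt.Println(cur, fromStorage, nextStartBlock(cur)) // 250 true 251
}
```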
Estimated code review effort: 3 (Moderate), ~20 minutes
Actionable comments posted: 2
🔭 Outside diff range comments (1)
internal/orchestrator/committer.go (1)
289-293: Nil dereference risk: logging before error/nil checks
You log latestCommittedBlockNumber.String() before checking err or nil, which can panic.
Apply:
- latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID())
- log.Debug().Msgf("Committer found this max block number in main storage: %s", latestCommittedBlockNumber.String())
- if err != nil {
-     return nil, err
- }
+ latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID())
+ if err != nil {
+     return nil, err
+ }
+ if latestCommittedBlockNumber == nil {
+     return nil, fmt.Errorf("GetMaxBlockNumber returned nil without error")
+ }
+ log.Debug().Msgf("Committer found this max block number in main storage: %s", latestCommittedBlockNumber.String())
🧹 Nitpick comments (3)
internal/orchestrator/committer.go (3)
343-345: Reduce log severity to avoid recurring warnings on hot path
The fallback warning will trigger on every publish loop until storage is healed. If you don’t adopt the self-heal, consider downgrading to Info/Debug to avoid log noise.
Apply if not self-healing:
- log.Warn().
+ log.Info().
      Str("fallback_value", latestPublishedBlockNumber.String()).
      Msg("Storage returned nil/0 for last published block, using in-memory value")
347-351: Publishing range should be bounded by committed progress to avoid publishing ahead of commit
Currently endBlock is derived solely from RPC latest (via getBlockToCommitUntil). In parallel mode, this allows the publisher to run ahead of commits, which might break downstream expectations that published data is committed. Consider bounding endBlock by the latest committed block in main storage.
Example change:
- endBlock, err := c.getBlockToCommitUntil(ctx, latestPublishedBlockNumber)
+ endBlock, err := c.getBlockToCommitUntil(ctx, latestPublishedBlockNumber)
  if err != nil {
      return nil, fmt.Errorf("error getting block to commit until: %v", err)
  }
+ // Bound by committed height to prevent publishing ahead of commit
+ if msMax, msErr := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID()); msErr == nil && msMax != nil && msMax.Sign() > 0 && msMax.Cmp(endBlock) < 0 {
+     endBlock = msMax
+ }
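The bound suggested above amounts to clamping endBlock to the committed height. A minimal standalone sketch, assuming hypothetical names boundByCommitted and blk, and assuming a nil or non-positive committed height means "no bound available":

```go
package main

import (
	"fmt"
	"math/big"
)

// blk builds a block number for the examples below (hypothetical helper).
func blk(v int64) *big.Int { return big.NewInt(v) }

// boundByCommitted returns min(endBlock, committedMax) when a positive
// committed height is known; otherwise it returns endBlock unchanged.
func boundByCommitted(endBlock, committedMax *big.Int) *big.Int {
	if committedMax != nil && committedMax.Sign() > 0 && committedMax.Cmp(endBlock) < 0 {
		return new(big.Int).Set(committedMax)
	}
	return endBlock
}

func main() {
	fmt.Println(boundByCommitted(blk(500), blk(420))) // 420
	fmt.Println(boundByCommitted(blk(500), blk(600))) // 500
	fmt.Println(boundByCommitted(blk(500), nil))      // 500
}
```

Leaving endBlock untouched when the committed height is unknown matches the suggestion's behavior of skipping the bound on a storage error rather than failing the publish loop.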
489-492: Minor: fix error string to say “publish” not “commit”
This is in the publish path; the error message says “blocks to commit”.
Apply:
- return nil, fmt.Errorf("error determining blocks to commit: %v", err)
+ return nil, fmt.Errorf("error determining blocks to publish: %v", err)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these settings in your CodeRabbit configuration.
📒 Files selected for processing (1)
internal/orchestrator/committer.go (2 hunks)
🔇 Additional comments (1)
internal/orchestrator/committer.go (1)
110-121: Starting from latestCommitted when no previous publish position may also drop backlog
If storage has no lastPublished (nil/0), you initialize to latestCommitted. In parallel mode, that assumes historic publishes happened and may skip them. Consider gating this too by Publisher.Mode, or at least confirm this is acceptable for your SLAs.
I can provide a mode-gated variant if you confirm the desired behavior in parallel mode.
internal/orchestrator/committer.go
Outdated
  // We have a previous position - use max(lastPublished, lastCommitted)
  targetPublishBlock = lastPublished
  if latestCommittedBlockNumber != nil && latestCommittedBlockNumber.Sign() > 0 {
-     if lastPublished.Cmp(latestCommittedBlockNumber) < 0 {
+     if latestCommittedBlockNumber.Cmp(lastPublished) > 0 {
          gap := new(big.Int).Sub(latestCommittedBlockNumber, lastPublished)
          log.Warn().
              Str("last_published", lastPublished.String()).
              Str("latest_committed", latestCommittedBlockNumber.String()).
              Str("gap", gap.String()).
              Msg("Publisher is behind committed position, seeking forward to committed value")
          targetPublishBlock = latestCommittedBlockNumber
      }
  }
💡 Verification agent
🧩 Analysis chain
Advancing publisher to latestCommitted can skip unpublished backlog in parallel mode
Using max(lastPublished, latestCommitted) unconditionally drops any unpublished-but-committed backlog on cold start or restart. That’s fine for default (commit->publish) mode, but in parallel mode this can cause data loss for downstream consumers expecting at-least-once delivery.
Gate the advancement by Publisher.Mode so we only fast-forward in default mode, and preserve backlog in parallel mode.
Would you confirm the intended delivery semantics? If at-least-once is required for parallel mode, apply:
- // We have a previous position - use max(lastPublished, lastCommitted)
+ // We have a previous position
targetPublishBlock = lastPublished
if latestCommittedBlockNumber != nil && latestCommittedBlockNumber.Sign() > 0 {
- if latestCommittedBlockNumber.Cmp(lastPublished) > 0 {
- gap := new(big.Int).Sub(latestCommittedBlockNumber, lastPublished)
- log.Warn().
- Str("last_published", lastPublished.String()).
- Str("latest_committed", latestCommittedBlockNumber.String()).
- Str("gap", gap.String()).
- Msg("Publisher is behind committed position, seeking forward to committed value")
- targetPublishBlock = latestCommittedBlockNumber
- }
+ if latestCommittedBlockNumber.Cmp(lastPublished) > 0 {
+ gap := new(big.Int).Sub(latestCommittedBlockNumber, lastPublished)
+ if config.Cfg.Publisher.Mode == "default" {
+ log.Warn().
+ Str("last_published", lastPublished.String()).
+ Str("latest_committed", latestCommittedBlockNumber.String()).
+ Str("gap", gap.String()).
+ Msg("Publisher is behind committed position; advancing to committed value (default mode)")
+ targetPublishBlock = latestCommittedBlockNumber
+ } else {
+ log.Warn().
+ Str("last_published", lastPublished.String()).
+ Str("latest_committed", latestCommittedBlockNumber.String()).
+ Str("gap", gap.String()).
+ Msg("Publisher is behind committed position; not advancing in parallel mode to avoid skipping backlog")
+ }
+ }
}
🏁 Script executed:
#!/bin/bash
set -e
# Display the imports and config references in internal/orchestrator/committer.go
sed -n '1,200p' internal/orchestrator/committer.go
Length of output: 6309
🏁 Script executed:
#!/bin/bash
set -e
# Locate the definition of Cfg in the configs package
rg -n "type Cfg" -n configs
# Find where Publisher.Mode is defined within config.Cfg
rg -n "Publisher" -n configs
Length of output: 38
🏁 Script executed:
#!/bin/bash
set -e
# List all files in the configs package
echo "Config files:"
ls -R configs
# Search for the global Cfg variable
echo -e "\nSearching for Cfg declaration:"
grep -R "var Cfg" -n configs || true
grep -R "Cfg =" -n configs || true
# Search for Publisher settings within configs
echo -e "\nSearching for Publisher in configs:"
grep -R "Publisher" -n configs || true
grep -R "Mode" -n configs || true
Length of output: 2290
🏁 Script executed:
#!/bin/bash
set -e
# Show runCommitLoop implementation to verify publish behavior in default mode
rg -n "func (c \\*Committer) runCommitLoop" -n internal/orchestrator/committer.go -A 50
Length of output: 97
Gate publisher initialization by mode to preserve backlog in parallel mode
Unconditionally advancing targetPublishBlock to latestCommittedBlockNumber on startup will drop any unpublished-but-committed blocks when running in parallel mode, breaking at-least-once delivery guarantees. We should only fast-forward in the default (commit→publish) mode and preserve the backlog in parallel mode.
• File: internal/orchestrator/committer.go
• Location: In the Start method, around the “use max(lastPublished, lastCommitted)” block
Proposed change:
- // We have a previous position - use max(lastPublished, lastCommitted)
+ // We have a previous position
targetPublishBlock = lastPublished
if latestCommittedBlockNumber != nil && latestCommittedBlockNumber.Sign() > 0 {
- if latestCommittedBlockNumber.Cmp(lastPublished) > 0 {
- gap := new(big.Int).Sub(latestCommittedBlockNumber, lastPublished)
- log.Warn().
- Str("last_published", lastPublished.String()).
- Str("latest_committed", latestCommittedBlockNumber.String()).
- Str("gap", gap.String()).
- Msg("Publisher is behind committed position, seeking forward to committed value")
- targetPublishBlock = latestCommittedBlockNumber
- }
+ if latestCommittedBlockNumber.Cmp(lastPublished) > 0 {
+ gap := new(big.Int).Sub(latestCommittedBlockNumber, lastPublished)
+ if config.Cfg.Publisher.Mode == "default" {
+ log.Warn().
+ Str("last_published", lastPublished.String()).
+ Str("latest_committed", latestCommittedBlockNumber.String()).
+ Str("gap", gap.String()).
+ Msg("Publisher is behind committed position; advancing to committed value (default mode)")
+ targetPublishBlock = latestCommittedBlockNumber
+ } else {
+ log.Warn().
+ Str("last_published", lastPublished.String()).
+ Str("latest_committed", latestCommittedBlockNumber.String()).
+ Str("gap", gap.String()).
+ Msg("Publisher is behind committed position; not advancing in parallel mode to avoid skipping backlog")
+ }
+ }
}
Please confirm that at-least-once delivery is required when running in parallel mode. If so, apply the above change to prevent skipping any unpublished blocks.
🤖 Prompt for AI Agents
In internal/orchestrator/committer.go around lines 127 to 139, the logic
unconditionally advances targetPublishBlock to latestCommittedBlockNumber which
will drop unpublished-but-committed blocks in parallel mode; change it to only
fast‑forward when the orchestrator is running in the default commit→publish mode
(i.e., when mode != "parallel" or when a boolean like commitThenPublish is
true). Concretely, add a gate checking the configured mode before comparing and
assigning latestCommittedBlockNumber to targetPublishBlock so that in parallel
mode you leave targetPublishBlock as lastPublished and preserve the backlog for
at‑least‑once delivery.
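The gate described in the prompt above can be sketched as a small function. This is a hedged illustration rather than the project's code: gatedPublishTarget and blk are hypothetical names, and the mode strings "default" and "parallel" are taken from the review text, not verified against the config package.

```go
package main

import (
	"fmt"
	"math/big"
)

// blk builds a block number for the examples below (hypothetical helper).
func blk(v int64) *big.Int { return big.NewInt(v) }

// gatedPublishTarget fast-forwards the cursor to latestCommitted only when the
// publisher is not in parallel mode. In parallel mode the cursor stays at
// lastPublished so the unpublished backlog is preserved.
// Assumption: lastPublished is non-nil (a previous position exists).
func gatedPublishTarget(mode string, lastPublished, latestCommitted *big.Int) *big.Int {
	behind := latestCommitted != nil &&
		latestCommitted.Sign() > 0 &&
		latestCommitted.Cmp(lastPublished) > 0

	if behind && mode != "parallel" {
		return new(big.Int).Set(latestCommitted)
	}
	return new(big.Int).Set(lastPublished)
}

func main() {
	fmt.Println(gatedPublishTarget("default", blk(100), blk(120)))  // 120
	fmt.Println(gatedPublishTarget("parallel", blk(100), blk(120))) // 100
	fmt.Println(gatedPublishTarget("default", blk(130), blk(120)))  // 130
}
```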
Actionable comments posted: 0
🔭 Outside diff range comments (1)
internal/orchestrator/committer.go (1)
278-283: Possible nil dereference in debug log; log only after error/nil checks
latestCommittedBlockNumber may be nil when GetMaxBlockNumber returns an error or a nil value, but it’s dereferenced in the log before checking err, which can panic.
- latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID())
- log.Debug().Msgf("Committer found this max block number in main storage: %s", latestCommittedBlockNumber.String())
- if err != nil {
-     return nil, err
- }
+ latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID())
+ if err != nil {
+     return nil, err
+ }
+ if latestCommittedBlockNumber != nil {
+     log.Debug().Msgf("Committer found this max block number in main storage: %s", latestCommittedBlockNumber.String())
+ } else {
+     log.Debug().Msg("Committer found nil max block number in main storage")
+ }
♻️ Duplicate comments (2)
internal/orchestrator/committer.go (2)
107-129: Don’t skip unpublished backlog in parallel mode; gate fast-forward by mode
Unconditionally using the committed position when no prior publish position exists, and not considering the committed gap when a prior position exists, can either:
- skip backlog in parallel mode, or
- leave the publisher behind in default mode.
Gate advancement by mode: in default mode, advance to committed; in parallel mode, never fast-forward over unpublished backlog. Also, if there is a prior publish position and committed is ahead, only advance in default mode.
- // Determine the correct publish position - always take the maximum to avoid going backwards
+ // Determine the correct publish position; only fast-forward in default mode to avoid skipping backlog in parallel mode
  var targetPublishBlock *big.Int
  if lastPublished == nil || lastPublished.Sign() == 0 {
-     // No previous publish position
-     if latestCommittedBlockNumber != nil && latestCommittedBlockNumber.Sign() > 0 {
-         // Start from committed position
-         targetPublishBlock = latestCommittedBlockNumber
-     } else if c.commitFromBlock.Sign() > 0 {
-         // Start from configured position minus 1 (since we publish from next block)
-         targetPublishBlock = new(big.Int).Sub(c.commitFromBlock, big.NewInt(1))
-     } else {
-         // Start from 0
-         targetPublishBlock = big.NewInt(0)
-     }
-     log.Info().
-         Str("target_publish_block", targetPublishBlock.String()).
-         Msg("No previous publish position, initializing publisher cursor")
+     // No previous publish position
+     if config.Cfg.Publisher.Mode == "default" {
+         if latestCommittedBlockNumber != nil && latestCommittedBlockNumber.Sign() > 0 {
+             // Start from committed position in default mode
+             targetPublishBlock = latestCommittedBlockNumber
+         } else if c.commitFromBlock.Sign() > 0 {
+             // Start from configured position minus 1 (since we publish from next block)
+             targetPublishBlock = new(big.Int).Sub(c.commitFromBlock, big.NewInt(1))
+         } else {
+             // Start from 0
+             targetPublishBlock = big.NewInt(0)
+         }
+     } else {
+         // Parallel mode: do not skip backlog; start from config (if set) or 0
+         if c.commitFromBlock.Sign() > 0 {
+             targetPublishBlock = new(big.Int).Sub(c.commitFromBlock, big.NewInt(1))
+         } else {
+             targetPublishBlock = big.NewInt(0)
+         }
+         log.Warn().Msg("No previous publish position; not advancing to committed in parallel mode to avoid skipping backlog")
+     }
+     log.Info().
+         Str("target_publish_block", targetPublishBlock.String()).
+         Msg("No previous publish position, initializing publisher cursor")
  } else {
-     // We have a previous position
-     targetPublishBlock = lastPublished
+     // We have a previous position
+     targetPublishBlock = lastPublished
+     if latestCommittedBlockNumber != nil && latestCommittedBlockNumber.Sign() > 0 &&
+         latestCommittedBlockNumber.Cmp(lastPublished) > 0 {
+         gap := new(big.Int).Sub(latestCommittedBlockNumber, lastPublished)
+         if config.Cfg.Publisher.Mode == "default" {
+             log.Warn().
+                 Str("last_published", lastPublished.String()).
+                 Str("latest_committed", latestCommittedBlockNumber.String()).
+                 Str("gap", gap.String()).
+                 Msg("Publisher behind committed position; advancing to committed value (default mode)")
+             targetPublishBlock = latestCommittedBlockNumber
+         } else {
+             log.Warn().
+                 Str("last_published", lastPublished.String()).
+                 Str("latest_committed", latestCommittedBlockNumber.String()).
+                 Str("gap", gap.String()).
+                 Msg("Publisher behind committed position; not advancing in parallel mode to avoid skipping backlog")
+         }
+     }
  }
Would you confirm that at-least-once delivery is required in parallel mode? If so, the above change preserves backlog on cold start.
317-330: Self-heal storage when falling back to in-memory lastPublished to avoid repeated warnings
If storage returns nil/0 and you switch to the in-memory value, persist that value back to storage so subsequent iterations don’t warn and so the state is consistent.
- // Get the last published block from storage (which was already corrected in Start)
- latestPublishedBlockNumber, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(c.rpc.GetChainID())
+ // Get the last published block from storage (Start attempts to correct this)
+ chainID := c.rpc.GetChainID()
+ latestPublishedBlockNumber, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(chainID)
  if err != nil {
      return nil, fmt.Errorf("failed to get last published block number: %v", err)
  }
  // This should never happen after Start() has run, but handle it defensively
  if latestPublishedBlockNumber == nil || latestPublishedBlockNumber.Sign() == 0 {
      // Fall back to in-memory value which was set during Start
      latestPublishedBlockNumber = new(big.Int).SetUint64(c.lastPublishedBlock.Load())
-     log.Warn().
-         Str("fallback_value", latestPublishedBlockNumber.String()).
-         Msg("Storage returned nil/0 for last published block, using in-memory value")
+     // Attempt to heal storage so we don't warn on every loop
+     if err := c.storage.StagingStorage.SetLastPublishedBlockNumber(chainID, latestPublishedBlockNumber); err != nil {
+         log.Warn().Err(err).
+             Str("fallback_value", latestPublishedBlockNumber.String()).
+             Msg("Storage returned nil/0 for last published block; using in-memory value and failed to heal storage")
+     } else {
+         log.Info().
+             Str("healed_value", latestPublishedBlockNumber.String()).
+             Msg("Healed last published position in storage from in-memory value")
+     }
  }
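The self-heal idea above can be exercised in isolation with an in-memory store. This is a sketch under stated assumptions: cursorStore, memStore, and lastPublishedWithHeal are hypothetical names that stand in for the staging storage and the committer logic, and the chain ID parameter is omitted for brevity.

```go
package main

import (
	"errors"
	"fmt"
	"math/big"
)

// cursorStore is a hypothetical stand-in for the staging storage calls.
type cursorStore interface {
	GetLastPublished() (*big.Int, error)
	SetLastPublished(v *big.Int) error
}

// memStore is an in-memory fake; failSet simulates a failing write.
type memStore struct {
	value   *big.Int
	failSet bool
}

func (m *memStore) GetLastPublished() (*big.Int, error) { return m.value, nil }

func (m *memStore) SetLastPublished(v *big.Int) error {
	if m.failSet {
		return errors.New("write failed")
	}
	m.value = new(big.Int).Set(v)
	return nil
}

// lastPublishedWithHeal returns the cursor and whether storage was healed.
// A read error is returned to the caller; a nil/0 value falls back to the
// in-memory cursor and is written back so later reads succeed.
func lastPublishedWithHeal(s cursorStore, inMemory uint64) (*big.Int, bool, error) {
	stored, err := s.GetLastPublished()
	if err != nil {
		return nil, false, fmt.Errorf("failed to get last published block number: %w", err)
	}
	if stored != nil && stored.Sign() != 0 {
		return stored, false, nil
	}
	fallback := new(big.Int).SetUint64(inMemory)
	healed := s.SetLastPublished(fallback) == nil
	return fallback, healed, nil
}

func main() {
	s := &memStore{}
	v, healed, _ := lastPublishedWithHeal(s, 300)
	fmt.Println(v, healed, s.value) // 300 true 300

	// The second read finds the healed value, so no fallback is needed.
	v, healed, _ = lastPublishedWithHeal(s, 999)
	fmt.Println(v, healed) // 300 false
}
```

A failed write-back is not treated as fatal here: the in-memory value is still returned, which mirrors the suggestion's choice to log a warning and continue.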
🧹 Nitpick comments (3)
internal/orchestrator/committer.go (3)
99-105: Comment contradicts behavior; clarify intent or update logic
The comment says “always use max(lastPublished, lastCommitted)”, but the implementation below does not compute the max when lastPublished exists. Either update the comment to reflect actual behavior or implement the intended logic (see next comment for a safe, mode‑aware approach).
- // Initialize publisher position - always use max(lastPublished, lastCommitted) to prevent double publishing
+ // Initialize publisher position. In default mode we may fast-forward to the latest committed position;
+ // in parallel mode we preserve any unpublished backlog.
142-144: Typo: “acess” → “access”
Minor spelling fix in comment.
- // Store in memory for quick acess
+ // Store in memory for quick access
479-481: Nit: error message references “commit” in publish path
This is a publish path; error text should say “publish” for clarity.
- return nil, fmt.Errorf("error determining blocks to commit: %v", err)
+ return nil, fmt.Errorf("error determining blocks to publish: %v", err)
📒 Files selected for processing (1)
internal/orchestrator/committer.go (2 hunks)
🔇 Additional comments (1)
internal/orchestrator/committer.go (1)
131-140: Only persisting when the position changes is good
This block conditionally updates storage and falls back cleanly on error. No issues.