Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 66 additions & 29 deletions internal/orchestrator/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,38 +96,73 @@ func (c *Committer) Start(ctx context.Context) {
c.lastCommittedBlock.Store(latestCommittedBlockNumber.Uint64())
}

// Initialize publisher position - always use max(lastPublished, lastCommitted) to prevent double publishing
lastPublished, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(chainID)
if err != nil {
// It's okay to fail silently here; it's only used for staging cleanup and will be
// corrected by the worker loop.
log.Error().Err(err).Msg("failed to get last published block number")
} else if lastPublished != nil && lastPublished.Sign() > 0 {
// Always ensure publisher starts from at least the committed value
log.Error().Err(err).Msg("Failed to get last published block number from storage")
// If we can't read, assume we need to start from the beginning
lastPublished = nil
}

// Determine the correct publish position - always take the maximum to avoid going backwards
var targetPublishBlock *big.Int

if lastPublished == nil || lastPublished.Sign() == 0 {
// No previous publish position
if latestCommittedBlockNumber != nil && latestCommittedBlockNumber.Sign() > 0 {
// Start from committed position
targetPublishBlock = latestCommittedBlockNumber
} else if c.commitFromBlock.Sign() > 0 {
// Start from configured position minus 1 (since we publish from next block)
targetPublishBlock = new(big.Int).Sub(c.commitFromBlock, big.NewInt(1))
} else {
// Start from 0
targetPublishBlock = big.NewInt(0)
}

log.Info().
Str("target_publish_block", targetPublishBlock.String()).
Msg("No previous publish position, initializing publisher cursor")
} else {
// We have a previous position - use max(lastPublished, lastCommitted)
targetPublishBlock = lastPublished
if latestCommittedBlockNumber != nil && latestCommittedBlockNumber.Sign() > 0 {
if lastPublished.Cmp(latestCommittedBlockNumber) < 0 {
if latestCommittedBlockNumber.Cmp(lastPublished) > 0 {
gap := new(big.Int).Sub(latestCommittedBlockNumber, lastPublished)
log.Warn().
Str("last_published", lastPublished.String()).
Str("latest_committed", latestCommittedBlockNumber.String()).
Str("gap", gap.String()).
Msg("Publisher is behind committed position, seeking forward to committed value")
targetPublishBlock = latestCommittedBlockNumber
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Advancing publisher to latestCommitted can skip unpublished backlog in parallel mode

Unconditionally using max(lastPublished, latestCommitted) drops any unpublished-but-committed backlog on cold start or restart. That’s fine for default (commit→publish) mode, but in parallel mode this can cause data loss for downstream consumers expecting at-least-once delivery.

Gate the advancement by Publisher.Mode so we only fast-forward in default mode, and preserve backlog in parallel mode.

Would you confirm the intended delivery semantics? If at-least-once is required for parallel mode, apply:

-    // We have a previous position - use max(lastPublished, lastCommitted)
+    // We have a previous position
     targetPublishBlock = lastPublished
     if latestCommittedBlockNumber != nil && latestCommittedBlockNumber.Sign() > 0 {
-      if latestCommittedBlockNumber.Cmp(lastPublished) > 0 {
-        gap := new(big.Int).Sub(latestCommittedBlockNumber, lastPublished)
-        log.Warn().
-          Str("last_published", lastPublished.String()).
-          Str("latest_committed", latestCommittedBlockNumber.String()).
-          Str("gap", gap.String()).
-          Msg("Publisher is behind committed position, seeking forward to committed value")
-        targetPublishBlock = latestCommittedBlockNumber
-      }
+      if latestCommittedBlockNumber.Cmp(lastPublished) > 0 {
+        gap := new(big.Int).Sub(latestCommittedBlockNumber, lastPublished)
+        if config.Cfg.Publisher.Mode == "default" {
+          log.Warn().
+            Str("last_published", lastPublished.String()).
+            Str("latest_committed", latestCommittedBlockNumber.String()).
+            Str("gap", gap.String()).
+            Msg("Publisher is behind committed position; advancing to committed value (default mode)")
+          targetPublishBlock = latestCommittedBlockNumber
+        } else {
+          log.Warn().
+            Str("last_published", lastPublished.String()).
+            Str("latest_committed", latestCommittedBlockNumber.String()).
+            Str("gap", gap.String()).
+            Msg("Publisher is behind committed position; not advancing in parallel mode to avoid skipping backlog")
+        }
+      }
     }

🏁 Script executed:

#!/bin/bash
set -e
# Display the imports and config references in internal/orchestrator/committer.go
sed -n '1,200p' internal/orchestrator/committer.go

Length of output: 6309


🏁 Script executed:

#!/bin/bash
set -e
# Locate the definition of Cfg in the configs package
rg -n "type Cfg" -n configs

# Find where Publisher.Mode is defined within config.Cfg
rg -n "Publisher" -n configs

Length of output: 38


🏁 Script executed:

#!/bin/bash
set -e
# List all files in the configs package
echo "Config files:"
ls -R configs

# Search for the global Cfg variable
echo -e "\nSearching for Cfg declaration:"
grep -R "var Cfg" -n configs || true
grep -R "Cfg =" -n configs || true

# Search for Publisher settings within configs
echo -e "\nSearching for Publisher in configs:"
grep -R "Publisher" -n configs || true
grep -R "Mode" -n configs || true

Length of output: 2290


🏁 Script executed:

#!/bin/bash
set -e
# Show runCommitLoop implementation to verify publish behavior in default mode
rg -n "func (c \\*Committer) runCommitLoop" -n internal/orchestrator/committer.go -A 50

Length of output: 97


Gate publisher initialization by mode to preserve backlog in parallel mode

Unconditionally advancing targetPublishBlock to latestCommittedBlockNumber on startup will drop any unpublished-but-committed blocks when running in parallel mode, breaking at-least-once delivery guarantees. We should only fast-forward in the default (commit→publish) mode and preserve the backlog in parallel mode.

• File: internal/orchestrator/committer.go
• Location: In the Start method, around the “use max(lastPublished, lastCommitted)” block

Proposed change:

-       // We have a previous position - use max(lastPublished, lastCommitted)
+       // We have a previous position
        targetPublishBlock = lastPublished
        if latestCommittedBlockNumber != nil && latestCommittedBlockNumber.Sign() > 0 {
-           if latestCommittedBlockNumber.Cmp(lastPublished) > 0 {
-               gap := new(big.Int).Sub(latestCommittedBlockNumber, lastPublished)
-               log.Warn().
-                   Str("last_published", lastPublished.String()).
-                   Str("latest_committed", latestCommittedBlockNumber.String()).
-                   Str("gap", gap.String()).
-                   Msg("Publisher is behind committed position, seeking forward to committed value")
-               targetPublishBlock = latestCommittedBlockNumber
-           }
+           if latestCommittedBlockNumber.Cmp(lastPublished) > 0 {
+               gap := new(big.Int).Sub(latestCommittedBlockNumber, lastPublished)
+               if config.Cfg.Publisher.Mode == "default" {
+                   log.Warn().
+                       Str("last_published", lastPublished.String()).
+                       Str("latest_committed", latestCommittedBlockNumber.String()).
+                       Str("gap", gap.String()).
+                       Msg("Publisher is behind committed position; advancing to committed value (default mode)")
+                   targetPublishBlock = latestCommittedBlockNumber
+               } else {
+                   log.Warn().
+                       Str("last_published", lastPublished.String()).
+                       Str("latest_committed", latestCommittedBlockNumber.String()).
+                       Str("gap", gap.String()).
+                       Msg("Publisher is behind committed position; not advancing in parallel mode to avoid skipping backlog")
+               }
+           }
        }

Please confirm that at-least-once delivery is required when running in parallel mode. If so, apply the above change to prevent skipping any unpublished blocks.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// We have a previous position - use max(lastPublished, lastCommitted)
targetPublishBlock = lastPublished
if latestCommittedBlockNumber != nil && latestCommittedBlockNumber.Sign() > 0 {
if lastPublished.Cmp(latestCommittedBlockNumber) < 0 {
if latestCommittedBlockNumber.Cmp(lastPublished) > 0 {
gap := new(big.Int).Sub(latestCommittedBlockNumber, lastPublished)
log.Warn().
Str("last_published", lastPublished.String()).
Str("latest_committed", latestCommittedBlockNumber.String()).
Str("gap", gap.String()).
Msg("Publisher is behind committed position, seeking forward to committed value")
targetPublishBlock = latestCommittedBlockNumber
}
}
// We have a previous position
targetPublishBlock = lastPublished
if latestCommittedBlockNumber != nil && latestCommittedBlockNumber.Sign() > 0 {
if latestCommittedBlockNumber.Cmp(lastPublished) > 0 {
gap := new(big.Int).Sub(latestCommittedBlockNumber, lastPublished)
if config.Cfg.Publisher.Mode == "default" {
log.Warn().
Str("last_published", lastPublished.String()).
Str("latest_committed", latestCommittedBlockNumber.String()).
Str("gap", gap.String()).
Msg("Publisher is behind committed position; advancing to committed value (default mode)")
targetPublishBlock = latestCommittedBlockNumber
} else {
log.Warn().
Str("last_published", lastPublished.String()).
Str("latest_committed", latestCommittedBlockNumber.String()).
Str("gap", gap.String()).
Msg("Publisher is behind committed position; not advancing in parallel mode to avoid skipping backlog")
}
}
}
🤖 Prompt for AI Agents
In internal/orchestrator/committer.go around lines 127 to 139, the logic
unconditionally advances targetPublishBlock to latestCommittedBlockNumber, which
will drop unpublished-but-committed blocks in parallel mode; change it to only
fast-forward when the orchestrator is running in the default commit→publish mode
(i.e., when mode != "parallel" or when a boolean like commitThenPublish is
true). Concretely, add a gate checking the configured mode before comparing and
assigning latestCommittedBlockNumber to targetPublishBlock so that in parallel
mode you leave targetPublishBlock as lastPublished and preserve the backlog for
at-least-once delivery.

}

c.lastPublishedBlock.Store(latestCommittedBlockNumber.Uint64())
if err := c.storage.StagingStorage.SetLastPublishedBlockNumber(chainID, latestCommittedBlockNumber); err != nil {
log.Error().Err(err).Msg("Failed to update last published block number after seeking forward")
// Fall back to the stored value on error
c.lastPublishedBlock.Store(lastPublished.Uint64())
}
} else {
c.lastPublishedBlock.Store(lastPublished.Uint64())
// Only update storage if we're changing the position
if lastPublished == nil || targetPublishBlock.Cmp(lastPublished) != 0 {
if err := c.storage.StagingStorage.SetLastPublishedBlockNumber(chainID, targetPublishBlock); err != nil {
log.Error().Err(err).Msg("Failed to update published block number in storage")
// If we can't update storage, use what was there originally to avoid issues
if lastPublished != nil {
targetPublishBlock = lastPublished
}
} else {
c.lastPublishedBlock.Store(lastPublished.Uint64())
}
} else {
c.lastPublishedBlock.Store(c.lastCommittedBlock.Load())
}

// Store in memory for quick access
c.lastPublishedBlock.Store(targetPublishBlock.Uint64())

log.Info().
Str("publish_from", targetPublishBlock.String()).
Str("committed_at", func() string {
if latestCommittedBlockNumber != nil {
return latestCommittedBlockNumber.String()
}
return "0"
}()).
Msg("Publisher initialized")

c.cleanupProcessedStagingBlocks()

if config.Cfg.Publisher.Mode == "parallel" {
Expand Down Expand Up @@ -290,23 +325,25 @@ func (c *Committer) getBlockNumbersToCommit(ctx context.Context) ([]*big.Int, er
}

func (c *Committer) getBlockNumbersToPublish(ctx context.Context) ([]*big.Int, error) {
// Get the last published block from storage (which was already corrected in Start)
lastestPublishedBlockNumber, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(c.rpc.GetChainID())
log.Debug().Msgf("Committer found this last published block number in staging storage: %s", lastestPublishedBlockNumber.String())
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to get last published block number: %v", err)
}

if lastestPublishedBlockNumber.Sign() == 0 {
// If no blocks have been committed yet, start from the fromBlock specified in the config
lastestPublishedBlockNumber = new(big.Int).Sub(c.commitFromBlock, big.NewInt(1))
} else {
lastPublished := new(big.Int).SetUint64(c.lastPublishedBlock.Load())
if lastestPublishedBlockNumber.Cmp(lastPublished) < 0 {
log.Warn().Msgf("Max block in storage (%s) is less than last published block in memory (%s).", lastestPublishedBlockNumber.String(), lastPublished.String())
return []*big.Int{}, nil
}
// This should never happen after Start() has run, but handle it defensively
if lastestPublishedBlockNumber == nil || lastestPublishedBlockNumber.Sign() == 0 {
// Fall back to in-memory value which was set during Start
lastestPublishedBlockNumber = new(big.Int).SetUint64(c.lastPublishedBlock.Load())
log.Warn().
Str("fallback_value", lastestPublishedBlockNumber.String()).
Msg("Storage returned nil/0 for last published block, using in-memory value")
}

log.Debug().
Str("last_published", lastestPublishedBlockNumber.String()).
Msg("Determining blocks to publish")

startBlock := new(big.Int).Add(lastestPublishedBlockNumber, big.NewInt(1))
endBlock, err := c.getBlockToCommitUntil(ctx, lastestPublishedBlockNumber)
if err != nil {
Expand Down