Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 64 additions & 38 deletions internal/orchestrator/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,38 +96,62 @@ func (c *Committer) Start(ctx context.Context) {
c.lastCommittedBlock.Store(latestCommittedBlockNumber.Uint64())
}

// Initialize publisher position - always use max(lastPublished, lastCommitted) to prevent double publishing
lastPublished, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(chainID)
if err != nil {
// It's okay to fail silently here; it's only used for staging cleanup and will be
// corrected by the worker loop.
log.Error().Err(err).Msg("failed to get last published block number")
} else if lastPublished != nil && lastPublished.Sign() > 0 {
// Always ensure publisher starts from at least the committed value
log.Error().Err(err).Msg("Failed to get last published block number from storage")
// If we can't read, assume we need to start from the beginning
lastPublished = nil
}

// Determine the correct publish position - always take the maximum to avoid going backwards
var targetPublishBlock *big.Int

if lastPublished == nil || lastPublished.Sign() == 0 {
// No previous publish position
if latestCommittedBlockNumber != nil && latestCommittedBlockNumber.Sign() > 0 {
if lastPublished.Cmp(latestCommittedBlockNumber) < 0 {
gap := new(big.Int).Sub(latestCommittedBlockNumber, lastPublished)
log.Warn().
Str("last_published", lastPublished.String()).
Str("latest_committed", latestCommittedBlockNumber.String()).
Str("gap", gap.String()).
Msg("Publisher is behind committed position, seeking forward to committed value")

c.lastPublishedBlock.Store(latestCommittedBlockNumber.Uint64())
if err := c.storage.StagingStorage.SetLastPublishedBlockNumber(chainID, latestCommittedBlockNumber); err != nil {
log.Error().Err(err).Msg("Failed to update last published block number after seeking forward")
// Fall back to the stored value on error
c.lastPublishedBlock.Store(lastPublished.Uint64())
}
} else {
c.lastPublishedBlock.Store(lastPublished.Uint64())
}
// Start from committed position
targetPublishBlock = latestCommittedBlockNumber
} else if c.commitFromBlock.Sign() > 0 {
// Start from configured position minus 1 (since we publish from next block)
targetPublishBlock = new(big.Int).Sub(c.commitFromBlock, big.NewInt(1))
} else {
c.lastPublishedBlock.Store(lastPublished.Uint64())
// Start from 0
targetPublishBlock = big.NewInt(0)
}

log.Info().
Str("target_publish_block", targetPublishBlock.String()).
Msg("No previous publish position, initializing publisher cursor")
} else {
c.lastPublishedBlock.Store(c.lastCommittedBlock.Load())
// We have a previous position
targetPublishBlock = lastPublished
}

// Only update storage if we're changing the position
if lastPublished == nil || targetPublishBlock.Cmp(lastPublished) != 0 {
if err := c.storage.StagingStorage.SetLastPublishedBlockNumber(chainID, targetPublishBlock); err != nil {
log.Error().Err(err).Msg("Failed to update published block number in storage")
// If we can't update storage, use what was there originally to avoid issues
if lastPublished != nil {
targetPublishBlock = lastPublished
}
}
}

// Store in memory for quick access
c.lastPublishedBlock.Store(targetPublishBlock.Uint64())

log.Info().
Str("publish_from", targetPublishBlock.String()).
Str("committed_at", func() string {
if latestCommittedBlockNumber != nil {
return latestCommittedBlockNumber.String()
}
return "0"
}()).
Msg("Publisher initialized")

c.cleanupProcessedStagingBlocks()

if config.Cfg.Publisher.Mode == "parallel" {
Expand Down Expand Up @@ -290,25 +314,27 @@ func (c *Committer) getBlockNumbersToCommit(ctx context.Context) ([]*big.Int, er
}

func (c *Committer) getBlockNumbersToPublish(ctx context.Context) ([]*big.Int, error) {
lastestPublishedBlockNumber, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(c.rpc.GetChainID())
log.Debug().Msgf("Committer found this last published block number in staging storage: %s", lastestPublishedBlockNumber.String())
// Get the last published block from storage (which was already corrected in Start)
latestPublishedBlockNumber, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(c.rpc.GetChainID())
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to get last published block number: %v", err)
}

if lastestPublishedBlockNumber.Sign() == 0 {
// If no blocks have been committed yet, start from the fromBlock specified in the config
lastestPublishedBlockNumber = new(big.Int).Sub(c.commitFromBlock, big.NewInt(1))
} else {
lastPublished := new(big.Int).SetUint64(c.lastPublishedBlock.Load())
if lastestPublishedBlockNumber.Cmp(lastPublished) < 0 {
log.Warn().Msgf("Max block in storage (%s) is less than last published block in memory (%s).", lastestPublishedBlockNumber.String(), lastPublished.String())
return []*big.Int{}, nil
}
// This should never happen after Start() has run, but handle it defensively
if latestPublishedBlockNumber == nil || latestPublishedBlockNumber.Sign() == 0 {
// Fall back to in-memory value which was set during Start
latestPublishedBlockNumber = new(big.Int).SetUint64(c.lastPublishedBlock.Load())
log.Warn().
Str("fallback_value", latestPublishedBlockNumber.String()).
Msg("Storage returned nil/0 for last published block, using in-memory value")
}

startBlock := new(big.Int).Add(lastestPublishedBlockNumber, big.NewInt(1))
endBlock, err := c.getBlockToCommitUntil(ctx, lastestPublishedBlockNumber)
log.Debug().
Str("last_published", latestPublishedBlockNumber.String()).
Msg("Determining blocks to publish")

startBlock := new(big.Int).Add(latestPublishedBlockNumber, big.NewInt(1))
endBlock, err := c.getBlockToCommitUntil(ctx, latestPublishedBlockNumber)
if err != nil {
return nil, fmt.Errorf("error getting block to commit until: %v", err)
}
Expand Down