diff --git a/internal/orchestrator/committer.go b/internal/orchestrator/committer.go index 1316f0e..e88a0cc 100644 --- a/internal/orchestrator/committer.go +++ b/internal/orchestrator/committer.go @@ -96,38 +96,62 @@ func (c *Committer) Start(ctx context.Context) { c.lastCommittedBlock.Store(latestCommittedBlockNumber.Uint64()) } + // Initialize publisher position - always use max(lastPublished, lastCommitted) to prevent double publishing lastPublished, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(chainID) if err != nil { - // It's okay to fail silently here; it's only used for staging cleanup and will be - // corrected by the worker loop. - log.Error().Err(err).Msg("failed to get last published block number") - } else if lastPublished != nil && lastPublished.Sign() > 0 { - // Always ensure publisher starts from at least the committed value + log.Error().Err(err).Msg("Failed to get last published block number from storage") + // If we can't read, assume we need to start from the beginning + lastPublished = nil + } + + // Determine the correct publish position - always take the maximum to avoid going backwards + var targetPublishBlock *big.Int + + if lastPublished == nil || lastPublished.Sign() == 0 { + // No previous publish position if latestCommittedBlockNumber != nil && latestCommittedBlockNumber.Sign() > 0 { - if lastPublished.Cmp(latestCommittedBlockNumber) < 0 { - gap := new(big.Int).Sub(latestCommittedBlockNumber, lastPublished) - log.Warn(). - Str("last_published", lastPublished.String()). - Str("latest_committed", latestCommittedBlockNumber.String()). - Str("gap", gap.String()). 
- Msg("Publisher is behind committed position, seeking forward to committed value") - - c.lastPublishedBlock.Store(latestCommittedBlockNumber.Uint64()) - if err := c.storage.StagingStorage.SetLastPublishedBlockNumber(chainID, latestCommittedBlockNumber); err != nil { - log.Error().Err(err).Msg("Failed to update last published block number after seeking forward") - // Fall back to the stored value on error - c.lastPublishedBlock.Store(lastPublished.Uint64()) - } - } else { - c.lastPublishedBlock.Store(lastPublished.Uint64()) - } + // Start from committed position + targetPublishBlock = latestCommittedBlockNumber + } else if c.commitFromBlock.Sign() > 0 { + // Start from configured position minus 1 (since we publish from next block) + targetPublishBlock = new(big.Int).Sub(c.commitFromBlock, big.NewInt(1)) } else { - c.lastPublishedBlock.Store(lastPublished.Uint64()) + // Start from 0 + targetPublishBlock = big.NewInt(0) } + + log.Info(). + Str("target_publish_block", targetPublishBlock.String()). + Msg("No previous publish position, initializing publisher cursor") } else { - c.lastPublishedBlock.Store(c.lastCommittedBlock.Load()) + // We have a previous position + targetPublishBlock = lastPublished + } + + // Only update storage if we're changing the position + if lastPublished == nil || targetPublishBlock.Cmp(lastPublished) != 0 { + if err := c.storage.StagingStorage.SetLastPublishedBlockNumber(chainID, targetPublishBlock); err != nil { + log.Error().Err(err).Msg("Failed to update published block number in storage") + // If we can't update storage, use what was there originally to avoid issues + if lastPublished != nil { + targetPublishBlock = lastPublished + } + } } + // Store in memory for quick access + c.lastPublishedBlock.Store(targetPublishBlock.Uint64()) + + log.Info(). + Str("publish_from", targetPublishBlock.String()). 
+ Str("committed_at", func() string { + if latestCommittedBlockNumber != nil { + return latestCommittedBlockNumber.String() + } + return "0" + }()). + Msg("Publisher initialized") + c.cleanupProcessedStagingBlocks() if config.Cfg.Publisher.Mode == "parallel" { @@ -290,25 +314,27 @@ func (c *Committer) getBlockNumbersToCommit(ctx context.Context) ([]*big.Int, er } func (c *Committer) getBlockNumbersToPublish(ctx context.Context) ([]*big.Int, error) { - lastestPublishedBlockNumber, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(c.rpc.GetChainID()) - log.Debug().Msgf("Committer found this last published block number in staging storage: %s", lastestPublishedBlockNumber.String()) + // Get the last published block from storage (which was already corrected in Start) + latestPublishedBlockNumber, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(c.rpc.GetChainID()) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to get last published block number: %v", err) } - if lastestPublishedBlockNumber.Sign() == 0 { - // If no blocks have been committed yet, start from the fromBlock specified in the config - lastestPublishedBlockNumber = new(big.Int).Sub(c.commitFromBlock, big.NewInt(1)) - } else { - lastPublished := new(big.Int).SetUint64(c.lastPublishedBlock.Load()) - if lastestPublishedBlockNumber.Cmp(lastPublished) < 0 { - log.Warn().Msgf("Max block in storage (%s) is less than last published block in memory (%s).", lastestPublishedBlockNumber.String(), lastPublished.String()) - return []*big.Int{}, nil - } + // This should never happen after Start() has run, but handle it defensively + if latestPublishedBlockNumber == nil || latestPublishedBlockNumber.Sign() == 0 { + // Fall back to in-memory value which was set during Start + latestPublishedBlockNumber = new(big.Int).SetUint64(c.lastPublishedBlock.Load()) + log.Warn(). + Str("fallback_value", latestPublishedBlockNumber.String()). 
+ Msg("Storage returned nil/0 for last published block, using in-memory value") } - startBlock := new(big.Int).Add(lastestPublishedBlockNumber, big.NewInt(1)) - endBlock, err := c.getBlockToCommitUntil(ctx, lastestPublishedBlockNumber) + log.Debug(). + Str("last_published", latestPublishedBlockNumber.String()). + Msg("Determining blocks to publish") + + startBlock := new(big.Int).Add(latestPublishedBlockNumber, big.NewInt(1)) + endBlock, err := c.getBlockToCommitUntil(ctx, latestPublishedBlockNumber) if err != nil { return nil, fmt.Errorf("error getting block to commit until: %v", err) }