From 507ef745f6d1b8c2ac6f3e30a527526417578fab Mon Sep 17 00:00:00 2001 From: Jake Loo <2171134+jakeloo@users.noreply.github.com> Date: Thu, 14 Aug 2025 19:20:23 +0000 Subject: [PATCH 1/2] Init publisher value on start --- internal/orchestrator/committer.go | 95 +++++++++++++++++++++--------- 1 file changed, 66 insertions(+), 29 deletions(-) diff --git a/internal/orchestrator/committer.go b/internal/orchestrator/committer.go index 1316f0e..f636339 100644 --- a/internal/orchestrator/committer.go +++ b/internal/orchestrator/committer.go @@ -96,38 +96,73 @@ func (c *Committer) Start(ctx context.Context) { c.lastCommittedBlock.Store(latestCommittedBlockNumber.Uint64()) } + // Initialize publisher position - always use max(lastPublished, lastCommitted) to prevent double publishing lastPublished, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(chainID) if err != nil { - // It's okay to fail silently here; it's only used for staging cleanup and will be - // corrected by the worker loop. 
- log.Error().Err(err).Msg("failed to get last published block number") - } else if lastPublished != nil && lastPublished.Sign() > 0 { - // Always ensure publisher starts from at least the committed value + log.Error().Err(err).Msg("Failed to get last published block number from storage") + // If we can't read, assume we need to start from the beginning + lastPublished = nil + } + + // Determine the correct publish position - always take the maximum to avoid going backwards + var targetPublishBlock *big.Int + + if lastPublished == nil || lastPublished.Sign() == 0 { + // No previous publish position + if latestCommittedBlockNumber != nil && latestCommittedBlockNumber.Sign() > 0 { + // Start from committed position + targetPublishBlock = latestCommittedBlockNumber + } else if c.commitFromBlock.Sign() > 0 { + // Start from configured position minus 1 (since we publish from next block) + targetPublishBlock = new(big.Int).Sub(c.commitFromBlock, big.NewInt(1)) + } else { + // Start from 0 + targetPublishBlock = big.NewInt(0) + } + + log.Info(). + Str("target_publish_block", targetPublishBlock.String()). + Msg("No previous publish position, initializing publisher cursor") + } else { + // We have a previous position - use max(lastPublished, lastCommitted) + targetPublishBlock = lastPublished if latestCommittedBlockNumber != nil && latestCommittedBlockNumber.Sign() > 0 { - if lastPublished.Cmp(latestCommittedBlockNumber) < 0 { + if latestCommittedBlockNumber.Cmp(lastPublished) > 0 { gap := new(big.Int).Sub(latestCommittedBlockNumber, lastPublished) log.Warn(). Str("last_published", lastPublished.String()). Str("latest_committed", latestCommittedBlockNumber.String()). Str("gap", gap.String()). 
Msg("Publisher is behind committed position, seeking forward to committed value") + targetPublishBlock = latestCommittedBlockNumber + } + } + } - c.lastPublishedBlock.Store(latestCommittedBlockNumber.Uint64()) - if err := c.storage.StagingStorage.SetLastPublishedBlockNumber(chainID, latestCommittedBlockNumber); err != nil { - log.Error().Err(err).Msg("Failed to update last published block number after seeking forward") - // Fall back to the stored value on error - c.lastPublishedBlock.Store(lastPublished.Uint64()) - } - } else { - c.lastPublishedBlock.Store(lastPublished.Uint64()) + // Only update storage if we're changing the position + if lastPublished == nil || targetPublishBlock.Cmp(lastPublished) != 0 { + if err := c.storage.StagingStorage.SetLastPublishedBlockNumber(chainID, targetPublishBlock); err != nil { + log.Error().Err(err).Msg("Failed to update published block number in storage") + // If we can't update storage, use what was there originally to avoid issues + if lastPublished != nil { + targetPublishBlock = lastPublished } - } else { - c.lastPublishedBlock.Store(lastPublished.Uint64()) } - } else { - c.lastPublishedBlock.Store(c.lastCommittedBlock.Load()) } + // Store in memory for quick access + c.lastPublishedBlock.Store(targetPublishBlock.Uint64()) + + log.Info(). + Str("publish_from", targetPublishBlock.String()). + Str("committed_at", func() string { + if latestCommittedBlockNumber != nil { + return latestCommittedBlockNumber.String() + } + return "0" + }()). 
+ Msg("Publisher initialized") + c.cleanupProcessedStagingBlocks() if config.Cfg.Publisher.Mode == "parallel" { @@ -290,23 +325,25 @@ func (c *Committer) getBlockNumbersToCommit(ctx context.Context) ([]*big.Int, er } func (c *Committer) getBlockNumbersToPublish(ctx context.Context) ([]*big.Int, error) { + // Get the last published block from storage (which was already corrected in Start) lastestPublishedBlockNumber, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(c.rpc.GetChainID()) - log.Debug().Msgf("Committer found this last published block number in staging storage: %s", lastestPublishedBlockNumber.String()) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to get last published block number: %v", err) } - if lastestPublishedBlockNumber.Sign() == 0 { - // If no blocks have been committed yet, start from the fromBlock specified in the config - lastestPublishedBlockNumber = new(big.Int).Sub(c.commitFromBlock, big.NewInt(1)) - } else { - lastPublished := new(big.Int).SetUint64(c.lastPublishedBlock.Load()) - if lastestPublishedBlockNumber.Cmp(lastPublished) < 0 { - log.Warn().Msgf("Max block in storage (%s) is less than last published block in memory (%s).", lastestPublishedBlockNumber.String(), lastPublished.String()) - return []*big.Int{}, nil - } + // This should never happen after Start() has run, but handle it defensively + if lastestPublishedBlockNumber == nil || lastestPublishedBlockNumber.Sign() == 0 { + // Fall back to in-memory value which was set during Start + lastestPublishedBlockNumber = new(big.Int).SetUint64(c.lastPublishedBlock.Load()) + log.Warn(). + Str("fallback_value", lastestPublishedBlockNumber.String()). + Msg("Storage returned nil/0 for last published block, using in-memory value") } + log.Debug(). + Str("last_published", lastestPublishedBlockNumber.String()). 
+ Msg("Determining blocks to publish") + startBlock := new(big.Int).Add(lastestPublishedBlockNumber, big.NewInt(1)) endBlock, err := c.getBlockToCommitUntil(ctx, lastestPublishedBlockNumber) if err != nil { From 3826c5131f87580074ce64581d6fd73a0eafef57 Mon Sep 17 00:00:00 2001 From: Jake Loo <2171134+jakeloo@users.noreply.github.com> Date: Thu, 14 Aug 2025 19:54:07 +0000 Subject: [PATCH 2/2] Remove max(publish, commit) to support older publish --- internal/orchestrator/committer.go | 29 +++++++++-------------------- 1 file changed, 9 insertions(+), 20 deletions(-) diff --git a/internal/orchestrator/committer.go b/internal/orchestrator/committer.go index f636339..e88a0cc 100644 --- a/internal/orchestrator/committer.go +++ b/internal/orchestrator/committer.go @@ -124,19 +124,8 @@ func (c *Committer) Start(ctx context.Context) { Str("target_publish_block", targetPublishBlock.String()). Msg("No previous publish position, initializing publisher cursor") } else { - // We have a previous position - use max(lastPublished, lastCommitted) + // We have a previous position targetPublishBlock = lastPublished - if latestCommittedBlockNumber != nil && latestCommittedBlockNumber.Sign() > 0 { - if latestCommittedBlockNumber.Cmp(lastPublished) > 0 { - gap := new(big.Int).Sub(latestCommittedBlockNumber, lastPublished) - log.Warn(). - Str("last_published", lastPublished.String()). - Str("latest_committed", latestCommittedBlockNumber.String()). - Str("gap", gap.String()). - Msg("Publisher is behind committed position, seeking forward to committed value") - targetPublishBlock = latestCommittedBlockNumber - } - } } // Only update storage if we're changing the position @@ -150,7 +139,7 @@ func (c *Committer) Start(ctx context.Context) { } } - // Store in memory for quick access + // Store in memory for quick access c.lastPublishedBlock.Store(targetPublishBlock.Uint64()) log.Info(). 
@@ -326,26 +315,26 @@ func (c *Committer) getBlockNumbersToCommit(ctx context.Context) ([]*big.Int, er func (c *Committer) getBlockNumbersToPublish(ctx context.Context) ([]*big.Int, error) { // Get the last published block from storage (which was already corrected in Start) - lastestPublishedBlockNumber, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(c.rpc.GetChainID()) + latestPublishedBlockNumber, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(c.rpc.GetChainID()) if err != nil { return nil, fmt.Errorf("failed to get last published block number: %v", err) } // This should never happen after Start() has run, but handle it defensively - if lastestPublishedBlockNumber == nil || lastestPublishedBlockNumber.Sign() == 0 { + if latestPublishedBlockNumber == nil || latestPublishedBlockNumber.Sign() == 0 { // Fall back to in-memory value which was set during Start - lastestPublishedBlockNumber = new(big.Int).SetUint64(c.lastPublishedBlock.Load()) + latestPublishedBlockNumber = new(big.Int).SetUint64(c.lastPublishedBlock.Load()) log.Warn(). - Str("fallback_value", lastestPublishedBlockNumber.String()). + Str("fallback_value", latestPublishedBlockNumber.String()). Msg("Storage returned nil/0 for last published block, using in-memory value") } log.Debug(). - Str("last_published", lastestPublishedBlockNumber.String()). + Str("last_published", latestPublishedBlockNumber.String()). Msg("Determining blocks to publish") - startBlock := new(big.Int).Add(lastestPublishedBlockNumber, big.NewInt(1)) - endBlock, err := c.getBlockToCommitUntil(ctx, lastestPublishedBlockNumber) + startBlock := new(big.Int).Add(latestPublishedBlockNumber, big.NewInt(1)) + endBlock, err := c.getBlockToCommitUntil(ctx, latestPublishedBlockNumber) if err != nil { return nil, fmt.Errorf("error getting block to commit until: %v", err) }