From 3a7bacebbf3a6cd34b0049d42a11072f7db1258e Mon Sep 17 00:00:00 2001 From: Jake Loo <2171134+jakeloo@users.noreply.github.com> Date: Thu, 14 Aug 2025 17:02:46 +0000 Subject: [PATCH 1/2] Fix publish parallel mode --- internal/orchestrator/committer.go | 60 +++++++++++++++++++++++++----- 1 file changed, 51 insertions(+), 9 deletions(-) diff --git a/internal/orchestrator/committer.go b/internal/orchestrator/committer.go index d85213a..228812a 100644 --- a/internal/orchestrator/committer.go +++ b/internal/orchestrator/committer.go @@ -31,6 +31,7 @@ type Committer struct { lastPublishedBlock atomic.Uint64 publisher *publisher.Publisher workMode WorkMode + workModeMutex sync.RWMutex workModeChan chan WorkMode validator *Validator } @@ -101,7 +102,28 @@ func (c *Committer) Start(ctx context.Context) { // corrected by the worker loop. log.Error().Err(err).Msg("failed to get last published block number") } else if lastPublished != nil && lastPublished.Sign() > 0 { - c.lastPublishedBlock.Store(lastPublished.Uint64()) + // Always ensure publisher starts from at least the committed value + if latestCommittedBlockNumber != nil && latestCommittedBlockNumber.Sign() > 0 { + if lastPublished.Cmp(latestCommittedBlockNumber) < 0 { + gap := new(big.Int).Sub(latestCommittedBlockNumber, lastPublished) + log.Warn(). + Str("last_published", lastPublished.String()). + Str("latest_committed", latestCommittedBlockNumber.String()). + Str("gap", gap.String()). + Msg("Publisher is behind committed position, seeking forward to committed value") + + c.lastPublishedBlock.Store(latestCommittedBlockNumber.Uint64()) + if err := c.storage.StagingStorage.SetLastPublishedBlockNumber(chainID, latestCommittedBlockNumber); err != nil { + log.Error().Err(err).Msg("Failed to update last published block number after seeking forward") + // Fall back to the stored value on error + c.lastPublishedBlock.Store(lastPublished.Uint64()) + } + } else { + c.lastPublishedBlock.Store(lastPublished.Uint64()) + } + } else { + c.lastPublishedBlock.Store(lastPublished.Uint64()) + } } else { c.lastPublishedBlock.Store(c.lastCommittedBlock.Load()) } @@ -143,13 +165,21 @@ func (c *Committer) runCommitLoop(ctx context.Context, interval time.Duration) { case <-ctx.Done(): return case workMode := <-c.workModeChan: - if workMode != c.workMode && workMode != "" { - log.Info().Msgf("Committer work mode changing from %s to %s", c.workMode, workMode) - c.workMode = workMode + if workMode != "" { + c.workModeMutex.Lock() + oldMode := c.workMode + if workMode != oldMode { + log.Info().Msgf("Committer work mode changing from %s to %s", oldMode, workMode) + c.workMode = workMode + } + c.workModeMutex.Unlock() } default: time.Sleep(interval) - if c.workMode == "" { + c.workModeMutex.RLock() + currentMode := c.workMode + c.workModeMutex.RUnlock() + if currentMode == "" { log.Debug().Msg("Committer work mode not set, skipping commit") continue } @@ -176,7 +206,10 @@ func (c *Committer) runPublishLoop(ctx context.Context, interval time.Duration) return default: time.Sleep(interval) - if c.workMode == "" { + c.workModeMutex.RLock() + currentMode := c.workMode + c.workModeMutex.RUnlock() + if currentMode == "" { log.Debug().Msg("Committer work mode not set, skipping publish") continue } @@ -297,7 +330,10 @@ func (c *Committer) getBlockNumbersToPublish(ctx context.Context) ([]*big.Int, e func (c *Committer) getBlockToCommitUntil(ctx context.Context, latestCommittedBlockNumber *big.Int) (*big.Int, error) { untilBlock := new(big.Int).Add(latestCommittedBlockNumber, big.NewInt(int64(c.blocksPerCommit))) - if c.workMode == WorkModeBackfill { + c.workModeMutex.RLock() + currentMode := c.workMode + c.workModeMutex.RUnlock() + if currentMode == WorkModeBackfill { return untilBlock, nil } else { // get latest block from RPC and if that's less than until block, return that @@ -314,7 +350,10 @@ func (c *Committer) getBlockToCommitUntil(ctx context.Context, latestCommittedBl } func (c *Committer) fetchBlockData(ctx context.Context, blockNumbers []*big.Int) ([]common.BlockData, error) { - if c.workMode == WorkModeBackfill { + c.workModeMutex.RLock() + currentMode := c.workMode + c.workModeMutex.RUnlock() + if currentMode == WorkModeBackfill { startTime := time.Now() blocksData, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{BlockNumbers: blockNumbers, ChainId: c.rpc.GetChainID()}) log.Debug().Str("metric", "get_staging_data_duration").Msgf("StagingStorage.GetStagingData duration: %f", time.Since(startTime).Seconds()) @@ -489,7 +528,10 @@ func (c *Committer) handleGap(ctx context.Context, expectedStartBlockNumber *big // record the first missed block number in prometheus metrics.MissedBlockNumbers.Set(float64(expectedStartBlockNumber.Int64())) - if c.workMode == WorkModeLive { + c.workModeMutex.RLock() + currentMode := c.workMode + c.workModeMutex.RUnlock() + if currentMode == WorkModeLive { log.Debug().Msgf("Skipping gap handling in live mode. Expected block %s, actual first block %s", expectedStartBlockNumber.String(), actualFirstBlock.Number.String()) return nil } From cfc8cf6419b0662341ce59cc77d4f601bf4a602e Mon Sep 17 00:00:00 2001 From: Jake Loo <2171134+jakeloo@users.noreply.github.com> Date: Thu, 14 Aug 2025 17:03:53 +0000 Subject: [PATCH 2/2] Gofmt --- internal/orchestrator/committer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/orchestrator/committer.go b/internal/orchestrator/committer.go index 228812a..1316f0e 100644 --- a/internal/orchestrator/committer.go +++ b/internal/orchestrator/committer.go @@ -111,7 +111,7 @@ func (c *Committer) Start(ctx context.Context) { Str("latest_committed", latestCommittedBlockNumber.String()). Str("gap", gap.String()). Msg("Publisher is behind committed position, seeking forward to committed value") - + c.lastPublishedBlock.Store(latestCommittedBlockNumber.Uint64()) if err := c.storage.StagingStorage.SetLastPublishedBlockNumber(chainID, latestCommittedBlockNumber); err != nil { log.Error().Err(err).Msg("Failed to update last published block number after seeking forward")