Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 51 additions & 9 deletions internal/orchestrator/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type Committer struct {
lastPublishedBlock atomic.Uint64
publisher *publisher.Publisher
workMode WorkMode
workModeMutex sync.RWMutex
workModeChan chan WorkMode
validator *Validator
}
Expand Down Expand Up @@ -101,7 +102,28 @@ func (c *Committer) Start(ctx context.Context) {
// corrected by the worker loop.
log.Error().Err(err).Msg("failed to get last published block number")
} else if lastPublished != nil && lastPublished.Sign() > 0 {
c.lastPublishedBlock.Store(lastPublished.Uint64())
// Always ensure publisher starts from at least the committed value
if latestCommittedBlockNumber != nil && latestCommittedBlockNumber.Sign() > 0 {
if lastPublished.Cmp(latestCommittedBlockNumber) < 0 {
gap := new(big.Int).Sub(latestCommittedBlockNumber, lastPublished)
log.Warn().
Str("last_published", lastPublished.String()).
Str("latest_committed", latestCommittedBlockNumber.String()).
Str("gap", gap.String()).
Msg("Publisher is behind committed position, seeking forward to committed value")

c.lastPublishedBlock.Store(latestCommittedBlockNumber.Uint64())
if err := c.storage.StagingStorage.SetLastPublishedBlockNumber(chainID, latestCommittedBlockNumber); err != nil {
log.Error().Err(err).Msg("Failed to update last published block number after seeking forward")
// Fall back to the stored value on error
c.lastPublishedBlock.Store(lastPublished.Uint64())
}
} else {
c.lastPublishedBlock.Store(lastPublished.Uint64())
}
} else {
c.lastPublishedBlock.Store(lastPublished.Uint64())
}
} else {
c.lastPublishedBlock.Store(c.lastCommittedBlock.Load())
}
Expand Down Expand Up @@ -143,13 +165,21 @@ func (c *Committer) runCommitLoop(ctx context.Context, interval time.Duration) {
case <-ctx.Done():
return
case workMode := <-c.workModeChan:
if workMode != c.workMode && workMode != "" {
log.Info().Msgf("Committer work mode changing from %s to %s", c.workMode, workMode)
c.workMode = workMode
if workMode != "" {
c.workModeMutex.Lock()
oldMode := c.workMode
if workMode != oldMode {
log.Info().Msgf("Committer work mode changing from %s to %s", oldMode, workMode)
c.workMode = workMode
}
c.workModeMutex.Unlock()
}
default:
time.Sleep(interval)
if c.workMode == "" {
c.workModeMutex.RLock()
currentMode := c.workMode
c.workModeMutex.RUnlock()
if currentMode == "" {
log.Debug().Msg("Committer work mode not set, skipping commit")
continue
}
Expand All @@ -176,7 +206,10 @@ func (c *Committer) runPublishLoop(ctx context.Context, interval time.Duration)
return
default:
time.Sleep(interval)
if c.workMode == "" {
c.workModeMutex.RLock()
currentMode := c.workMode
c.workModeMutex.RUnlock()
if currentMode == "" {
log.Debug().Msg("Committer work mode not set, skipping publish")
continue
}
Expand Down Expand Up @@ -297,7 +330,10 @@ func (c *Committer) getBlockNumbersToPublish(ctx context.Context) ([]*big.Int, e

func (c *Committer) getBlockToCommitUntil(ctx context.Context, latestCommittedBlockNumber *big.Int) (*big.Int, error) {
untilBlock := new(big.Int).Add(latestCommittedBlockNumber, big.NewInt(int64(c.blocksPerCommit)))
if c.workMode == WorkModeBackfill {
c.workModeMutex.RLock()
currentMode := c.workMode
c.workModeMutex.RUnlock()
if currentMode == WorkModeBackfill {
return untilBlock, nil
} else {
// get latest block from RPC and if that's less than until block, return that
Expand All @@ -314,7 +350,10 @@ func (c *Committer) getBlockToCommitUntil(ctx context.Context, latestCommittedBl
}

func (c *Committer) fetchBlockData(ctx context.Context, blockNumbers []*big.Int) ([]common.BlockData, error) {
if c.workMode == WorkModeBackfill {
c.workModeMutex.RLock()
currentMode := c.workMode
c.workModeMutex.RUnlock()
if currentMode == WorkModeBackfill {
startTime := time.Now()
blocksData, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{BlockNumbers: blockNumbers, ChainId: c.rpc.GetChainID()})
log.Debug().Str("metric", "get_staging_data_duration").Msgf("StagingStorage.GetStagingData duration: %f", time.Since(startTime).Seconds())
Expand Down Expand Up @@ -489,7 +528,10 @@ func (c *Committer) handleGap(ctx context.Context, expectedStartBlockNumber *big
// record the first missed block number in prometheus
metrics.MissedBlockNumbers.Set(float64(expectedStartBlockNumber.Int64()))

if c.workMode == WorkModeLive {
c.workModeMutex.RLock()
currentMode := c.workMode
c.workModeMutex.RUnlock()
if currentMode == WorkModeLive {
log.Debug().Msgf("Skipping gap handling in live mode. Expected block %s, actual first block %s", expectedStartBlockNumber.String(), actualFirstBlock.Number.String())
return nil
}
Expand Down