Skip to content

Commit fc2ae64

Browse files
committed
Fix publish parallel mode
1 parent e61fae7 commit fc2ae64

File tree

1 file changed

+51
-9
lines changed

1 file changed

+51
-9
lines changed

internal/orchestrator/committer.go

Lines changed: 51 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ type Committer struct {
3131
lastPublishedBlock atomic.Uint64
3232
publisher *publisher.Publisher
3333
workMode WorkMode
34+
workModeMutex sync.RWMutex
3435
workModeChan chan WorkMode
3536
validator *Validator
3637
}
@@ -101,7 +102,28 @@ func (c *Committer) Start(ctx context.Context) {
101102
// corrected by the worker loop.
102103
log.Error().Err(err).Msg("failed to get last published block number")
103104
} else if lastPublished != nil && lastPublished.Sign() > 0 {
104-
c.lastPublishedBlock.Store(lastPublished.Uint64())
105+
// Always ensure publisher starts from at least the committed value
106+
if latestCommittedBlockNumber != nil && latestCommittedBlockNumber.Sign() > 0 {
107+
if lastPublished.Cmp(latestCommittedBlockNumber) < 0 {
108+
gap := new(big.Int).Sub(latestCommittedBlockNumber, lastPublished)
109+
log.Warn().
110+
Str("last_published", lastPublished.String()).
111+
Str("latest_committed", latestCommittedBlockNumber.String()).
112+
Str("gap", gap.String()).
113+
Msg("Publisher is behind committed position, seeking forward to committed value")
114+
115+
c.lastPublishedBlock.Store(latestCommittedBlockNumber.Uint64())
116+
if err := c.storage.StagingStorage.SetLastPublishedBlockNumber(chainID, latestCommittedBlockNumber); err != nil {
117+
log.Error().Err(err).Msg("Failed to update last published block number after seeking forward")
118+
// Fall back to the stored value on error
119+
c.lastPublishedBlock.Store(lastPublished.Uint64())
120+
}
121+
} else {
122+
c.lastPublishedBlock.Store(lastPublished.Uint64())
123+
}
124+
} else {
125+
c.lastPublishedBlock.Store(lastPublished.Uint64())
126+
}
105127
} else {
106128
c.lastPublishedBlock.Store(c.lastCommittedBlock.Load())
107129
}
@@ -143,13 +165,21 @@ func (c *Committer) runCommitLoop(ctx context.Context, interval time.Duration) {
143165
case <-ctx.Done():
144166
return
145167
case workMode := <-c.workModeChan:
146-
if workMode != c.workMode && workMode != "" {
147-
log.Info().Msgf("Committer work mode changing from %s to %s", c.workMode, workMode)
148-
c.workMode = workMode
168+
if workMode != "" {
169+
c.workModeMutex.Lock()
170+
oldMode := c.workMode
171+
if workMode != oldMode {
172+
log.Info().Msgf("Committer work mode changing from %s to %s", oldMode, workMode)
173+
c.workMode = workMode
174+
}
175+
c.workModeMutex.Unlock()
149176
}
150177
default:
151178
time.Sleep(interval)
152-
if c.workMode == "" {
179+
c.workModeMutex.RLock()
180+
currentMode := c.workMode
181+
c.workModeMutex.RUnlock()
182+
if currentMode == "" {
153183
log.Debug().Msg("Committer work mode not set, skipping commit")
154184
continue
155185
}
@@ -176,7 +206,10 @@ func (c *Committer) runPublishLoop(ctx context.Context, interval time.Duration)
176206
return
177207
default:
178208
time.Sleep(interval)
179-
if c.workMode == "" {
209+
c.workModeMutex.RLock()
210+
currentMode := c.workMode
211+
c.workModeMutex.RUnlock()
212+
if currentMode == "" {
180213
log.Debug().Msg("Committer work mode not set, skipping publish")
181214
continue
182215
}
@@ -297,7 +330,10 @@ func (c *Committer) getBlockNumbersToPublish(ctx context.Context) ([]*big.Int, e
297330

298331
func (c *Committer) getBlockToCommitUntil(ctx context.Context, latestCommittedBlockNumber *big.Int) (*big.Int, error) {
299332
untilBlock := new(big.Int).Add(latestCommittedBlockNumber, big.NewInt(int64(c.blocksPerCommit)))
300-
if c.workMode == WorkModeBackfill {
333+
c.workModeMutex.RLock()
334+
currentMode := c.workMode
335+
c.workModeMutex.RUnlock()
336+
if currentMode == WorkModeBackfill {
301337
return untilBlock, nil
302338
} else {
303339
// get latest block from RPC and if that's less than until block, return that
@@ -314,7 +350,10 @@ func (c *Committer) getBlockToCommitUntil(ctx context.Context, latestCommittedBl
314350
}
315351

316352
func (c *Committer) fetchBlockData(ctx context.Context, blockNumbers []*big.Int) ([]common.BlockData, error) {
317-
if c.workMode == WorkModeBackfill {
353+
c.workModeMutex.RLock()
354+
currentMode := c.workMode
355+
c.workModeMutex.RUnlock()
356+
if currentMode == WorkModeBackfill {
318357
startTime := time.Now()
319358
blocksData, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{BlockNumbers: blockNumbers, ChainId: c.rpc.GetChainID()})
320359
log.Debug().Str("metric", "get_staging_data_duration").Msgf("StagingStorage.GetStagingData duration: %f", time.Since(startTime).Seconds())
@@ -489,7 +528,10 @@ func (c *Committer) handleGap(ctx context.Context, expectedStartBlockNumber *big
489528
// record the first missed block number in prometheus
490529
metrics.MissedBlockNumbers.Set(float64(expectedStartBlockNumber.Int64()))
491530

492-
if c.workMode == WorkModeLive {
531+
c.workModeMutex.RLock()
532+
currentMode := c.workMode
533+
c.workModeMutex.RUnlock()
534+
if currentMode == WorkModeLive {
493535
log.Debug().Msgf("Skipping gap handling in live mode. Expected block %s, actual first block %s", expectedStartBlockNumber.String(), actualFirstBlock.Number.String())
494536
return nil
495537
}

0 commit comments

Comments
 (0)