Skip to content

Commit 507ef74

Browse files
committed
Init publisher value on start
1 parent db4d974 commit 507ef74

File tree

1 file changed

+66
-29
lines changed

1 file changed

+66
-29
lines changed

internal/orchestrator/committer.go

Lines changed: 66 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -96,38 +96,73 @@ func (c *Committer) Start(ctx context.Context) {
9696
c.lastCommittedBlock.Store(latestCommittedBlockNumber.Uint64())
9797
}
9898

99+
// Initialize publisher position - always use max(lastPublished, lastCommitted) to prevent double publishing
99100
lastPublished, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(chainID)
100101
if err != nil {
101-
// It's okay to fail silently here; it's only used for staging cleanup and will be
102-
// corrected by the worker loop.
103-
log.Error().Err(err).Msg("failed to get last published block number")
104-
} else if lastPublished != nil && lastPublished.Sign() > 0 {
105-
// Always ensure publisher starts from at least the committed value
102+
log.Error().Err(err).Msg("Failed to get last published block number from storage")
103+
// If we can't read, assume we need to start from the beginning
104+
lastPublished = nil
105+
}
106+
107+
// Determine the correct publish position - always take the maximum to avoid going backwards
108+
var targetPublishBlock *big.Int
109+
110+
if lastPublished == nil || lastPublished.Sign() == 0 {
111+
// No previous publish position
112+
if latestCommittedBlockNumber != nil && latestCommittedBlockNumber.Sign() > 0 {
113+
// Start from committed position
114+
targetPublishBlock = latestCommittedBlockNumber
115+
} else if c.commitFromBlock.Sign() > 0 {
116+
// Start from configured position minus 1 (since we publish from next block)
117+
targetPublishBlock = new(big.Int).Sub(c.commitFromBlock, big.NewInt(1))
118+
} else {
119+
// Start from 0
120+
targetPublishBlock = big.NewInt(0)
121+
}
122+
123+
log.Info().
124+
Str("target_publish_block", targetPublishBlock.String()).
125+
Msg("No previous publish position, initializing publisher cursor")
126+
} else {
127+
// We have a previous position - use max(lastPublished, lastCommitted)
128+
targetPublishBlock = lastPublished
106129
if latestCommittedBlockNumber != nil && latestCommittedBlockNumber.Sign() > 0 {
107-
if lastPublished.Cmp(latestCommittedBlockNumber) < 0 {
130+
if latestCommittedBlockNumber.Cmp(lastPublished) > 0 {
108131
gap := new(big.Int).Sub(latestCommittedBlockNumber, lastPublished)
109132
log.Warn().
110133
Str("last_published", lastPublished.String()).
111134
Str("latest_committed", latestCommittedBlockNumber.String()).
112135
Str("gap", gap.String()).
113136
Msg("Publisher is behind committed position, seeking forward to committed value")
137+
targetPublishBlock = latestCommittedBlockNumber
138+
}
139+
}
140+
}
114141

115-
c.lastPublishedBlock.Store(latestCommittedBlockNumber.Uint64())
116-
if err := c.storage.StagingStorage.SetLastPublishedBlockNumber(chainID, latestCommittedBlockNumber); err != nil {
117-
log.Error().Err(err).Msg("Failed to update last published block number after seeking forward")
118-
// Fall back to the stored value on error
119-
c.lastPublishedBlock.Store(lastPublished.Uint64())
120-
}
121-
} else {
122-
c.lastPublishedBlock.Store(lastPublished.Uint64())
142+
// Only update storage if we're changing the position
143+
if lastPublished == nil || targetPublishBlock.Cmp(lastPublished) != 0 {
144+
if err := c.storage.StagingStorage.SetLastPublishedBlockNumber(chainID, targetPublishBlock); err != nil {
145+
log.Error().Err(err).Msg("Failed to update published block number in storage")
146+
// If we can't update storage, use what was there originally to avoid issues
147+
if lastPublished != nil {
148+
targetPublishBlock = lastPublished
123149
}
124-
} else {
125-
c.lastPublishedBlock.Store(lastPublished.Uint64())
126150
}
127-
} else {
128-
c.lastPublishedBlock.Store(c.lastCommittedBlock.Load())
129151
}
130152

153+
// Store in memory for quick access
154+
c.lastPublishedBlock.Store(targetPublishBlock.Uint64())
155+
156+
log.Info().
157+
Str("publish_from", targetPublishBlock.String()).
158+
Str("committed_at", func() string {
159+
if latestCommittedBlockNumber != nil {
160+
return latestCommittedBlockNumber.String()
161+
}
162+
return "0"
163+
}()).
164+
Msg("Publisher initialized")
165+
131166
c.cleanupProcessedStagingBlocks()
132167

133168
if config.Cfg.Publisher.Mode == "parallel" {
@@ -290,23 +325,25 @@ func (c *Committer) getBlockNumbersToCommit(ctx context.Context) ([]*big.Int, er
290325
}
291326

292327
func (c *Committer) getBlockNumbersToPublish(ctx context.Context) ([]*big.Int, error) {
328+
// Get the last published block from storage (which was already corrected in Start)
293329
lastestPublishedBlockNumber, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(c.rpc.GetChainID())
294-
log.Debug().Msgf("Committer found this last published block number in staging storage: %s", lastestPublishedBlockNumber.String())
295330
if err != nil {
296-
return nil, err
331+
return nil, fmt.Errorf("failed to get last published block number: %v", err)
297332
}
298333

299-
if lastestPublishedBlockNumber.Sign() == 0 {
300-
// If no blocks have been committed yet, start from the fromBlock specified in the config
301-
lastestPublishedBlockNumber = new(big.Int).Sub(c.commitFromBlock, big.NewInt(1))
302-
} else {
303-
lastPublished := new(big.Int).SetUint64(c.lastPublishedBlock.Load())
304-
if lastestPublishedBlockNumber.Cmp(lastPublished) < 0 {
305-
log.Warn().Msgf("Max block in storage (%s) is less than last published block in memory (%s).", lastestPublishedBlockNumber.String(), lastPublished.String())
306-
return []*big.Int{}, nil
307-
}
334+
// This should never happen after Start() has run, but handle it defensively
335+
if lastestPublishedBlockNumber == nil || lastestPublishedBlockNumber.Sign() == 0 {
336+
// Fall back to in-memory value which was set during Start
337+
lastestPublishedBlockNumber = new(big.Int).SetUint64(c.lastPublishedBlock.Load())
338+
log.Warn().
339+
Str("fallback_value", lastestPublishedBlockNumber.String()).
340+
Msg("Storage returned nil/0 for last published block, using in-memory value")
308341
}
309342

343+
log.Debug().
344+
Str("last_published", lastestPublishedBlockNumber.String()).
345+
Msg("Determining blocks to publish")
346+
310347
startBlock := new(big.Int).Add(lastestPublishedBlockNumber, big.NewInt(1))
311348
endBlock, err := c.getBlockToCommitUntil(ctx, lastestPublishedBlockNumber)
312349
if err != nil {

0 commit comments

Comments
 (0)