Skip to content

Commit 00b5822

Browse files
committed
Cleanup committer block numbers initialization
1 parent b7b4916 commit 00b5822

File tree

1 file changed

+73
-158
lines changed

1 file changed

+73
-158
lines changed

internal/orchestrator/committer.go

Lines changed: 73 additions & 158 deletions
Original file line numberDiff line numberDiff line change
@@ -47,121 +47,30 @@ func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage, poller *Poller,
4747
commitToBlock = -1
4848
}
4949

50-
commitFromBlock := big.NewInt(int64(config.Cfg.Committer.FromBlock))
5150
committer := &Committer{
5251
blocksPerCommit: blocksPerCommit,
5352
storage: storage,
54-
commitFromBlock: commitFromBlock,
53+
commitFromBlock: big.NewInt(int64(config.Cfg.Committer.FromBlock)),
5554
commitToBlock: big.NewInt(int64(commitToBlock)),
5655
rpc: rpc,
5756
publisher: publisher.GetInstance(),
5857
poller: poller,
5958
validator: NewValidator(rpc, storage, worker.NewWorker(rpc)), // validator uses worker without sources
6059
}
61-
cfb := commitFromBlock.Uint64()
62-
committer.lastCommittedBlock.Store(cfb)
63-
committer.lastPublishedBlock.Store(cfb)
6460

6561
for _, opt := range opts {
6662
opt(committer)
6763
}
6864

65+
if err := committer.initCommittedAndPublishedBlockNumbers(); err != nil {
66+
log.Fatal().Err(err).Msg("Failed to initialize committer block numbers")
67+
}
68+
6969
return committer
7070
}
7171

7272
func (c *Committer) Start(ctx context.Context) {
7373
log.Debug().Msgf("Committer running")
74-
chainID := c.rpc.GetChainID()
75-
76-
latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(chainID)
77-
if err != nil {
78-
// It's okay to fail silently here; this value is only used for staging cleanup and
79-
// the worker loop will eventually correct the state and delete as needed.
80-
log.Error().Msgf("Error getting latest committed block number: %v", err)
81-
} else if latestCommittedBlockNumber != nil && latestCommittedBlockNumber.Sign() > 0 {
82-
c.lastCommittedBlock.Store(latestCommittedBlockNumber.Uint64())
83-
}
84-
85-
// Initialize publisher position - always use max(lastPublished, lastCommitted) to prevent double publishing
86-
lastPublished, err := c.storage.OrchestratorStorage.GetLastPublishedBlockNumber(chainID)
87-
if err != nil {
88-
// It's okay to fail silently here; it's only used for staging cleanup and will be
89-
// corrected by the worker loop.
90-
log.Error().Err(err).Msg("failed to get last published block number")
91-
} else if lastPublished != nil && lastPublished.Sign() > 0 {
92-
// Always ensure publisher starts from at least the committed value
93-
if latestCommittedBlockNumber != nil && latestCommittedBlockNumber.Sign() > 0 {
94-
if lastPublished.Cmp(latestCommittedBlockNumber) < 0 {
95-
gap := new(big.Int).Sub(latestCommittedBlockNumber, lastPublished)
96-
log.Warn().
97-
Str("last_published", lastPublished.String()).
98-
Str("latest_committed", latestCommittedBlockNumber.String()).
99-
Str("gap", gap.String()).
100-
Msg("Publisher is behind committed position, seeking forward to committed value")
101-
102-
c.lastPublishedBlock.Store(latestCommittedBlockNumber.Uint64())
103-
if err := c.storage.OrchestratorStorage.SetLastPublishedBlockNumber(chainID, latestCommittedBlockNumber); err != nil {
104-
log.Error().Err(err).Msg("Failed to update last published block number after seeking forward")
105-
// Fall back to the stored value on error
106-
c.lastPublishedBlock.Store(lastPublished.Uint64())
107-
}
108-
} else {
109-
c.lastPublishedBlock.Store(lastPublished.Uint64())
110-
}
111-
} else {
112-
c.lastPublishedBlock.Store(lastPublished.Uint64())
113-
}
114-
} else {
115-
c.lastPublishedBlock.Store(c.lastCommittedBlock.Load())
116-
}
117-
118-
// Determine the correct publish position - always take the maximum to avoid going backwards
119-
var targetPublishBlock *big.Int
120-
121-
if lastPublished == nil || lastPublished.Sign() == 0 {
122-
// No previous publish position
123-
if latestCommittedBlockNumber != nil && latestCommittedBlockNumber.Sign() > 0 {
124-
// Start from committed position
125-
targetPublishBlock = latestCommittedBlockNumber
126-
} else if c.commitFromBlock.Sign() > 0 {
127-
// Start from configured position minus 1 (since we publish from next block)
128-
targetPublishBlock = new(big.Int).Sub(c.commitFromBlock, big.NewInt(1))
129-
} else {
130-
// Start from 0
131-
targetPublishBlock = big.NewInt(0)
132-
}
133-
134-
log.Info().
135-
Str("target_publish_block", targetPublishBlock.String()).
136-
Msg("No previous publish position, initializing publisher cursor")
137-
} else {
138-
// We have a previous position
139-
targetPublishBlock = lastPublished
140-
}
141-
142-
// Only update storage if we're changing the position
143-
if lastPublished == nil || targetPublishBlock.Cmp(lastPublished) != 0 {
144-
if err := c.storage.OrchestratorStorage.SetLastPublishedBlockNumber(chainID, targetPublishBlock); err != nil {
145-
log.Error().Err(err).Msg("Failed to update published block number in storage")
146-
// If we can't update storage, use what was there originally to avoid issues
147-
if lastPublished != nil {
148-
targetPublishBlock = lastPublished
149-
}
150-
}
151-
}
152-
153-
// Store in memory for quick acess
154-
c.lastPublishedBlock.Store(targetPublishBlock.Uint64())
155-
156-
log.Info().
157-
Str("publish_from", targetPublishBlock.String()).
158-
Str("committed_at", func() string {
159-
if latestCommittedBlockNumber != nil {
160-
return latestCommittedBlockNumber.String()
161-
}
162-
return "0"
163-
}()).
164-
Msg("Publisher initialized")
16574

16675
if config.Cfg.Publisher.Mode == "parallel" {
16776
var wg sync.WaitGroup
@@ -188,6 +97,43 @@ func (c *Committer) Start(ctx context.Context) {
18897
c.publisher.Close()
18998
}
19099

100+
func (c *Committer) initCommittedAndPublishedBlockNumbers() error {
101+
latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID())
102+
if err != nil {
103+
return err
104+
}
105+
106+
if latestCommittedBlockNumber == nil {
107+
latestCommittedBlockNumber = new(big.Int).SetUint64(0)
108+
}
109+
110+
if c.commitFromBlock.Sign() > 0 && latestCommittedBlockNumber.Cmp(c.commitFromBlock) < 0 {
111+
latestCommittedBlockNumber = new(big.Int).Sub(c.commitFromBlock, big.NewInt(1))
112+
}
113+
c.lastCommittedBlock.Store(latestCommittedBlockNumber.Uint64())
114+
115+
lastPublished, err := c.storage.OrchestratorStorage.GetLastPublishedBlockNumber(c.rpc.GetChainID())
116+
if err != nil {
117+
return err
118+
}
119+
120+
if lastPublished == nil {
121+
lastPublished = new(big.Int).SetUint64(0)
122+
}
123+
124+
// If the last published block is not initialized yet, set it to the last committed block number
125+
if lastPublished.Sign() == 0 && c.lastCommittedBlock.Load() > 0 {
126+
lastPublished = new(big.Int).SetUint64(c.lastCommittedBlock.Load())
127+
128+
if err := c.storage.OrchestratorStorage.SetLastPublishedBlockNumber(c.rpc.GetChainID(), lastPublished); err != nil {
129+
return err
130+
}
131+
}
132+
c.lastPublishedBlock.Store(lastPublished.Uint64())
133+
134+
return nil
135+
}
136+
191137
func (c *Committer) runCommitLoop(ctx context.Context) {
192138
for {
193139
select {
@@ -283,40 +229,17 @@ func (c *Committer) getBlockNumbersToCommit(ctx context.Context) ([]*big.Int, er
283229
if err != nil {
284230
return nil, err
285231
}
286-
if latestCommittedBlockNumber == nil {
287-
latestCommittedBlockNumber = new(big.Int).SetUint64(0)
288-
}
289-
log.Debug().Msgf("Committer found this max block number in main storage: %s", latestCommittedBlockNumber.String())
290232

291-
if latestCommittedBlockNumber.Sign() == 0 {
292-
// If no blocks have been committed yet, start from the fromBlock specified in the config
293-
latestCommittedBlockNumber = new(big.Int).Sub(c.commitFromBlock, big.NewInt(1))
294-
} else {
295-
lastCommitted := new(big.Int).SetUint64(c.lastCommittedBlock.Load())
296-
if latestCommittedBlockNumber.Cmp(lastCommitted) < 0 {
297-
log.Warn().Msgf("Max block in storage (%s) is less than last committed block in memory (%s).", latestCommittedBlockNumber.String(), lastCommitted.String())
298-
return []*big.Int{}, nil
299-
}
233+
if latestCommittedBlockNumber == nil || c.lastCommittedBlock.Load() != latestCommittedBlockNumber.Uint64() {
234+
log.Fatal().Msgf("Inconsistent last committed block state between memory (%d) and storage (%v)", c.lastCommittedBlock.Load(), latestCommittedBlockNumber)
235+
return nil, fmt.Errorf("last committed block number is not initialized correctly")
300236
}
301237

302-
startBlock := new(big.Int).Add(latestCommittedBlockNumber, big.NewInt(1))
303-
endBlock, err := c.getBlockToCommitUntil(ctx, latestCommittedBlockNumber)
238+
blockNumbers, err := c.getBlockRange(ctx, latestCommittedBlockNumber)
304239
if err != nil {
305-
return nil, fmt.Errorf("error getting block to commit until: %v", err)
240+
return nil, fmt.Errorf("failed to get block range to commit: %v", err)
306241
}
307242

308-
blockCount := new(big.Int).Sub(endBlock, startBlock).Int64() + 1
309-
if blockCount < 0 {
310-
return []*big.Int{}, fmt.Errorf("more blocks have been committed than the RPC has available - possible chain reset")
311-
}
312-
if blockCount == 0 {
313-
return []*big.Int{}, nil
314-
}
315-
blockNumbers := make([]*big.Int, blockCount)
316-
for i := int64(0); i < blockCount; i++ {
317-
blockNumber := new(big.Int).Add(startBlock, big.NewInt(i))
318-
blockNumbers[i] = blockNumber
319-
}
320243
return blockNumbers, nil
321244
}
322245

@@ -327,46 +250,25 @@ func (c *Committer) getBlockNumbersToPublish(ctx context.Context) ([]*big.Int, e
327250
return nil, fmt.Errorf("failed to get last published block number: %v", err)
328251
}
329252

330-
// This should never happen after Start() has run, but handle it defensively
331-
if latestPublishedBlockNumber == nil || latestPublishedBlockNumber.Sign() == 0 {
332-
// Fall back to in-memory value which was set during Start
333-
latestPublishedBlockNumber = new(big.Int).SetUint64(c.lastPublishedBlock.Load())
334-
log.Warn().
335-
Str("fallback_value", latestPublishedBlockNumber.String()).
336-
Msg("Storage returned nil/0 for last published block, using in-memory value")
253+
if latestPublishedBlockNumber == nil || c.lastPublishedBlock.Load() != latestPublishedBlockNumber.Uint64() {
254+
log.Fatal().Msgf("Inconsistent last published block state between memory (%d) and storage (%v)", c.lastPublishedBlock.Load(), latestPublishedBlockNumber)
255+
return nil, fmt.Errorf("last published block number is not initialized correctly")
337256
}
338257

339-
log.Debug().
340-
Str("last_published", latestPublishedBlockNumber.String()).
341-
Msg("Determining blocks to publish")
342-
343-
startBlock := new(big.Int).Add(latestPublishedBlockNumber, big.NewInt(1))
344-
endBlock, err := c.getBlockToCommitUntil(ctx, latestPublishedBlockNumber)
258+
blockNumbers, err := c.getBlockRange(ctx, latestPublishedBlockNumber)
345259
if err != nil {
346-
return nil, fmt.Errorf("error getting block to commit until: %v", err)
260+
return nil, fmt.Errorf("failed to get block range to publish: %v", err)
347261
}
348262

349-
blockCount := new(big.Int).Sub(endBlock, startBlock).Int64() + 1
350-
if blockCount < 0 {
351-
return []*big.Int{}, fmt.Errorf("more blocks have been committed than the RPC has available - possible chain reset")
352-
}
353-
if blockCount == 0 {
354-
return []*big.Int{}, nil
355-
}
356-
blockNumbers := make([]*big.Int, blockCount)
357-
for i := int64(0); i < blockCount; i++ {
358-
blockNumber := new(big.Int).Add(startBlock, big.NewInt(i))
359-
blockNumbers[i] = blockNumber
360-
}
361263
return blockNumbers, nil
362264
}
363265

364-
func (c *Committer) getBlockToCommitUntil(ctx context.Context, latestCommittedBlockNumber *big.Int) (*big.Int, error) {
365-
untilBlock := new(big.Int).Add(latestCommittedBlockNumber, big.NewInt(int64(c.blocksPerCommit)))
266+
func (c *Committer) getBlockRange(ctx context.Context, lastBlockNumber *big.Int) ([]*big.Int, error) {
267+
endBlock := new(big.Int).Add(lastBlockNumber, big.NewInt(int64(c.blocksPerCommit)))
366268

367269
// If a commit until block is set, then set a limit on the commit until block
368-
if c.commitToBlock.Sign() > 0 && untilBlock.Cmp(c.commitToBlock) > 0 {
369-
return new(big.Int).Set(c.commitToBlock), nil
270+
if c.commitToBlock.Sign() > 0 && endBlock.Cmp(c.commitToBlock) > 0 {
271+
endBlock = new(big.Int).Set(c.commitToBlock)
370272
}
371273

372274
// get latest block from RPC and if that's less than until block, return that
@@ -375,12 +277,25 @@ func (c *Committer) getBlockToCommitUntil(ctx context.Context, latestCommittedBl
375277
return nil, fmt.Errorf("error getting latest block from RPC: %v", err)
376278
}
377279

378-
if latestBlock.Cmp(untilBlock) < 0 {
379-
log.Debug().Msgf("Committing until latest block: %s", latestBlock.String())
380-
return latestBlock, nil
280+
if latestBlock.Cmp(endBlock) < 0 {
281+
endBlock = new(big.Int).Set(latestBlock)
381282
}
382283

383-
return untilBlock, nil
284+
startBlock := new(big.Int).Add(lastBlockNumber, big.NewInt(1))
285+
blockCount := new(big.Int).Sub(endBlock, startBlock).Int64() + 1
286+
if blockCount < 0 {
287+
return []*big.Int{}, fmt.Errorf("more blocks have been committed than the RPC has available - possible chain reset")
288+
}
289+
if blockCount == 0 {
290+
return []*big.Int{}, nil
291+
}
292+
293+
blockNumbers := make([]*big.Int, blockCount)
294+
for i := int64(0); i < blockCount; i++ {
295+
blockNumber := new(big.Int).Add(startBlock, big.NewInt(i))
296+
blockNumbers[i] = blockNumber
297+
}
298+
return blockNumbers, nil
384299
}
385300

386301
func (c *Committer) fetchBlockData(ctx context.Context, blockNumbers []*big.Int) ([]common.BlockData, error) {

0 commit comments

Comments
 (0)