Skip to content

Commit 961b3b6

Browse files
committed
Fix publisher parallel mode live
1 parent cfffac0 commit 961b3b6

File tree

1 file changed

+56
-53
lines changed

1 file changed

+56
-53
lines changed

internal/orchestrator/committer.go

Lines changed: 56 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,45 @@ func (c *Committer) getBlockNumbersToCommit(ctx context.Context) ([]*big.Int, er
256256
return blockNumbers, nil
257257
}
258258

259+
func (c *Committer) getBlockNumbersToPublish(ctx context.Context) ([]*big.Int, error) {
260+
lastestPublishedBlockNumber, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(c.rpc.GetChainID())
261+
log.Debug().Msgf("Committer found this last published block number in staging storage: %s", lastestPublishedBlockNumber.String())
262+
if err != nil {
263+
return nil, err
264+
}
265+
266+
if lastestPublishedBlockNumber.Sign() == 0 {
267+
// If no blocks have been committed yet, start from the fromBlock specified in the config
268+
lastestPublishedBlockNumber = new(big.Int).Sub(c.commitFromBlock, big.NewInt(1))
269+
} else {
270+
lastPublished := new(big.Int).SetUint64(c.lastPublishedBlock.Load())
271+
if lastestPublishedBlockNumber.Cmp(lastPublished) < 0 {
272+
log.Warn().Msgf("Max block in storage (%s) is less than last published block in memory (%s).", lastestPublishedBlockNumber.String(), lastPublished.String())
273+
return []*big.Int{}, nil
274+
}
275+
}
276+
277+
startBlock := new(big.Int).Add(lastestPublishedBlockNumber, big.NewInt(1))
278+
endBlock, err := c.getBlockToCommitUntil(ctx, lastestPublishedBlockNumber)
279+
if err != nil {
280+
return nil, fmt.Errorf("error getting block to commit until: %v", err)
281+
}
282+
283+
blockCount := new(big.Int).Sub(endBlock, startBlock).Int64() + 1
284+
if blockCount < 0 {
285+
return []*big.Int{}, fmt.Errorf("more blocks have been committed than the RPC has available - possible chain reset")
286+
}
287+
if blockCount == 0 {
288+
return []*big.Int{}, nil
289+
}
290+
blockNumbers := make([]*big.Int, blockCount)
291+
for i := int64(0); i < blockCount; i++ {
292+
blockNumber := new(big.Int).Add(startBlock, big.NewInt(i))
293+
blockNumbers[i] = blockNumber
294+
}
295+
return blockNumbers, nil
296+
}
297+
259298
func (c *Committer) getBlockToCommitUntil(ctx context.Context, latestCommittedBlockNumber *big.Int) (*big.Int, error) {
260299
untilBlock := new(big.Int).Add(latestCommittedBlockNumber, big.NewInt(int64(c.blocksPerCommit)))
261300
if c.workMode == WorkModeBackfill {
@@ -274,7 +313,7 @@ func (c *Committer) getBlockToCommitUntil(ctx context.Context, latestCommittedBl
274313
}
275314
}
276315

277-
func (c *Committer) fetchBlockDataToCommit(ctx context.Context, blockNumbers []*big.Int) ([]common.BlockData, error) {
316+
func (c *Committer) fetchBlockData(ctx context.Context, blockNumbers []*big.Int) ([]common.BlockData, error) {
278317
if c.workMode == WorkModeBackfill {
279318
startTime := time.Now()
280319
blocksData, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{BlockNumbers: blockNumbers, ChainId: c.rpc.GetChainID()})
@@ -300,16 +339,8 @@ func (c *Committer) fetchBlockDataToCommit(ctx context.Context, blockNumbers []*
300339
}
301340
}
302341

303-
func (c *Committer) getSequentialBlockDataToCommit(ctx context.Context) ([]common.BlockData, error) {
304-
blocksToCommit, err := c.getBlockNumbersToCommit(ctx)
305-
if err != nil {
306-
return nil, fmt.Errorf("error determining blocks to commit: %v", err)
307-
}
308-
if len(blocksToCommit) == 0 {
309-
return nil, nil
310-
}
311-
312-
blocksData, err := c.fetchBlockDataToCommit(ctx, blocksToCommit)
342+
func (c *Committer) getSequentialBlockData(ctx context.Context, blockNumbers []*big.Int) ([]common.BlockData, error) {
343+
blocksData, err := c.fetchBlockData(ctx, blockNumbers)
313344
if err != nil {
314345
return nil, err
315346
}
@@ -338,8 +369,8 @@ func (c *Committer) getSequentialBlockDataToCommit(ctx context.Context) ([]commo
338369
return blocksData[i].Block.Number.Cmp(blocksData[j].Block.Number) < 0
339370
})
340371

341-
if blocksData[0].Block.Number.Cmp(blocksToCommit[0]) != 0 {
342-
return nil, c.handleGap(ctx, blocksToCommit[0], blocksData[0].Block)
372+
if blocksData[0].Block.Number.Cmp(blockNumbers[0]) != 0 {
373+
return nil, c.handleGap(ctx, blockNumbers[0], blocksData[0].Block)
343374
}
344375

345376
var sequentialBlockData []common.BlockData
@@ -367,54 +398,26 @@ func (c *Committer) getSequentialBlockDataToCommit(ctx context.Context) ([]commo
367398
return sequentialBlockData, nil
368399
}
369400

370-
func (c *Committer) getSequentialBlockDataToPublish(ctx context.Context) ([]common.BlockData, error) {
371-
chainID := c.rpc.GetChainID()
372-
lastPublished, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(chainID)
401+
func (c *Committer) getSequentialBlockDataToCommit(ctx context.Context) ([]common.BlockData, error) {
402+
blocksToCommit, err := c.getBlockNumbersToCommit(ctx)
373403
if err != nil {
374-
return nil, fmt.Errorf("failed to get last published block number: %v", err)
404+
return nil, fmt.Errorf("error determining blocks to commit: %v", err)
375405
}
376-
377-
startBlock := new(big.Int).Set(c.commitFromBlock)
378-
if lastPublished != nil && lastPublished.Sign() > 0 {
379-
startBlock = new(big.Int).Add(lastPublished, big.NewInt(1))
406+
if len(blocksToCommit) == 0 {
407+
return nil, nil
380408
}
409+
return c.getSequentialBlockData(ctx, blocksToCommit)
410+
}
381411

382-
endBlock := new(big.Int).Add(startBlock, big.NewInt(int64(c.blocksPerCommit-1)))
383-
384-
blocksData, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{
385-
ChainId: chainID,
386-
StartBlock: startBlock,
387-
EndBlock: endBlock,
388-
})
412+
func (c *Committer) getSequentialBlockDataToPublish(ctx context.Context) ([]common.BlockData, error) {
413+
blocksToPublish, err := c.getBlockNumbersToPublish(ctx)
389414
if err != nil {
390-
return nil, fmt.Errorf("error fetching blocks to publish: %v", err)
415+
return nil, fmt.Errorf("error determining blocks to commit: %v", err)
391416
}
392-
if len(blocksData) == 0 {
417+
if len(blocksToPublish) == 0 {
393418
return nil, nil
394419
}
395-
396-
sort.Slice(blocksData, func(i, j int) bool {
397-
return blocksData[i].Block.Number.Cmp(blocksData[j].Block.Number) < 0
398-
})
399-
if blocksData[0].Block.Number.Cmp(startBlock) != 0 {
400-
log.Debug().Msgf("First block to publish %s does not match expected %s", blocksData[0].Block.Number.String(), startBlock.String())
401-
return nil, nil
402-
}
403-
404-
sequential := []common.BlockData{blocksData[0]}
405-
expected := new(big.Int).Add(blocksData[0].Block.Number, big.NewInt(1))
406-
for i := 1; i < len(blocksData); i++ {
407-
if blocksData[i].Block.Number.Cmp(blocksData[i-1].Block.Number) == 0 {
408-
continue
409-
}
410-
if blocksData[i].Block.Number.Cmp(expected) != 0 {
411-
break
412-
}
413-
sequential = append(sequential, blocksData[i])
414-
expected.Add(expected, big.NewInt(1))
415-
}
416-
417-
return sequential, nil
420+
return c.getSequentialBlockData(ctx, blocksToPublish)
418421
}
419422

420423
func (c *Committer) publish(ctx context.Context) error {

0 commit comments

Comments
 (0)