Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 56 additions & 53 deletions internal/orchestrator/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,45 @@ func (c *Committer) getBlockNumbersToCommit(ctx context.Context) ([]*big.Int, er
return blockNumbers, nil
}

func (c *Committer) getBlockNumbersToPublish(ctx context.Context) ([]*big.Int, error) {
lastestPublishedBlockNumber, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(c.rpc.GetChainID())
log.Debug().Msgf("Committer found this last published block number in staging storage: %s", lastestPublishedBlockNumber.String())
if err != nil {
return nil, err
}

if lastestPublishedBlockNumber.Sign() == 0 {
// If no blocks have been committed yet, start from the fromBlock specified in the config
lastestPublishedBlockNumber = new(big.Int).Sub(c.commitFromBlock, big.NewInt(1))
} else {
lastPublished := new(big.Int).SetUint64(c.lastPublishedBlock.Load())
if lastestPublishedBlockNumber.Cmp(lastPublished) < 0 {
log.Warn().Msgf("Max block in storage (%s) is less than last published block in memory (%s).", lastestPublishedBlockNumber.String(), lastPublished.String())
return []*big.Int{}, nil
}
}

startBlock := new(big.Int).Add(lastestPublishedBlockNumber, big.NewInt(1))
endBlock, err := c.getBlockToCommitUntil(ctx, lastestPublishedBlockNumber)
if err != nil {
return nil, fmt.Errorf("error getting block to commit until: %v", err)
}

blockCount := new(big.Int).Sub(endBlock, startBlock).Int64() + 1
if blockCount < 0 {
return []*big.Int{}, fmt.Errorf("more blocks have been committed than the RPC has available - possible chain reset")
}
if blockCount == 0 {
return []*big.Int{}, nil
}
blockNumbers := make([]*big.Int, blockCount)
for i := int64(0); i < blockCount; i++ {
blockNumber := new(big.Int).Add(startBlock, big.NewInt(i))
blockNumbers[i] = blockNumber
}
return blockNumbers, nil
}

func (c *Committer) getBlockToCommitUntil(ctx context.Context, latestCommittedBlockNumber *big.Int) (*big.Int, error) {
untilBlock := new(big.Int).Add(latestCommittedBlockNumber, big.NewInt(int64(c.blocksPerCommit)))
if c.workMode == WorkModeBackfill {
Expand All @@ -274,7 +313,7 @@ func (c *Committer) getBlockToCommitUntil(ctx context.Context, latestCommittedBl
}
}

func (c *Committer) fetchBlockDataToCommit(ctx context.Context, blockNumbers []*big.Int) ([]common.BlockData, error) {
func (c *Committer) fetchBlockData(ctx context.Context, blockNumbers []*big.Int) ([]common.BlockData, error) {
if c.workMode == WorkModeBackfill {
startTime := time.Now()
blocksData, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{BlockNumbers: blockNumbers, ChainId: c.rpc.GetChainID()})
Expand All @@ -300,16 +339,8 @@ func (c *Committer) fetchBlockDataToCommit(ctx context.Context, blockNumbers []*
}
}

func (c *Committer) getSequentialBlockDataToCommit(ctx context.Context) ([]common.BlockData, error) {
blocksToCommit, err := c.getBlockNumbersToCommit(ctx)
if err != nil {
return nil, fmt.Errorf("error determining blocks to commit: %v", err)
}
if len(blocksToCommit) == 0 {
return nil, nil
}

blocksData, err := c.fetchBlockDataToCommit(ctx, blocksToCommit)
func (c *Committer) getSequentialBlockData(ctx context.Context, blockNumbers []*big.Int) ([]common.BlockData, error) {
blocksData, err := c.fetchBlockData(ctx, blockNumbers)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -338,8 +369,8 @@ func (c *Committer) getSequentialBlockDataToCommit(ctx context.Context) ([]commo
return blocksData[i].Block.Number.Cmp(blocksData[j].Block.Number) < 0
})

if blocksData[0].Block.Number.Cmp(blocksToCommit[0]) != 0 {
return nil, c.handleGap(ctx, blocksToCommit[0], blocksData[0].Block)
if blocksData[0].Block.Number.Cmp(blockNumbers[0]) != 0 {
return nil, c.handleGap(ctx, blockNumbers[0], blocksData[0].Block)
}

var sequentialBlockData []common.BlockData
Expand Down Expand Up @@ -367,54 +398,26 @@ func (c *Committer) getSequentialBlockDataToCommit(ctx context.Context) ([]commo
return sequentialBlockData, nil
}

func (c *Committer) getSequentialBlockDataToPublish(ctx context.Context) ([]common.BlockData, error) {
chainID := c.rpc.GetChainID()
lastPublished, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(chainID)
func (c *Committer) getSequentialBlockDataToCommit(ctx context.Context) ([]common.BlockData, error) {
blocksToCommit, err := c.getBlockNumbersToCommit(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get last published block number: %v", err)
return nil, fmt.Errorf("error determining blocks to commit: %v", err)
}

startBlock := new(big.Int).Set(c.commitFromBlock)
if lastPublished != nil && lastPublished.Sign() > 0 {
startBlock = new(big.Int).Add(lastPublished, big.NewInt(1))
if len(blocksToCommit) == 0 {
return nil, nil
}
return c.getSequentialBlockData(ctx, blocksToCommit)
}

endBlock := new(big.Int).Add(startBlock, big.NewInt(int64(c.blocksPerCommit-1)))

blocksData, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{
ChainId: chainID,
StartBlock: startBlock,
EndBlock: endBlock,
})
func (c *Committer) getSequentialBlockDataToPublish(ctx context.Context) ([]common.BlockData, error) {
blocksToPublish, err := c.getBlockNumbersToPublish(ctx)
if err != nil {
return nil, fmt.Errorf("error fetching blocks to publish: %v", err)
return nil, fmt.Errorf("error determining blocks to commit: %v", err)
}
if len(blocksData) == 0 {
if len(blocksToPublish) == 0 {
return nil, nil
}

sort.Slice(blocksData, func(i, j int) bool {
return blocksData[i].Block.Number.Cmp(blocksData[j].Block.Number) < 0
})
if blocksData[0].Block.Number.Cmp(startBlock) != 0 {
log.Debug().Msgf("First block to publish %s does not match expected %s", blocksData[0].Block.Number.String(), startBlock.String())
return nil, nil
}

sequential := []common.BlockData{blocksData[0]}
expected := new(big.Int).Add(blocksData[0].Block.Number, big.NewInt(1))
for i := 1; i < len(blocksData); i++ {
if blocksData[i].Block.Number.Cmp(blocksData[i-1].Block.Number) == 0 {
continue
}
if blocksData[i].Block.Number.Cmp(expected) != 0 {
break
}
sequential = append(sequential, blocksData[i])
expected.Add(expected, big.NewInt(1))
}

return sequential, nil
return c.getSequentialBlockData(ctx, blocksToPublish)
}

func (c *Committer) publish(ctx context.Context) error {
Expand Down
Loading