Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 56 additions & 94 deletions cl/blockbuilder/blockbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@ type BlockBuilder struct {
logger *slog.Logger
buildDelay time.Duration
buildEmptyBlocksDelay time.Duration
buildDelayMs uint64
lastBlockTime time.Time
feeRecipient common.Address
executionHead *types.ExecutionHead
}
Expand All @@ -74,10 +72,8 @@ func NewBlockBuilder(
engineCl: engineCl,
logger: logger,
buildDelay: buildDelay,
buildDelayMs: uint64(buildDelay.Milliseconds()),
buildEmptyBlocksDelay: buildDelayEmptyBlocks,
feeRecipient: common.HexToAddress(feeReceipt),
lastBlockTime: time.Now().Add(-buildDelayEmptyBlocks),
rpcClient: rpcClient,
}
}
Expand All @@ -89,12 +85,8 @@ func NewMemberBlockBuilder(engineCL EngineClient, logger *slog.Logger) *BlockBui
}
}

func (bb *BlockBuilder) SetLastCallTimeToZero() {
bb.lastBlockTime = time.Time{}
}

func (bb *BlockBuilder) startBuild(ctx context.Context, head *types.ExecutionHead, ts uint64) (engine.ForkChoiceResponse, error) {
hash := common.BytesToHash(head.BlockHash)
func (bb *BlockBuilder) startBuild(ctx context.Context, ts uint64) (engine.ForkChoiceResponse, error) {
hash := common.BytesToHash(bb.executionHead.BlockHash)

fcs := engine.ForkchoiceStateV1{
HeadBlockHash: hash,
Expand Down Expand Up @@ -123,19 +115,39 @@ func (bb *BlockBuilder) startBuild(ctx context.Context, head *types.ExecutionHea
func (bb *BlockBuilder) GetPayload(ctx context.Context) error {
var (
payloadID *engine.PayloadID
head *types.ExecutionHead
err error
)
currentCallTime := time.Now()

if bb.executionHead == nil {
bb.logger.Info("executionHead is nil, it'll be set by RPC. CL is likely being restarted")
err = util.RetryWithBackoff(ctx, maxAttempts, bb.logger, func() error {
innerErr := bb.setExecutionHeadFromRPC(ctx)
if innerErr != nil {
bb.logger.Warn(
"Failed to set execution head from rpc, retrying...",
"error", innerErr,
)
return innerErr // Will retry
}
return nil // Success
})
if err != nil {
return fmt.Errorf("failed to set execution head from rpc: %w", err)
}
} else {
bb.logger.Debug("executionHead is not nil, using cached value")
}

mempoolStatus, err := bb.GetMempoolStatus(ctx)
if err != nil {
return fmt.Errorf("failed to get pending transaction count: %w", err)
}
bb.logger.Debug("GetMempoolStatus rpc duration", "duration", time.Since(currentCallTime))

lastBlockTime := time.UnixMilli(int64(bb.executionHead.BlockTime))
bb.logger.Debug("lastBlockTime from execution head", "lastBlockTime", lastBlockTime)

if mempoolStatus.Pending == 0 {
timeSinceLastBlock := currentCallTime.Sub(bb.lastBlockTime)
timeSinceLastBlock := time.Since(lastBlockTime)
if timeSinceLastBlock < bb.buildEmptyBlocksDelay {
bb.logger.Debug(
"Leader: Skipping empty block",
Expand All @@ -153,50 +165,24 @@ func (bb *BlockBuilder) GetPayload(ctx context.Context) error {
)
}

// Load execution head to get previous block timestamp
err = util.RetryWithBackoff(ctx, maxAttempts, bb.logger, func() error {
head, err = bb.loadExecutionHead(ctx)
if err != nil {
bb.logger.Warn(
"Failed to load execution head, retrying...",
"error", err,
)
return err // Will retry
}
return nil // Success
})
if err != nil {
return fmt.Errorf("latest execution block: %w", err)
}

prevTimestamp := head.BlockTime

var ts uint64

if bb.lastBlockTime.IsZero() {
// First block, initialize LastCallTime and set default timestamp
ts = uint64(time.Now().UnixMilli()) + bb.buildDelayMs
} else {
// Compute diff in milliseconds
diff := currentCallTime.Sub(bb.lastBlockTime)
diffMillis := diff.Milliseconds()

if uint64(diffMillis) <= bb.buildDelayMs {
ts = prevTimestamp + bb.buildDelayMs
} else {
// For every multiple of buildDelay that diff exceeds, increment the block time by that multiple.
multiples := (uint64(diffMillis) + bb.buildDelayMs - 1) / bb.buildDelayMs // Round up to next multiple of buildDelay
ts = prevTimestamp + multiples*bb.buildDelayMs
ts := uint64(time.Now().UnixMilli())
if ts <= bb.executionHead.BlockTime {
bb.logger.Warn(`Leader: Current timestamp is less than or equal to
previous block timestamp. Genesis timestamp could have been set incorrectly`,
"current_ts", ts,
"prev_head_block_time", bb.executionHead.BlockTime,
)
ts = bb.executionHead.BlockTime + 1
select {
case <-ctx.Done():
return fmt.Errorf("context cancelled")
case <-time.After(time.Until(time.UnixMilli(int64(ts)))):
bb.logger.Info("Leader: Waited until proper block timestamp", "ts", ts)
}
}

// Very low chance to happen, only after restart and time.Now is broken
if ts <= head.BlockTime {
ts = head.BlockTime + 1 // Subsequent blocks must have a higher timestamp.
}

err = util.RetryWithBackoff(ctx, maxAttempts, bb.logger, func() error {
response, err := bb.startBuild(ctx, head, ts)
response, err := bb.startBuild(ctx, ts)
if err != nil {
bb.logger.Warn(
"Failed to build new EVM payload, will retry",
Expand Down Expand Up @@ -275,8 +261,6 @@ func (bb *BlockBuilder) GetPayload(ctx context.Context) error {
"Leader: BuildBlock completed and block is distributed",
"PayloadID", payloadIDStr,
)

bb.lastBlockTime = time.Now()
return nil
}

Expand Down Expand Up @@ -418,30 +402,13 @@ func (bb *BlockBuilder) FinalizeBlock(ctx context.Context, payloadIDStr, executi
return fmt.Errorf("failed to deserialize ExecutionPayload: %w", err)
}

var head *types.ExecutionHead
err = util.RetryWithBackoff(ctx, maxAttempts, bb.logger, func() error {
head, err = bb.loadExecutionHead(ctx)
if err != nil {
bb.logger.Warn(
"Failed to load execution head, retrying...",
"error", err,
)
return err // Will retry
}
return nil // Success
})
if err != nil {
return fmt.Errorf("failed to load execution head: %w", err)
}

if err := bb.validateExecutionPayload(executionPayload, head); err != nil {
if err := bb.validateExecutionPayload(executionPayload); err != nil {
return fmt.Errorf("failed to validate execution payload: %w", err)
}

hash := common.BytesToHash(head.BlockHash)
retryFunc := bb.selectRetryFunction(ctx, msgID)

if err := bb.pushNewPayload(ctx, executionPayload, hash, retryFunc); err != nil {
if err := bb.pushNewPayload(ctx, executionPayload, retryFunc); err != nil {
return fmt.Errorf("failed to push new payload: %w", err)
}

Expand All @@ -464,20 +431,20 @@ func (bb *BlockBuilder) FinalizeBlock(ctx context.Context, payloadIDStr, executi
return nil
}

func (bb *BlockBuilder) validateExecutionPayload(executionPayload engine.ExecutableData, head *types.ExecutionHead) error {
if executionPayload.Number != head.BlockHeight+1 {
return fmt.Errorf("invalid block height: %d, expected: %d", executionPayload.Number, head.BlockHeight+1)
func (bb *BlockBuilder) validateExecutionPayload(executionPayload engine.ExecutableData) error {
if executionPayload.Number != bb.executionHead.BlockHeight+1 {
return fmt.Errorf("invalid block height: %d, expected: %d", executionPayload.Number, bb.executionHead.BlockHeight+1)
}
if executionPayload.ParentHash != common.Hash(head.BlockHash) {
return fmt.Errorf("invalid parent hash: %s, head: %s", executionPayload.ParentHash, head.BlockHash)
if executionPayload.ParentHash != common.Hash(bb.executionHead.BlockHash) {
return fmt.Errorf("invalid parent hash: %s, head: %s", executionPayload.ParentHash, bb.executionHead.BlockHash)
}
minTimestamp := head.BlockTime + 1
minTimestamp := bb.executionHead.BlockTime + 1
if executionPayload.Timestamp < minTimestamp && executionPayload.Number != 1 {
return fmt.Errorf("invalid timestamp: %d, min: %d", executionPayload.Timestamp, minTimestamp)
}
hash := common.BytesToHash(head.BlockHash)
hash := common.BytesToHash(bb.executionHead.BlockHash)
if executionPayload.Random != hash {
return fmt.Errorf("invalid random: %s, head: %s", executionPayload.Random, head.BlockHash)
return fmt.Errorf("invalid random: %s, head: %s", executionPayload.Random, bb.executionHead.BlockHash)
}
return nil
}
Expand All @@ -493,10 +460,11 @@ func (bb *BlockBuilder) selectRetryFunction(ctx context.Context, msgID string) f
}
}

func (bb *BlockBuilder) pushNewPayload(ctx context.Context, executionPayload engine.ExecutableData, hash common.Hash, retryFunc func(f func() error) error) error {
func (bb *BlockBuilder) pushNewPayload(ctx context.Context, executionPayload engine.ExecutableData, retryFunc func(f func() error) error) error {
emptyVersionHashes := []common.Hash{}
parentHash := common.BytesToHash(bb.executionHead.BlockHash)
return retryFunc(func() error {
status, err := bb.engineCl.NewPayloadV4(ctx, executionPayload, emptyVersionHashes, &hash, []hexutil.Bytes{})
status, err := bb.engineCl.NewPayloadV4(ctx, executionPayload, emptyVersionHashes, &parentHash, []hexutil.Bytes{})
bb.logger.Debug("newPayload result",
"status", status.Status,
"validationError", status.ValidationError,
Expand Down Expand Up @@ -534,23 +502,17 @@ func (bb *BlockBuilder) updateForkChoice(ctx context.Context, fcs engine.Forkcho
})
}

func (bb *BlockBuilder) loadExecutionHead(ctx context.Context) (*types.ExecutionHead, error) {
if bb.executionHead != nil {
return bb.executionHead, nil
}

func (bb *BlockBuilder) setExecutionHeadFromRPC(ctx context.Context) error {
header, err := bb.engineCl.HeaderByNumber(ctx, nil) // nil for the latest block
if err != nil {
return nil, fmt.Errorf("failed to get the latest block header: %w", err)
return fmt.Errorf("failed to get the latest block header: %w", err)
}

bb.executionHead = &types.ExecutionHead{
BlockHeight: header.Number.Uint64(),
BlockHash: header.Hash().Bytes(),
BlockTime: header.Time,
}

return bb.executionHead, nil
return nil
}

func (bb *BlockBuilder) GetExecutionHead() *types.ExecutionHead {
Expand Down
20 changes: 9 additions & 11 deletions cl/blockbuilder/blockbuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ func TestBlockBuilder_startBuild(t *testing.T) {
engineCl: mockEngineClient,
rpcClient: createMockRPCClient(),
buildDelay: buildDelay,
buildDelayMs: uint64(buildDelay.Milliseconds()),
logger: stLog,
}
timestamp := time.Now()
Expand All @@ -117,7 +116,9 @@ func TestBlockBuilder_startBuild(t *testing.T) {
}
mockEngineClient.On("ForkchoiceUpdatedV3", mock.Anything, expectedFCS, mock.MatchedBy(matchPayloadAttributes(hash, executionHead.BlockTime))).Return(forkChoiceResponse, nil)

resp, err := blockBuilder.startBuild(ctx, executionHead, uint64(timestamp.UnixMilli()))
blockBuilder.executionHead = executionHead

resp, err := blockBuilder.startBuild(ctx, uint64(timestamp.UnixMilli()))

require.NoError(t, err)
assert.Equal(t, forkChoiceResponse, resp)
Expand Down Expand Up @@ -162,7 +163,6 @@ func TestBlockBuilder_getPayload(t *testing.T) {
engineCl: mockEngineClient,
rpcClient: createMockRPCClient(),
buildDelay: buildDelay,
buildDelayMs: uint64(buildDelay.Milliseconds()),
logger: stLog,
}

Expand Down Expand Up @@ -229,7 +229,6 @@ func TestBlockBuilder_FinalizeBlock(t *testing.T) {
engineCl: mockEngineClient,
rpcClient: createMockRPCClient(),
buildDelay: buildDelay,
buildDelayMs: uint64(buildDelay.Milliseconds()),
logger: stLog,
}

Expand Down Expand Up @@ -312,7 +311,6 @@ func TestBlockBuilder_startBuild_ForkchoiceUpdatedError(t *testing.T) {
engineCl: mockEngineClient,
rpcClient: createMockRPCClient(),
buildDelay: buildDelay,
buildDelayMs: uint64(buildDelay.Milliseconds()),
logger: stLog,
}

Expand All @@ -327,7 +325,9 @@ func TestBlockBuilder_startBuild_ForkchoiceUpdatedError(t *testing.T) {

mockEngineClient.On("ForkchoiceUpdatedV3", mock.Anything, expectedFCS, mock.MatchedBy(matchPayloadAttributes(hash, executionHead.BlockTime))).Return(engine.ForkChoiceResponse{}, errors.New("engine error"))

resp, err := blockBuilder.startBuild(ctx, executionHead, uint64(timestamp.UnixMilli()))
blockBuilder.executionHead = executionHead

resp, err := blockBuilder.startBuild(ctx, uint64(timestamp.UnixMilli()))

require.Error(t, err)
assert.Contains(t, err.Error(), "forkchoice update")
Expand Down Expand Up @@ -357,7 +357,6 @@ func TestBlockBuilder_startBuild_InvalidPayloadStatus(t *testing.T) {
engineCl: mockEngineClient,
rpcClient: createMockRPCClient(),
buildDelay: buildDelay,
buildDelayMs: uint64(buildDelay.Milliseconds()),
logger: stLog,
}

Expand All @@ -378,7 +377,9 @@ func TestBlockBuilder_startBuild_InvalidPayloadStatus(t *testing.T) {
}
mockEngineClient.On("ForkchoiceUpdatedV3", mock.Anything, expectedFCS, mock.MatchedBy(matchPayloadAttributes(hash, executionHead.BlockTime))).Return(forkChoiceResponse, nil)

resp, err := blockBuilder.startBuild(ctx, executionHead, uint64(timestamp.UnixMilli()))
blockBuilder.executionHead = executionHead

resp, err := blockBuilder.startBuild(ctx, uint64(timestamp.UnixMilli()))

require.NoError(t, err)
assert.Equal(t, forkChoiceResponse, resp)
Expand Down Expand Up @@ -459,7 +460,6 @@ func TestBlockBuilder_FinalizeBlock_InvalidBlockHeight(t *testing.T) {
engineCl: mockEngineClient,
rpcClient: createMockRPCClient(),
buildDelay: buildDelay,
buildDelayMs: uint64(buildDelay.Milliseconds()),
logger: stLog,
}

Expand Down Expand Up @@ -515,7 +515,6 @@ func TestBlockBuilder_FinalizeBlock_NewPayloadInvalidStatus(t *testing.T) {
engineCl: mockEngineClient,
rpcClient: createMockRPCClient(),
buildDelay: buildDelay,
buildDelayMs: uint64(buildDelay.Milliseconds()),
logger: stLog,
}

Expand Down Expand Up @@ -576,7 +575,6 @@ func TestBlockBuilder_FinalizeBlock_ForkchoiceUpdatedInvalidStatus(t *testing.T)
engineCl: mockEngineClient,
rpcClient: createMockRPCClient(),
buildDelay: buildDelay,
buildDelayMs: uint64(buildDelay.Milliseconds()),
logger: stLog,
}

Expand Down
4 changes: 0 additions & 4 deletions cl/redisapp/leaderfollower/leaderfollower.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,6 @@ type blockBuilder interface {

// Processes any unfinished payload from a previous session
ProcessLastPayload(ctx context.Context) error

// Sets the last call time to zero
SetLastCallTimeToZero()
}

// todo: work with block state through block builder, not directly
Expand Down Expand Up @@ -263,7 +260,6 @@ func (lfm *LeaderFollowerManager) leaderWork(ctx context.Context) error {
lfm.logger.Error("Leader: failed to reach geth node after max attempts, exiting")
stopElecErr := lfm.leaderProc.Stop()
// todo: refactor to generate timestamp outside blockbuilder
lfm.blockBuilder.SetLastCallTimeToZero()
if stopElecErr != nil {
lfm.logger.Error(
"Leader: Failed to stop leader election",
Expand Down
Loading