Skip to content

Commit f8d4032

Browse files
authored
feat: simplify single node cl implementation (#755) (#757)
1 parent b9e24ce commit f8d4032

File tree

3 files changed

+65
-109
lines changed

3 files changed

+65
-109
lines changed

cl/blockbuilder/blockbuilder.go

Lines changed: 56 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,6 @@ type BlockBuilder struct {
5454
logger *slog.Logger
5555
buildDelay time.Duration
5656
buildEmptyBlocksDelay time.Duration
57-
buildDelayMs uint64
58-
lastBlockTime time.Time
5957
feeRecipient common.Address
6058
executionHead *types.ExecutionHead
6159
}
@@ -74,10 +72,8 @@ func NewBlockBuilder(
7472
engineCl: engineCl,
7573
logger: logger,
7674
buildDelay: buildDelay,
77-
buildDelayMs: uint64(buildDelay.Milliseconds()),
7875
buildEmptyBlocksDelay: buildDelayEmptyBlocks,
7976
feeRecipient: common.HexToAddress(feeReceipt),
80-
lastBlockTime: time.Now().Add(-buildDelayEmptyBlocks),
8177
rpcClient: rpcClient,
8278
}
8379
}
@@ -89,12 +85,8 @@ func NewMemberBlockBuilder(engineCL EngineClient, logger *slog.Logger) *BlockBui
8985
}
9086
}
9187

92-
func (bb *BlockBuilder) SetLastCallTimeToZero() {
93-
bb.lastBlockTime = time.Time{}
94-
}
95-
96-
func (bb *BlockBuilder) startBuild(ctx context.Context, head *types.ExecutionHead, ts uint64) (engine.ForkChoiceResponse, error) {
97-
hash := common.BytesToHash(head.BlockHash)
88+
func (bb *BlockBuilder) startBuild(ctx context.Context, ts uint64) (engine.ForkChoiceResponse, error) {
89+
hash := common.BytesToHash(bb.executionHead.BlockHash)
9890

9991
fcs := engine.ForkchoiceStateV1{
10092
HeadBlockHash: hash,
@@ -123,19 +115,39 @@ func (bb *BlockBuilder) startBuild(ctx context.Context, head *types.ExecutionHea
123115
func (bb *BlockBuilder) GetPayload(ctx context.Context) error {
124116
var (
125117
payloadID *engine.PayloadID
126-
head *types.ExecutionHead
127118
err error
128119
)
129-
currentCallTime := time.Now()
120+
121+
if bb.executionHead == nil {
122+
bb.logger.Info("executionHead is nil, it'll be set by RPC. CL is likely being restarted")
123+
err = util.RetryWithBackoff(ctx, maxAttempts, bb.logger, func() error {
124+
innerErr := bb.setExecutionHeadFromRPC(ctx)
125+
if innerErr != nil {
126+
bb.logger.Warn(
127+
"Failed to set execution head from rpc, retrying...",
128+
"error", innerErr,
129+
)
130+
return innerErr // Will retry
131+
}
132+
return nil // Success
133+
})
134+
if err != nil {
135+
return fmt.Errorf("failed to set execution head from rpc: %w", err)
136+
}
137+
} else {
138+
bb.logger.Debug("executionHead is not nil, using cached value")
139+
}
130140

131141
mempoolStatus, err := bb.GetMempoolStatus(ctx)
132142
if err != nil {
133143
return fmt.Errorf("failed to get pending transaction count: %w", err)
134144
}
135-
bb.logger.Debug("GetMempoolStatus rpc duration", "duration", time.Since(currentCallTime))
145+
146+
lastBlockTime := time.UnixMilli(int64(bb.executionHead.BlockTime))
147+
bb.logger.Debug("lastBlockTime from execution head", "lastBlockTime", lastBlockTime)
136148

137149
if mempoolStatus.Pending == 0 {
138-
timeSinceLastBlock := currentCallTime.Sub(bb.lastBlockTime)
150+
timeSinceLastBlock := time.Since(lastBlockTime)
139151
if timeSinceLastBlock < bb.buildEmptyBlocksDelay {
140152
bb.logger.Debug(
141153
"Leader: Skipping empty block",
@@ -153,50 +165,24 @@ func (bb *BlockBuilder) GetPayload(ctx context.Context) error {
153165
)
154166
}
155167

156-
// Load execution head to get previous block timestamp
157-
err = util.RetryWithBackoff(ctx, maxAttempts, bb.logger, func() error {
158-
head, err = bb.loadExecutionHead(ctx)
159-
if err != nil {
160-
bb.logger.Warn(
161-
"Failed to load execution head, retrying...",
162-
"error", err,
163-
)
164-
return err // Will retry
165-
}
166-
return nil // Success
167-
})
168-
if err != nil {
169-
return fmt.Errorf("latest execution block: %w", err)
170-
}
171-
172-
prevTimestamp := head.BlockTime
173-
174-
var ts uint64
175-
176-
if bb.lastBlockTime.IsZero() {
177-
// First block, initialize LastCallTime and set default timestamp
178-
ts = uint64(time.Now().UnixMilli()) + bb.buildDelayMs
179-
} else {
180-
// Compute diff in milliseconds
181-
diff := currentCallTime.Sub(bb.lastBlockTime)
182-
diffMillis := diff.Milliseconds()
183-
184-
if uint64(diffMillis) <= bb.buildDelayMs {
185-
ts = prevTimestamp + bb.buildDelayMs
186-
} else {
187-
// For every multiple of buildDelay that diff exceeds, increment the block time by that multiple.
188-
multiples := (uint64(diffMillis) + bb.buildDelayMs - 1) / bb.buildDelayMs // Round up to next multiple of buildDelay
189-
ts = prevTimestamp + multiples*bb.buildDelayMs
168+
ts := uint64(time.Now().UnixMilli())
169+
if ts <= bb.executionHead.BlockTime {
170+
bb.logger.Warn(`Leader: Current timestamp is less than or equal to
171+
previous block timestamp. Genesis timestamp could have been set incorrectly`,
172+
"current_ts", ts,
173+
"prev_head_block_time", bb.executionHead.BlockTime,
174+
)
175+
ts = bb.executionHead.BlockTime + 1
176+
select {
177+
case <-ctx.Done():
178+
return fmt.Errorf("context cancelled")
179+
case <-time.After(time.Until(time.UnixMilli(int64(ts)))):
180+
bb.logger.Info("Leader: Waited until proper block timestamp", "ts", ts)
190181
}
191182
}
192183

193-
// Very low chance to happen, only after restart and time.Now is broken
194-
if ts <= head.BlockTime {
195-
ts = head.BlockTime + 1 // Subsequent blocks must have a higher timestamp.
196-
}
197-
198184
err = util.RetryWithBackoff(ctx, maxAttempts, bb.logger, func() error {
199-
response, err := bb.startBuild(ctx, head, ts)
185+
response, err := bb.startBuild(ctx, ts)
200186
if err != nil {
201187
bb.logger.Warn(
202188
"Failed to build new EVM payload, will retry",
@@ -275,8 +261,6 @@ func (bb *BlockBuilder) GetPayload(ctx context.Context) error {
275261
"Leader: BuildBlock completed and block is distributed",
276262
"PayloadID", payloadIDStr,
277263
)
278-
279-
bb.lastBlockTime = time.Now()
280264
return nil
281265
}
282266

@@ -418,30 +402,13 @@ func (bb *BlockBuilder) FinalizeBlock(ctx context.Context, payloadIDStr, executi
418402
return fmt.Errorf("failed to deserialize ExecutionPayload: %w", err)
419403
}
420404

421-
var head *types.ExecutionHead
422-
err = util.RetryWithBackoff(ctx, maxAttempts, bb.logger, func() error {
423-
head, err = bb.loadExecutionHead(ctx)
424-
if err != nil {
425-
bb.logger.Warn(
426-
"Failed to load execution head, retrying...",
427-
"error", err,
428-
)
429-
return err // Will retry
430-
}
431-
return nil // Success
432-
})
433-
if err != nil {
434-
return fmt.Errorf("failed to load execution head: %w", err)
435-
}
436-
437-
if err := bb.validateExecutionPayload(executionPayload, head); err != nil {
405+
if err := bb.validateExecutionPayload(executionPayload); err != nil {
438406
return fmt.Errorf("failed to validate execution payload: %w", err)
439407
}
440408

441-
hash := common.BytesToHash(head.BlockHash)
442409
retryFunc := bb.selectRetryFunction(ctx, msgID)
443410

444-
if err := bb.pushNewPayload(ctx, executionPayload, hash, retryFunc); err != nil {
411+
if err := bb.pushNewPayload(ctx, executionPayload, retryFunc); err != nil {
445412
return fmt.Errorf("failed to push new payload: %w", err)
446413
}
447414

@@ -464,20 +431,20 @@ func (bb *BlockBuilder) FinalizeBlock(ctx context.Context, payloadIDStr, executi
464431
return nil
465432
}
466433

467-
func (bb *BlockBuilder) validateExecutionPayload(executionPayload engine.ExecutableData, head *types.ExecutionHead) error {
468-
if executionPayload.Number != head.BlockHeight+1 {
469-
return fmt.Errorf("invalid block height: %d, expected: %d", executionPayload.Number, head.BlockHeight+1)
434+
func (bb *BlockBuilder) validateExecutionPayload(executionPayload engine.ExecutableData) error {
435+
if executionPayload.Number != bb.executionHead.BlockHeight+1 {
436+
return fmt.Errorf("invalid block height: %d, expected: %d", executionPayload.Number, bb.executionHead.BlockHeight+1)
470437
}
471-
if executionPayload.ParentHash != common.Hash(head.BlockHash) {
472-
return fmt.Errorf("invalid parent hash: %s, head: %s", executionPayload.ParentHash, head.BlockHash)
438+
if executionPayload.ParentHash != common.Hash(bb.executionHead.BlockHash) {
439+
return fmt.Errorf("invalid parent hash: %s, head: %s", executionPayload.ParentHash, bb.executionHead.BlockHash)
473440
}
474-
minTimestamp := head.BlockTime + 1
441+
minTimestamp := bb.executionHead.BlockTime + 1
475442
if executionPayload.Timestamp < minTimestamp && executionPayload.Number != 1 {
476443
return fmt.Errorf("invalid timestamp: %d, min: %d", executionPayload.Timestamp, minTimestamp)
477444
}
478-
hash := common.BytesToHash(head.BlockHash)
445+
hash := common.BytesToHash(bb.executionHead.BlockHash)
479446
if executionPayload.Random != hash {
480-
return fmt.Errorf("invalid random: %s, head: %s", executionPayload.Random, head.BlockHash)
447+
return fmt.Errorf("invalid random: %s, head: %s", executionPayload.Random, bb.executionHead.BlockHash)
481448
}
482449
return nil
483450
}
@@ -493,10 +460,11 @@ func (bb *BlockBuilder) selectRetryFunction(ctx context.Context, msgID string) f
493460
}
494461
}
495462

496-
func (bb *BlockBuilder) pushNewPayload(ctx context.Context, executionPayload engine.ExecutableData, hash common.Hash, retryFunc func(f func() error) error) error {
463+
func (bb *BlockBuilder) pushNewPayload(ctx context.Context, executionPayload engine.ExecutableData, retryFunc func(f func() error) error) error {
497464
emptyVersionHashes := []common.Hash{}
465+
parentHash := common.BytesToHash(bb.executionHead.BlockHash)
498466
return retryFunc(func() error {
499-
status, err := bb.engineCl.NewPayloadV4(ctx, executionPayload, emptyVersionHashes, &hash, []hexutil.Bytes{})
467+
status, err := bb.engineCl.NewPayloadV4(ctx, executionPayload, emptyVersionHashes, &parentHash, []hexutil.Bytes{})
500468
bb.logger.Debug("newPayload result",
501469
"status", status.Status,
502470
"validationError", status.ValidationError,
@@ -534,23 +502,17 @@ func (bb *BlockBuilder) updateForkChoice(ctx context.Context, fcs engine.Forkcho
534502
})
535503
}
536504

537-
func (bb *BlockBuilder) loadExecutionHead(ctx context.Context) (*types.ExecutionHead, error) {
538-
if bb.executionHead != nil {
539-
return bb.executionHead, nil
540-
}
541-
505+
func (bb *BlockBuilder) setExecutionHeadFromRPC(ctx context.Context) error {
542506
header, err := bb.engineCl.HeaderByNumber(ctx, nil) // nil for the latest block
543507
if err != nil {
544-
return nil, fmt.Errorf("failed to get the latest block header: %w", err)
508+
return fmt.Errorf("failed to get the latest block header: %w", err)
545509
}
546-
547510
bb.executionHead = &types.ExecutionHead{
548511
BlockHeight: header.Number.Uint64(),
549512
BlockHash: header.Hash().Bytes(),
550513
BlockTime: header.Time,
551514
}
552-
553-
return bb.executionHead, nil
515+
return nil
554516
}
555517

556518
func (bb *BlockBuilder) GetExecutionHead() *types.ExecutionHead {

cl/blockbuilder/blockbuilder_test.go

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,6 @@ func TestBlockBuilder_startBuild(t *testing.T) {
9797
engineCl: mockEngineClient,
9898
rpcClient: createMockRPCClient(),
9999
buildDelay: buildDelay,
100-
buildDelayMs: uint64(buildDelay.Milliseconds()),
101100
logger: stLog,
102101
}
103102
timestamp := time.Now()
@@ -117,7 +116,9 @@ func TestBlockBuilder_startBuild(t *testing.T) {
117116
}
118117
mockEngineClient.On("ForkchoiceUpdatedV3", mock.Anything, expectedFCS, mock.MatchedBy(matchPayloadAttributes(hash, executionHead.BlockTime))).Return(forkChoiceResponse, nil)
119118

120-
resp, err := blockBuilder.startBuild(ctx, executionHead, uint64(timestamp.UnixMilli()))
119+
blockBuilder.executionHead = executionHead
120+
121+
resp, err := blockBuilder.startBuild(ctx, uint64(timestamp.UnixMilli()))
121122

122123
require.NoError(t, err)
123124
assert.Equal(t, forkChoiceResponse, resp)
@@ -162,7 +163,6 @@ func TestBlockBuilder_getPayload(t *testing.T) {
162163
engineCl: mockEngineClient,
163164
rpcClient: createMockRPCClient(),
164165
buildDelay: buildDelay,
165-
buildDelayMs: uint64(buildDelay.Milliseconds()),
166166
logger: stLog,
167167
}
168168

@@ -229,7 +229,6 @@ func TestBlockBuilder_FinalizeBlock(t *testing.T) {
229229
engineCl: mockEngineClient,
230230
rpcClient: createMockRPCClient(),
231231
buildDelay: buildDelay,
232-
buildDelayMs: uint64(buildDelay.Milliseconds()),
233232
logger: stLog,
234233
}
235234

@@ -312,7 +311,6 @@ func TestBlockBuilder_startBuild_ForkchoiceUpdatedError(t *testing.T) {
312311
engineCl: mockEngineClient,
313312
rpcClient: createMockRPCClient(),
314313
buildDelay: buildDelay,
315-
buildDelayMs: uint64(buildDelay.Milliseconds()),
316314
logger: stLog,
317315
}
318316

@@ -327,7 +325,9 @@ func TestBlockBuilder_startBuild_ForkchoiceUpdatedError(t *testing.T) {
327325

328326
mockEngineClient.On("ForkchoiceUpdatedV3", mock.Anything, expectedFCS, mock.MatchedBy(matchPayloadAttributes(hash, executionHead.BlockTime))).Return(engine.ForkChoiceResponse{}, errors.New("engine error"))
329327

330-
resp, err := blockBuilder.startBuild(ctx, executionHead, uint64(timestamp.UnixMilli()))
328+
blockBuilder.executionHead = executionHead
329+
330+
resp, err := blockBuilder.startBuild(ctx, uint64(timestamp.UnixMilli()))
331331

332332
require.Error(t, err)
333333
assert.Contains(t, err.Error(), "forkchoice update")
@@ -357,7 +357,6 @@ func TestBlockBuilder_startBuild_InvalidPayloadStatus(t *testing.T) {
357357
engineCl: mockEngineClient,
358358
rpcClient: createMockRPCClient(),
359359
buildDelay: buildDelay,
360-
buildDelayMs: uint64(buildDelay.Milliseconds()),
361360
logger: stLog,
362361
}
363362

@@ -378,7 +377,9 @@ func TestBlockBuilder_startBuild_InvalidPayloadStatus(t *testing.T) {
378377
}
379378
mockEngineClient.On("ForkchoiceUpdatedV3", mock.Anything, expectedFCS, mock.MatchedBy(matchPayloadAttributes(hash, executionHead.BlockTime))).Return(forkChoiceResponse, nil)
380379

381-
resp, err := blockBuilder.startBuild(ctx, executionHead, uint64(timestamp.UnixMilli()))
380+
blockBuilder.executionHead = executionHead
381+
382+
resp, err := blockBuilder.startBuild(ctx, uint64(timestamp.UnixMilli()))
382383

383384
require.NoError(t, err)
384385
assert.Equal(t, forkChoiceResponse, resp)
@@ -459,7 +460,6 @@ func TestBlockBuilder_FinalizeBlock_InvalidBlockHeight(t *testing.T) {
459460
engineCl: mockEngineClient,
460461
rpcClient: createMockRPCClient(),
461462
buildDelay: buildDelay,
462-
buildDelayMs: uint64(buildDelay.Milliseconds()),
463463
logger: stLog,
464464
}
465465

@@ -515,7 +515,6 @@ func TestBlockBuilder_FinalizeBlock_NewPayloadInvalidStatus(t *testing.T) {
515515
engineCl: mockEngineClient,
516516
rpcClient: createMockRPCClient(),
517517
buildDelay: buildDelay,
518-
buildDelayMs: uint64(buildDelay.Milliseconds()),
519518
logger: stLog,
520519
}
521520

@@ -576,7 +575,6 @@ func TestBlockBuilder_FinalizeBlock_ForkchoiceUpdatedInvalidStatus(t *testing.T)
576575
engineCl: mockEngineClient,
577576
rpcClient: createMockRPCClient(),
578577
buildDelay: buildDelay,
579-
buildDelayMs: uint64(buildDelay.Milliseconds()),
580578
logger: stLog,
581579
}
582580

cl/redisapp/leaderfollower/leaderfollower.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,6 @@ type blockBuilder interface {
3939

4040
// Processes any unfinished payload from a previous session
4141
ProcessLastPayload(ctx context.Context) error
42-
43-
// Sets the last call time to zero
44-
SetLastCallTimeToZero()
4542
}
4643

4744
// todo: work with block state through block builder, not directly
@@ -263,7 +260,6 @@ func (lfm *LeaderFollowerManager) leaderWork(ctx context.Context) error {
263260
lfm.logger.Error("Leader: failed to reach geth node after max attempts, exiting")
264261
stopElecErr := lfm.leaderProc.Stop()
265262
// todo: refactor to generate timestamp outside blockbuilder
266-
lfm.blockBuilder.SetLastCallTimeToZero()
267263
if stopElecErr != nil {
268264
lfm.logger.Error(
269265
"Leader: Failed to stop leader election",

0 commit comments

Comments
 (0)