Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
216 changes: 214 additions & 2 deletions packages/beacon-node/src/execution/engine/http.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import {Logger} from "@lodestar/logger";
import {ForkName, ForkPostFulu, ForkPreFulu, ForkSeq, SLOTS_PER_EPOCH, isForkPostFulu} from "@lodestar/params";
import {BlobsBundle, ExecutionPayload, ExecutionRequests, Root, RootHex, Wei} from "@lodestar/types";
import {BlobsBundle, ExecutionPayload, ExecutionRequests, Root, RootHex, Wei, ssz as sszCodecs} from "@lodestar/types";
import {BlobAndProof} from "@lodestar/types/deneb";
import {BlobAndProofV2} from "@lodestar/types/fulu";
import {strip0xPrefix} from "@lodestar/utils";
import {fromHex, strip0xPrefix} from "@lodestar/utils";
import {Metrics} from "../../metrics/index.js";
import {EPOCHS_PER_BATCH} from "../../sync/constants.js";
import {getLodestarClientVersion} from "../../util/metadata.js";
Expand All @@ -27,6 +27,17 @@ import {
ReqOpts,
} from "./jsonRpcHttpClient.js";
import {PayloadIdCache} from "./payloadIdCache.js";
import {SszRestClient, isSszRestNetworkError} from "./sszRestClient.js";
import {
decodeForkchoiceUpdatedResponse,
decodeGetBlobsResponse,
decodeGetPayloadResponse,
decodePayloadStatus,
encodeForkchoiceUpdatedRequest,
encodeGetBlobsRequest,
encodeGetPayloadRequest,
encodeNewPayloadRequest,
} from "./sszRestEncoding.js";
import {
BLOB_AND_PROOF_V2_RPC_BYTES,
EngineApiRpcParamTypes,
Expand Down Expand Up @@ -80,6 +91,11 @@ export type ExecutionEngineHttpOpts = {
* Lodestar commit to be used for `ClientVersion`
*/
commit?: string;
/**
* EIP-8161: SSZ-REST Engine API transport URL.
* When configured, the engine will try SSZ-REST first and fall back to JSON-RPC on network errors.
*/
sszRestUrl?: string;
};

export const defaultExecutionEngineHttpOpts: ExecutionEngineHttpOpts = {
Expand Down Expand Up @@ -139,6 +155,10 @@ export class ExecutionEngineHttp implements IExecutionEngine {
clientVersion?: ClientVersion | null;

readonly payloadIdCache = new PayloadIdCache();

/** EIP-8161: SSZ-REST client, null if not configured */
private readonly sszRestClient: SszRestClient | null;

/**
* A queue to serialize the fcUs and newPayloads calls:
*
Expand Down Expand Up @@ -170,6 +190,20 @@ export class ExecutionEngineHttp implements IExecutionEngine {
this.logger = logger;
this.metrics = metrics ?? null;

// EIP-8161: Initialize SSZ-REST client if configured
if (opts?.sszRestUrl) {
this.sszRestClient = new SszRestClient({
baseUrl: opts.sszRestUrl,
jwtSecretHex: opts.jwtSecretHex,
jwtId: opts.jwtId,
jwtVersion: opts.jwtVersion,
timeout: opts.timeout,
});
this.logger.info("SSZ-REST Engine API transport enabled (EIP-8161)", {url: opts.sszRestUrl});
} else {
this.sszRestClient = null;
}

this.rpc.emitter.on(JsonRpcHttpClientEvent.ERROR, ({error}) => {
this.updateEngineState(getExecutionEngineState({payloadError: error, oldState: this.state}));
});
Expand Down Expand Up @@ -215,6 +249,44 @@ export class ExecutionEngineHttp implements IExecutionEngine {
parentBlockRoot?: Root,
executionRequests?: ExecutionRequests
): Promise<ExecutePayloadResponse> {
// EIP-8161: Try SSZ-REST first, fall back to JSON-RPC on network errors
if (this.sszRestClient) {
try {
const version =
ForkSeq[fork] >= ForkSeq.fulu ? 5 : ForkSeq[fork] >= ForkSeq.electra ? 4 : ForkSeq[fork] >= ForkSeq.deneb ? 3 : ForkSeq[fork] >= ForkSeq.capella ? 2 : 1;
Comment on lines +251 to +252
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This version selection logic is duplicated in the getPayload method (lines 554-555). To improve maintainability and adhere to the DRY (Don't Repeat Yourself) principle, consider extracting this logic into a dedicated helper function. For example:

function getPayloadApiVersion(fork: ForkName): number {
  const forkSeq = ForkSeq[fork];
  if (forkSeq >= ForkSeq.fulu) return 5;
  if (forkSeq >= ForkSeq.electra) return 4;
  if (forkSeq >= ForkSeq.deneb) return 3;
  if (forkSeq >= ForkSeq.capella) return 2;
  return 1;
}

You can then call this function here and in getPayload.

const path = `/engine/v${version}/new_payload`;
const body = encodeNewPayloadRequest(fork, executionPayload, versionedHashes, parentBlockRoot, executionRequests);
const resp = await this.sszRestClient.doRequest(path, body);
const decoded = decodePayloadStatus(resp);
const status = decoded.status as ExecutionPayloadStatus;
this.updateEngineState(getExecutionEngineState({payloadStatus: status, oldState: this.state}));

switch (status) {
case ExecutionPayloadStatus.VALID:
return {status, latestValidHash: decoded.latestValidHash ?? "0x0", validationError: null};
case ExecutionPayloadStatus.INVALID:
return {status, latestValidHash: decoded.latestValidHash, validationError: decoded.validationError};
case ExecutionPayloadStatus.SYNCING:
case ExecutionPayloadStatus.ACCEPTED:
return {status, latestValidHash: null, validationError: null};
case ExecutionPayloadStatus.INVALID_BLOCK_HASH:
return {status, latestValidHash: null, validationError: decoded.validationError ?? "Malformed block"};
default:
return {
status: ExecutionPayloadStatus.ELERROR,
latestValidHash: null,
validationError: `Invalid EL status on executePayload: ${status}`,
};
}
} catch (e) {
if (isSszRestNetworkError(e)) {
this.logger.debug("SSZ-REST newPayload failed, falling back to JSON-RPC", {error: (e as Error).message});
} else {
throw e;
}
}
}

const method =
ForkSeq[fork] >= ForkSeq.electra
? "engine_newPayloadV4"
Expand Down Expand Up @@ -345,6 +417,60 @@ export class ExecutionEngineHttp implements IExecutionEngine {
finalizedBlockHash: RootHex,
payloadAttributes?: PayloadAttributes
): Promise<PayloadId | null> {
// EIP-8161: Try SSZ-REST first, fall back to JSON-RPC on network errors
if (this.sszRestClient) {
try {
const version = ForkSeq[fork] >= ForkSeq.deneb ? 3 : ForkSeq[fork] >= ForkSeq.capella ? 2 : 1;
const path = `/engine/v${version}/forkchoice_updated`;
const headBytes = fromHex(headBlockHash);
const safeBytes = fromHex(safeBlockHash);
const finalizedBytes = fromHex(finalizedBlockHash);
const body = encodeForkchoiceUpdatedRequest(headBytes, safeBytes, finalizedBytes, payloadAttributes);
const resp = await this.sszRestClient.doRequest(path, body);
const decoded = decodeForkchoiceUpdatedResponse(resp);
const status = decoded.payloadStatus.status as ExecutionPayloadStatus;

this.updateEngineState(getExecutionEngineState({payloadStatus: status, oldState: this.state}));
this.metrics?.engineNotifyForkchoiceUpdateResult.inc({result: status});

const payloadAttributesRpc = payloadAttributes ? serializePayloadAttributes(payloadAttributes) : undefined;

switch (status) {
case ExecutionPayloadStatus.VALID:
if (payloadAttributesRpc) {
if (!decoded.payloadId || decoded.payloadId === "0x") {
throw Error(`Received invalid payloadId=${decoded.payloadId}`);
}
this.payloadIdCache.add({headBlockHash, finalizedBlockHash, ...payloadAttributesRpc}, decoded.payloadId);
void this.prunePayloadIdCache();
}
return decoded.payloadId !== "0x" ? decoded.payloadId : null;

case ExecutionPayloadStatus.SYNCING:
if (payloadAttributes) {
throw Error("Execution Layer Syncing");
}
return null;

case ExecutionPayloadStatus.INVALID:
throw Error(
`Invalid ${payloadAttributes ? "prepare payload" : "forkchoice request"}, validationError=${
decoded.payloadStatus.validationError ?? ""
}`
);

default:
throw Error(`Unknown status ${status}`);
}
} catch (e) {
if (isSszRestNetworkError(e)) {
this.logger.debug("SSZ-REST forkchoiceUpdate failed, falling back to JSON-RPC", {error: (e as Error).message});
} else {
throw e;
}
}
}

// Once on capella, should this need to be permanently switched to v2 when payload attrs
// not provided
const method =
Expand Down Expand Up @@ -422,6 +548,44 @@ export class ExecutionEngineHttp implements IExecutionEngine {
executionRequests?: ExecutionRequests;
shouldOverrideBuilder?: boolean;
}> {
// EIP-8161: Try SSZ-REST first, fall back to JSON-RPC on network errors
if (this.sszRestClient) {
try {
const version =
ForkSeq[fork] >= ForkSeq.fulu ? 5 : ForkSeq[fork] >= ForkSeq.electra ? 4 : ForkSeq[fork] >= ForkSeq.deneb ? 3 : ForkSeq[fork] >= ForkSeq.capella ? 2 : 1;
const path = `/engine/v${version}/get_payload`;
const payloadIdBytes = fromHex(payloadId);
const body = encodeGetPayloadRequest(payloadIdBytes);
const resp = await this.sszRestClient.doRequest(path, body);
const decoded = decodeGetPayloadResponse(resp);

// The executionPayloadSsz needs to be parsed back through the JSON-RPC parseExecutionPayload path
// For now, we return the raw SSZ and let the caller handle it.
// Actually, we can deserialize the SSZ payload using @lodestar/types
const executionPayload = deserializeExecutionPayloadSsz(fork, decoded.executionPayloadSsz);
const executionPayloadValue = decoded.blockValue;
const blobsBundle =
decoded.blobsBundleSsz.length > 0 ? deserializeBlobsBundleSsz(decoded.blobsBundleSsz) : undefined;
const executionRequests =
decoded.executionRequestsSsz.length > 0
? deserializeExecutionRequestsSsz(decoded.executionRequestsSsz)
: undefined;
return {
executionPayload,
executionPayloadValue,
blobsBundle,
executionRequests,
shouldOverrideBuilder: decoded.shouldOverrideBuilder,
};
} catch (e) {
if (isSszRestNetworkError(e)) {
this.logger.debug("SSZ-REST getPayload failed, falling back to JSON-RPC", {error: (e as Error).message});
} else {
throw e;
}
}
}

let method: keyof EngineApiRpcReturnTypes;
switch (fork) {
case ForkName.phase0:
Expand Down Expand Up @@ -500,6 +664,28 @@ export class ExecutionEngineHttp implements IExecutionEngine {
versionedHashes: VersionedHashes
): Promise<BlobAndProofV2[] | (BlobAndProof | null)[] | null> {
assertReqSizeLimit(versionedHashes.length, MAX_VERSIONED_HASHES);

// EIP-8161: Try SSZ-REST first for getBlobs, fall back to JSON-RPC on network errors
if (this.sszRestClient) {
try {
const version = isForkPostFulu(fork) ? 2 : 1;
const path = `/engine/v${version}/get_blobs`;
const body = encodeGetBlobsRequest(versionedHashes);
const resp = await this.sszRestClient.doRequest(path, body);
const decoded = decodeGetBlobsResponse(resp);
return decoded.map((item) => ({
blob: item.blob,
proof: item.kzgProof,
}));
} catch (e) {
if (isSszRestNetworkError(e)) {
this.logger.debug("SSZ-REST getBlobs failed, falling back to JSON-RPC", {error: (e as Error).message});
} else {
throw e;
}
}
}

const versionedHashesHex = versionedHashes.map(bytesToData);
if (isForkPostFulu(fork)) {
return await this.getBlobsV2(versionedHashesHex);
Expand Down Expand Up @@ -629,6 +815,32 @@ export class ExecutionEngineHttp implements IExecutionEngine {
}
}

/**
* Deserialize an ExecutionPayload from SSZ bytes using the appropriate
* @lodestar/types codec for the given fork. Used by the SSZ-REST getPayload path.
*/
function deserializeExecutionPayloadSsz(fork: ForkName, data: Uint8Array): ExecutionPayload {
const forkSeq = ForkSeq[fork];
if (forkSeq >= ForkSeq.electra) {
return sszCodecs.electra.ExecutionPayload.deserialize(data);
}
if (forkSeq >= ForkSeq.deneb) {
return sszCodecs.deneb.ExecutionPayload.deserialize(data);
}
if (forkSeq >= ForkSeq.capella) {
return sszCodecs.capella.ExecutionPayload.deserialize(data);
}
return sszCodecs.bellatrix.ExecutionPayload.deserialize(data);
}

function deserializeBlobsBundleSsz(data: Uint8Array): BlobsBundle {
return sszCodecs.deneb.BlobsBundle.deserialize(data);
}

function deserializeExecutionRequestsSsz(data: Uint8Array): ExecutionRequests {
return sszCodecs.electra.ExecutionRequests.deserialize(data);
}

type EngineRequestKey = keyof EngineApiRpcParamTypes;
type EngineRequestByKey = {
[K in EngineRequestKey]: {method: K; params: EngineApiRpcParamTypes[K]; methodOpts: ReqOpts};
Expand Down
Loading
Loading