Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions packages/beacon-node/src/sync/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,22 @@ export const MAX_CONCURRENT_REQUESTS = 2;
* so having this constant too big is a waste of resources and peers may rate limit us.
*/
export const MAX_LOOK_AHEAD_EPOCHS = 2;

/**
 * Maximum number of consecutive rate-limited download attempts per batch before giving up.
 * Rate-limited attempts are tracked separately from regular download failures since they
 * indicate the peer is healthy but throttling us — retrying with backoff is appropriate.
 * Once this budget is exhausted, the batch falls back to the regular failed-download path.
 */
export const MAX_RATE_LIMITED_RETRIES = 3;

/**
 * Initial delay in milliseconds before retrying a rate-limited batch download when no
 * alternative peer is available. Doubles on each consecutive rate-limited attempt up to
 * {@link RATE_LIMITED_MAX_DELAY_MS} — with the current values: 50ms, 100ms, 200ms.
 */
export const RATE_LIMITED_INITIAL_DELAY_MS = 50;

/**
 * Maximum backoff delay in milliseconds for rate-limited batch retries.
 */
export const RATE_LIMITED_MAX_DELAY_MS = 200;
54 changes: 53 additions & 1 deletion packages/beacon-node/src/sync/range/batch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,13 @@ import {PeerSyncMeta} from "../../network/peers/peersData.js";
import {IClock} from "../../util/clock.js";
import {CustodyConfig} from "../../util/dataColumns.js";
import {PeerIdStr} from "../../util/peerId.js";
import {MAX_BATCH_DOWNLOAD_ATTEMPTS, MAX_BATCH_PROCESSING_ATTEMPTS} from "../constants.js";
import {
MAX_BATCH_DOWNLOAD_ATTEMPTS,
MAX_BATCH_PROCESSING_ATTEMPTS,
MAX_RATE_LIMITED_RETRIES,
RATE_LIMITED_INITIAL_DELAY_MS,
RATE_LIMITED_MAX_DELAY_MS,
} from "../constants.js";
import {DownloadByRangeRequests} from "../utils/downloadByRange.js";
import {getBatchSlotRange, hashBlocks} from "./utils/index.js";

Expand All @@ -22,6 +28,8 @@ export enum BatchStatus {
AwaitingDownload = "AwaitingDownload",
/** The batch is being downloaded. */
Downloading = "Downloading",
/** The batch download was rate-limited and is waiting for cooldown before retrying. */
RateLimited = "RateLimited",
/** The batch has been completely downloaded and is ready for processing. */
AwaitingProcessing = "AwaitingProcessing",
/** The batch is being processed. */
Expand Down Expand Up @@ -56,6 +64,7 @@ export type DownloadSuccessState = {
export type BatchState =
| AwaitingDownloadState
| {status: BatchStatus.Downloading; peer: PeerIdStr; blocks: IBlockInput[]}
| {status: BatchStatus.RateLimited; blocks: IBlockInput[]}
| DownloadSuccessState
| {status: BatchStatus.Processing; blocks: IBlockInput[]; attempt: Attempt}
| {status: BatchStatus.AwaitingValidation; blocks: IBlockInput[]; attempt: Attempt};
Expand Down Expand Up @@ -94,6 +103,8 @@ export class Batch {
readonly executionErrorAttempts: Attempt[] = [];
/** The number of download retries this batch has undergone due to a failed request. */
private readonly failedDownloadAttempts: PeerIdStr[] = [];
/** The number of consecutive rate-limited download attempts. Reset on any successful download. */
private rateLimitedAttempts = 0;
private readonly config: ChainForkConfig;
private readonly clock: IClock;
private readonly custodyConfig: CustodyConfig;
Expand Down Expand Up @@ -285,6 +296,7 @@ export class Batch {
// ensure that blocks are always sorted before getting stored on the batch.state or being used to getRequests
blocks.sort((a, b) => a.slot - b.slot);

this.rateLimitedAttempts = 0; // Reset rate-limit streak on successful download
this.goodPeers.push(peer);

let allComplete = true;
Expand Down Expand Up @@ -323,6 +335,7 @@ export class Batch {
throw new BatchError(this.wrongStatusErrorType(BatchStatus.Downloading));
}

this.rateLimitedAttempts = 0; // Reset rate-limit streak on a non-rate-limit error
this.failedDownloadAttempts.push(peer);
if (this.failedDownloadAttempts.length > MAX_BATCH_DOWNLOAD_ATTEMPTS) {
throw new BatchError(this.errorType({code: BatchErrorCode.MAX_DOWNLOAD_ATTEMPTS}));
Expand All @@ -331,6 +344,45 @@ export class Batch {
this.state = {status: BatchStatus.AwaitingDownload, blocks: this.state.blocks};
}

/**
 * Downloading -> RateLimited (with cooldown), or falls through to the regular error path.
 *
 * Returns the backoff delay in ms before the batch may be retried. A return value of 0
 * means the rate-limit retry budget is exhausted and the state has already been moved on
 * via downloadingError() — in that case the caller must NOT call endCoolDown().
 */
downloadingRateLimited(peer: PeerIdStr): number {
  if (this.state.status !== BatchStatus.Downloading) {
    throw new BatchError(this.wrongStatusErrorType(BatchStatus.Downloading));
  }

  this.rateLimitedAttempts += 1;
  const attempt = this.rateLimitedAttempts;

  if (attempt > MAX_RATE_LIMITED_RETRIES) {
    // Budget spent: hand off to the standard failure handling, which also resets
    // the rate-limit counter and tracks the failed attempt against this peer.
    this.downloadingError(peer);
    return 0;
  }

  this.state = {status: BatchStatus.RateLimited, blocks: this.state.blocks};

  // Exponential backoff: INITIAL, 2*INITIAL, 4*INITIAL, ... capped at MAX.
  const backoffMs = RATE_LIMITED_INITIAL_DELAY_MS << (attempt - 1);
  return backoffMs > RATE_LIMITED_MAX_DELAY_MS ? RATE_LIMITED_MAX_DELAY_MS : backoffMs;
}

/**
* RateLimited -> AwaitingDownload
*/
endCoolDown(): void {
if (this.state.status !== BatchStatus.RateLimited) {
throw new BatchError(this.wrongStatusErrorType(BatchStatus.RateLimited));
}

this.state = {status: BatchStatus.AwaitingDownload, blocks: this.state.blocks};
}

/**
* AwaitingProcessing -> Processing
*/
Expand Down
99 changes: 65 additions & 34 deletions packages/beacon-node/src/sync/range/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import {ItTrigger} from "../../util/itTrigger.js";
import {PeerIdStr} from "../../util/peerId.js";
import {WarnResult, wrapError} from "../../util/wrapError.js";
import {BATCH_BUFFER_SIZE, EPOCHS_PER_BATCH, MAX_LOOK_AHEAD_EPOCHS} from "../constants.js";
import {DownloadByRangeError, DownloadByRangeErrorCode} from "../utils/downloadByRange.js";
import {DownloadByRangeError, DownloadByRangeErrorCode, isRateLimitRequestError} from "../utils/downloadByRange.js";
import {RangeSyncType} from "../utils/remoteSyncType.js";
import {Batch, BatchError, BatchErrorCode, BatchMetadata, BatchStatus} from "./batch.js";
import {
Expand Down Expand Up @@ -420,7 +420,11 @@ export class SyncChain {
// Note: Don't count batches in the AwaitingValidation state, to prevent stalling sync
// if the current processing window is contained in a long range of skip slots.
const batchesInBuffer = batches.filter((batch) => {
return batch.state.status === BatchStatus.Downloading || batch.state.status === BatchStatus.AwaitingProcessing;
return (
batch.state.status === BatchStatus.Downloading ||
batch.state.status === BatchStatus.RateLimited ||
batch.state.status === BatchStatus.AwaitingProcessing
);
});
if (batchesInBuffer.length > BATCH_BUFFER_SIZE) {
return null;
Expand Down Expand Up @@ -471,43 +475,70 @@ export class SyncChain {

if (res.err) {
// There's several known error cases where we want to take action on the peer
const errCode = (res.err as LodestarError<{code: string}>).type?.code;
const errCode = (res.err as LodestarError<{code: string}>).type?.code ?? (res.err as {code?: string}).code;
this.metrics?.syncRange.downloadByRange.error.inc({client: peer.client, code: errCode ?? "UNKNOWN"});
if (this.syncType === RangeSyncType.Finalized) {
// For finalized sync, we are stricter with peers as there is no ambiguity about which chain we're syncing.
// The below cases indicate the peer may be on a different chain, so are not penalized during head sync.

// Rate-limited responses are handled with backoff rather than peer penalties.
// The peer is healthy but throttling us — penalizing it would make things worse.
const isRateLimited = isRateLimitRequestError(errCode);
if (isRateLimited) {
const delayMs = batch.downloadingRateLimited(peer.peerId);
if (delayMs > 0) {
this.logger.debug("Batch download rate limited, backing off", {
id: this.logId,
...batch.getMetadata(),
peer: prettyPrintPeerIdStr(peer.peerId),
delayMs,
});
await new Promise((r) => setTimeout(r, delayMs));
batch.endCoolDown();
} else {
this.logger.debug("Batch download rate limited, max retries exhausted", {
id: this.logId,
...batch.getMetadata(),
peer: prettyPrintPeerIdStr(peer.peerId),
});
}
}

// Important: avoid duplicate error logging and downloadingError() for rate-limited responses.
if (!isRateLimited) {
if (this.syncType === RangeSyncType.Finalized) {
// For finalized sync, we are stricter with peers as there is no ambiguity about which chain we're syncing.
// The below cases indicate the peer may be on a different chain, so are not penalized during head sync.
switch (errCode) {
case BlockInputErrorCode.MISMATCHED_ROOT_HEX:
case DownloadByRangeErrorCode.MISSING_BLOBS:
case DownloadByRangeErrorCode.EXTRA_BLOBS:
case DownloadByRangeErrorCode.MISSING_COLUMNS:
case DownloadByRangeErrorCode.EXTRA_COLUMNS:
case BlobSidecarErrorCode.INCORRECT_SIDECAR_COUNT:
case BlobSidecarErrorCode.INCORRECT_BLOCK:
case DataColumnSidecarErrorCode.INCORRECT_SIDECAR_COUNT:
case DataColumnSidecarErrorCode.INCORRECT_BLOCK:
this.reportPeer(peer.peerId, PeerAction.LowToleranceError, res.err.message);
}
}
switch (errCode) {
case BlockInputErrorCode.MISMATCHED_ROOT_HEX:
case DownloadByRangeErrorCode.MISSING_BLOBS:
case DownloadByRangeErrorCode.EXTRA_BLOBS:
case DownloadByRangeErrorCode.MISSING_COLUMNS:
case DownloadByRangeErrorCode.EXTRA_COLUMNS:
case BlobSidecarErrorCode.INCORRECT_SIDECAR_COUNT:
case BlobSidecarErrorCode.INCORRECT_BLOCK:
case DataColumnSidecarErrorCode.INCORRECT_SIDECAR_COUNT:
case DataColumnSidecarErrorCode.INCORRECT_BLOCK:
case DownloadByRangeErrorCode.EXTRA_BLOCKS:
case DownloadByRangeErrorCode.OUT_OF_ORDER_BLOCKS:
case DownloadByRangeErrorCode.OUT_OF_RANGE_BLOCKS:
case DownloadByRangeErrorCode.PARENT_ROOT_MISMATCH:
case BlobSidecarErrorCode.INCLUSION_PROOF_INVALID:
case BlobSidecarErrorCode.INVALID_KZG_PROOF_BATCH:
case DataColumnSidecarErrorCode.INCORRECT_KZG_COMMITMENTS_COUNT:
case DataColumnSidecarErrorCode.INCORRECT_KZG_PROOF_COUNT:
case DataColumnSidecarErrorCode.INVALID_KZG_PROOF_BATCH:
case DataColumnSidecarErrorCode.INCLUSION_PROOF_INVALID:
this.reportPeer(peer.peerId, PeerAction.LowToleranceError, res.err.message);
}
this.logger.verbose(
"Batch download error",
{id: this.logId, ...batch.getMetadata(), peer: prettyPrintPeerIdStr(peer.peerId)},
res.err
);
batch.downloadingError(peer.peerId); // Throws after MAX_DOWNLOAD_ATTEMPTS
}
switch (errCode) {
case DownloadByRangeErrorCode.EXTRA_BLOCKS:
case DownloadByRangeErrorCode.OUT_OF_ORDER_BLOCKS:
case DownloadByRangeErrorCode.OUT_OF_RANGE_BLOCKS:
case DownloadByRangeErrorCode.PARENT_ROOT_MISMATCH:
case BlobSidecarErrorCode.INCLUSION_PROOF_INVALID:
case BlobSidecarErrorCode.INVALID_KZG_PROOF_BATCH:
case DataColumnSidecarErrorCode.INCORRECT_KZG_COMMITMENTS_COUNT:
case DataColumnSidecarErrorCode.INCORRECT_KZG_PROOF_COUNT:
case DataColumnSidecarErrorCode.INVALID_KZG_PROOF_BATCH:
case DataColumnSidecarErrorCode.INCLUSION_PROOF_INVALID:
this.reportPeer(peer.peerId, PeerAction.LowToleranceError, res.err.message);
}
this.logger.verbose(
"Batch download error",
{id: this.logId, ...batch.getMetadata(), peer: prettyPrintPeerIdStr(peer.peerId)},
res.err
);
batch.downloadingError(peer.peerId); // Throws after MAX_DOWNLOAD_ATTEMPTS
} else {
this.logger.verbose("Batch download success", {
id: this.logId,
Expand Down
4 changes: 3 additions & 1 deletion packages/beacon-node/src/sync/range/utils/batches.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {Batch, BatchStatus} from "../batch.js";
/**
* Validates that the status and ordering of batches is valid
* ```
* [AwaitingValidation]* [Processing]? [AwaitingDownload,Downloading,AwaitingProcessing]*
* [AwaitingValidation]* [Processing]? [AwaitingDownload,Downloading,RateLimited,AwaitingProcessing]*
* ```
*/
export function validateBatchesStatus(batches: Batch[]): void {
Expand All @@ -29,6 +29,7 @@ export function validateBatchesStatus(batches: Batch[]): void {

case BatchStatus.AwaitingDownload:
case BatchStatus.Downloading:
case BatchStatus.RateLimited:
case BatchStatus.AwaitingProcessing:
preProcessing++;
break;
Expand Down Expand Up @@ -56,6 +57,7 @@ export function getNextBatchToProcess(batches: Batch[]): Batch | null {
// There MUST be no AwaitingProcessing state after AwaitingDownload, Downloading, Processing
case BatchStatus.AwaitingDownload:
case BatchStatus.Downloading:
case BatchStatus.RateLimited:
case BatchStatus.Processing:
return null;
}
Expand Down
18 changes: 18 additions & 0 deletions packages/beacon-node/src/sync/utils/downloadByRange.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {ChainForkConfig} from "@lodestar/config";
import {ForkPostDeneb, ForkPostFulu, ForkPreFulu, isForkPostFulu} from "@lodestar/params";
import {RequestErrorCode} from "@lodestar/reqresp";
import {SignedBeaconBlock, Slot, deneb, fulu, phase0} from "@lodestar/types";
import {LodestarError, Logger, byteArrayEquals, fromHex, prettyPrintIndices, toRootHex} from "@lodestar/utils";
import {
Expand Down Expand Up @@ -207,6 +208,12 @@ export async function downloadByRange({
columnsRequest,
});
} catch (err) {
const errCode = (err as LodestarError<{code: string}>).type?.code ?? (err as {code?: string}).code;
// Let rate-limit errors propagate with original reqresp code so chain.ts
// can detect them via isRateLimitRequestError without code remapping.
if (isRateLimitRequestError(errCode)) {
throw err;
}
throw new DownloadByRangeError({
code: DownloadByRangeErrorCode.REQ_RESP_ERROR,
reason: (err as Error).message,
Expand Down Expand Up @@ -842,6 +849,17 @@ function requestsLogMeta({blocksRequest, blobsRequest, columnsRequest}: Download
return logMeta;
}

/**
 * Returns true when a reqresp error code indicates the request was rate-limited
 * (by the remote peer, or self-rate-limited locally) rather than failed outright.
 */
export function isRateLimitRequestError(code: string | undefined): boolean {
  switch (code) {
    case RequestErrorCode.REQUEST_RATE_LIMITED:
    case RequestErrorCode.REQUEST_SELF_RATE_LIMITED:
    case RequestErrorCode.RESP_RATE_LIMITED:
      return true;
    default:
      return false;
  }
}

export enum DownloadByRangeErrorCode {
MISSING_BLOCKS_RESPONSE = "DOWNLOAD_BY_RANGE_ERROR_MISSING_BLOCK_RESPONSE",
MISSING_BLOBS_RESPONSE = "DOWNLOAD_BY_RANGE_ERROR_MISSING_BLOBS_RESPONSE",
Expand Down
Loading
Loading