Skip to content
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions packages/beacon-node/src/sync/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,21 @@ export const MAX_CONCURRENT_REQUESTS = 2;
* so having this constant too big is a waste of resources and peers may rate limit us.
*/
export const MAX_LOOK_AHEAD_EPOCHS = 2;

/**
 * Maximum number of consecutive rate-limited download attempts per batch before giving up.
 * Rate-limited attempts are tracked separately from regular download failures since they
 * indicate the peer is healthy but throttling us — retrying with backoff is appropriate.
 */
export const MAX_RATE_LIMITED_RETRIES = 3;

/**
 * Initial delay in milliseconds before retrying a rate-limited batch download.
 * Doubles on each consecutive rate-limited attempt (50 → 100 → 200 with the current
 * constants) up to {@link RATE_LIMITED_MAX_DELAY_MS}.
 */
export const RATE_LIMITED_INITIAL_DELAY_MS = 50;

/**
 * Maximum backoff delay in milliseconds for rate-limited batch retries.
 * Caps the exponential growth that starts at {@link RATE_LIMITED_INITIAL_DELAY_MS}.
 */
export const RATE_LIMITED_MAX_DELAY_MS = 200;
61 changes: 59 additions & 2 deletions packages/beacon-node/src/sync/range/batch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,13 @@ import {PeerSyncMeta} from "../../network/peers/peersData.js";
import {IClock} from "../../util/clock.js";
import {CustodyConfig} from "../../util/dataColumns.js";
import {PeerIdStr} from "../../util/peerId.js";
import {MAX_BATCH_DOWNLOAD_ATTEMPTS, MAX_BATCH_PROCESSING_ATTEMPTS} from "../constants.js";
import {
MAX_BATCH_DOWNLOAD_ATTEMPTS,
MAX_BATCH_PROCESSING_ATTEMPTS,
MAX_RATE_LIMITED_RETRIES,
RATE_LIMITED_INITIAL_DELAY_MS,
RATE_LIMITED_MAX_DELAY_MS,
} from "../constants.js";
import {DownloadByRangeRequests} from "../utils/downloadByRange.js";
import {getBatchSlotRange, hashBlocks} from "./utils/index.js";

Expand All @@ -22,6 +28,8 @@ export enum BatchStatus {
AwaitingDownload = "AwaitingDownload",
/** The batch is being downloaded. */
Downloading = "Downloading",
/** The batch download was rate-limited and is waiting for cooldown before retrying. */
RateLimited = "RateLimited",
/** The batch has been completely downloaded and is ready for processing. */
AwaitingProcessing = "AwaitingProcessing",
/** The batch is being processed. */
Expand Down Expand Up @@ -56,6 +64,7 @@ export type DownloadSuccessState = {
export type BatchState =
| AwaitingDownloadState
| {status: BatchStatus.Downloading; peer: PeerIdStr; blocks: IBlockInput[]}
| {status: BatchStatus.RateLimited; blocks: IBlockInput[]}
| DownloadSuccessState
| {status: BatchStatus.Processing; blocks: IBlockInput[]; attempt: Attempt}
| {status: BatchStatus.AwaitingValidation; blocks: IBlockInput[]; attempt: Attempt};
Expand Down Expand Up @@ -94,6 +103,8 @@ export class Batch {
readonly executionErrorAttempts: Attempt[] = [];
/** The number of download retries this batch has undergone due to a failed request. */
private readonly failedDownloadAttempts: PeerIdStr[] = [];
/** Peers that rate-limited us during download attempts. Reset when a download attempt does not fail due to rate limiting (e.g. on successful downloads or non-rate-limit errors). Used by peerBalancer to prefer alternative peers. */
readonly rateLimitedPeers: PeerIdStr[] = [];
private readonly config: ChainForkConfig;
private readonly clock: IClock;
private readonly custodyConfig: CustodyConfig;
Expand Down Expand Up @@ -252,7 +263,11 @@ export class Batch {
* Gives a list of peers from which this batch has had a failed download or processing attempt.
*/
getFailedPeers(): PeerIdStr[] {
return [...this.failedDownloadAttempts, ...this.failedProcessingAttempts.flatMap((a) => a.peers)];
return [
...this.failedDownloadAttempts,
...this.failedProcessingAttempts.flatMap((a) => a.peers),
...this.rateLimitedPeers,
];
}

getMetadata(): BatchMetadata {
Expand Down Expand Up @@ -285,6 +300,7 @@ export class Batch {
// ensure that blocks are always sorted before getting stored on the batch.state or being used to getRequests
blocks.sort((a, b) => a.slot - b.slot);

this.rateLimitedPeers.length = 0; // Reset rate-limit streak on successful download
this.goodPeers.push(peer);

let allComplete = true;
Expand Down Expand Up @@ -323,6 +339,7 @@ export class Batch {
throw new BatchError(this.wrongStatusErrorType(BatchStatus.Downloading));
}

this.rateLimitedPeers.length = 0; // Reset rate-limit streak on a non-rate-limit error
this.failedDownloadAttempts.push(peer);
if (this.failedDownloadAttempts.length > MAX_BATCH_DOWNLOAD_ATTEMPTS) {
throw new BatchError(this.errorType({code: BatchErrorCode.MAX_DOWNLOAD_ATTEMPTS}));
Expand All @@ -331,6 +348,46 @@ export class Batch {
this.state = {status: BatchStatus.AwaitingDownload, blocks: this.state.blocks};
}

/**
 * Downloading -> RateLimited (with cooldown) or falls through to regular error
 *
 * Returns the cooldown delay in ms. If 0, max retries exhausted and state has been
 * transitioned via downloadingError() — caller should NOT call endCoolDown().
 */
downloadingRateLimited(peer: PeerIdStr): number {
  const current = this.state;
  if (current.status !== BatchStatus.Downloading) {
    throw new BatchError(this.wrongStatusErrorType(BatchStatus.Downloading));
  }

  // Rate-limit budget exhausted: treat this attempt as a regular download error.
  // downloadingError() resets the rate-limit counter and handles attempt tracking.
  if (this.rateLimitedPeers.length >= MAX_RATE_LIMITED_RETRIES) {
    this.downloadingError(peer);
    return 0;
  }

  this.rateLimitedPeers.push(peer);

  // Exponential backoff: initial delay doubled once per prior consecutive
  // rate-limited attempt, capped at the configured maximum.
  const priorAttempts = this.rateLimitedPeers.length - 1;
  const delayMs = Math.min(RATE_LIMITED_INITIAL_DELAY_MS * 2 ** priorAttempts, RATE_LIMITED_MAX_DELAY_MS);

  this.state = {status: BatchStatus.RateLimited, blocks: current.blocks};
  return delayMs;
}

/**
* RateLimited -> AwaitingDownload
*/
endCoolDown(): void {
if (this.state.status !== BatchStatus.RateLimited) {
throw new BatchError(this.wrongStatusErrorType(BatchStatus.RateLimited));
}

this.state = {status: BatchStatus.AwaitingDownload, blocks: this.state.blocks};
}

/**
* AwaitingProcessing -> Processing
*/
Expand Down
107 changes: 72 additions & 35 deletions packages/beacon-node/src/sync/range/chain.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {ChainForkConfig} from "@lodestar/config";
import {Epoch, Root, Slot} from "@lodestar/types";
import {ErrorAborted, LodestarError, Logger, toRootHex} from "@lodestar/utils";
import {ErrorAborted, LodestarError, Logger, sleep, toRootHex} from "@lodestar/utils";
import {isBlockInputBlobs, isBlockInputColumns} from "../../chain/blocks/blockInput/blockInput.js";
import {BlockInputErrorCode} from "../../chain/blocks/blockInput/errors.js";
import {IBlockInput} from "../../chain/blocks/blockInput/types.js";
Expand All @@ -15,7 +15,7 @@ import {ItTrigger} from "../../util/itTrigger.js";
import {PeerIdStr} from "../../util/peerId.js";
import {WarnResult, wrapError} from "../../util/wrapError.js";
import {BATCH_BUFFER_SIZE, EPOCHS_PER_BATCH, MAX_LOOK_AHEAD_EPOCHS} from "../constants.js";
import {DownloadByRangeError, DownloadByRangeErrorCode} from "../utils/downloadByRange.js";
import {DownloadByRangeError, DownloadByRangeErrorCode, isRateLimitRequestError} from "../utils/downloadByRange.js";
import {RangeSyncType} from "../utils/remoteSyncType.js";
import {Batch, BatchError, BatchErrorCode, BatchMetadata, BatchStatus} from "./batch.js";
import {
Expand Down Expand Up @@ -420,7 +420,11 @@ export class SyncChain {
// Note: Don't count batches in the AwaitingValidation state, to prevent stalling sync
// if the current processing window is contained in a long range of skip slots.
const batchesInBuffer = batches.filter((batch) => {
return batch.state.status === BatchStatus.Downloading || batch.state.status === BatchStatus.AwaitingProcessing;
return (
batch.state.status === BatchStatus.Downloading ||
batch.state.status === BatchStatus.RateLimited ||
batch.state.status === BatchStatus.AwaitingProcessing
);
});
if (batchesInBuffer.length > BATCH_BUFFER_SIZE) {
return null;
Expand Down Expand Up @@ -471,43 +475,76 @@ export class SyncChain {

if (res.err) {
// There's several known error cases where we want to take action on the peer
const errCode = (res.err as LodestarError<{code: string}>).type?.code;
const errCode = (res.err as LodestarError<{code: string}>).type?.code ?? (res.err as {code?: string}).code;
this.metrics?.syncRange.downloadByRange.error.inc({client: peer.client, code: errCode ?? "UNKNOWN"});
if (this.syncType === RangeSyncType.Finalized) {
// For finalized sync, we are stricter with peers as there is no ambiguity about which chain we're syncing.
// The below cases indicate the peer may be on a different chain, so are not penalized during head sync.

// Rate-limited responses are handled with backoff rather than peer penalties.
// The peer is healthy but throttling us — penalizing it would make things worse.
const isRateLimited = isRateLimitRequestError(errCode);
if (isRateLimited) {
const delayMs = batch.downloadingRateLimited(peer.peerId);
if (delayMs > 0) {
const uniqueRateLimitedPeers = [...new Set(batch.rateLimitedPeers)];
this.logger.debug("Batch download rate limited", {
id: this.logId,
...batch.getMetadata(),
peer: prettyPrintPeerIdStr(peer.peerId),
rateLimitedPeers: uniqueRateLimitedPeers.map((peerId) => prettyPrintPeerIdStr(peerId)).join(", "),
delayMs,
});
// Wait for cooldown before transitioning back to AwaitingDownload so triggerBatchDownloader can select
// a different peer. Rate-limited peers are tracked in getFailedPeers(),
// so peerBalancer will prefer alternative peers. If no alternative is available
// the backoff delay is applied before retrying with the same peer pool.
await sleep(delayMs);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The delay needs to go into requestBatches I think. Likely the peer selection, with filtering of rate limited peers, will happen in bestPeerToRetryBatch and/or idlePeerForBatch but I leave that up to you to implement. You also need to figure out how to get the delay time into that context so if no other peers are available the batch waits before proceeding. I'm guessing you do not want to return delayMs from downloadingRateLimited but instead store it in the Batch so that its available to the peerBalancer

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You’re right that this needs to be enforced at scheduling boundaries, but the effect is equivalent with the current layout: on rate-limit we now set BatchStatus.RateLimited, apply await sleep(delayMs) there, then batch.endCoolDown() transitions back to AwaitingDownload. requestBatches() only picks up that batch after it exits rate-limited state and peer selection still goes through bestPeerToRetryBatch/idlePeerForBatch, which already deprioritizes recent rateLimitedPeers from getFailedPeers.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implemented this in ded88f4423 by moving cooldown timing out of sendBatch and into the request/retry flow.

What changed

  • Batch now stores cooldown deadline (rateLimitCoolDownUntilMs) and exposes getRateLimitCoolDownRemainingMs(peer).
  • sendBatch no longer awaits sleep for rate-limit responses. It just marks rate-limited state + schedules a downloader wakeup.
  • requestBatches() now handles both AwaitingDownload and RateLimited batches:
    • asks peerBalancer for best retry peer,
    • if selected peer is still in cooldown, skips and reschedules wakeup,
    • if cooldown elapsed (or selected peer is not in rate-limited set), transitions with endCoolDown() and retries immediately.
  • peerBalancer now accepts RateLimited batches for peer selection, so alternative peers can be chosen without blocking the send path.
  • Added cleanup for cooldown timers on batch removal / chain end / chain removal.

Validation run:

  • pnpm lint
  • pnpm test:unit .../sync/range/batch.test.ts .../sync/range/utils/peerBalancer.test.ts .../sync/range/chain.test.ts
  • pnpm check-types

batch.endCoolDown();
} else {
this.logger.debug("Batch download rate limited, max retries exhausted", {
id: this.logId,
...batch.getMetadata(),
peer: prettyPrintPeerIdStr(peer.peerId),
});
}
}

// Important: avoid duplicate error logging and downloadingError() for rate-limited responses.
if (!isRateLimited) {
if (this.syncType === RangeSyncType.Finalized) {
// For finalized sync, we are stricter with peers as there is no ambiguity about which chain we're syncing.
// The below cases indicate the peer may be on a different chain, so are not penalized during head sync.
switch (errCode) {
case BlockInputErrorCode.MISMATCHED_ROOT_HEX:
case DownloadByRangeErrorCode.MISSING_BLOBS:
case DownloadByRangeErrorCode.EXTRA_BLOBS:
case DownloadByRangeErrorCode.MISSING_COLUMNS:
case DownloadByRangeErrorCode.EXTRA_COLUMNS:
case BlobSidecarErrorCode.INCORRECT_SIDECAR_COUNT:
case BlobSidecarErrorCode.INCORRECT_BLOCK:
case DataColumnSidecarErrorCode.INCORRECT_SIDECAR_COUNT:
case DataColumnSidecarErrorCode.INCORRECT_BLOCK:
this.reportPeer(peer.peerId, PeerAction.LowToleranceError, res.err.message);
}
}
switch (errCode) {
case BlockInputErrorCode.MISMATCHED_ROOT_HEX:
case DownloadByRangeErrorCode.MISSING_BLOBS:
case DownloadByRangeErrorCode.EXTRA_BLOBS:
case DownloadByRangeErrorCode.MISSING_COLUMNS:
case DownloadByRangeErrorCode.EXTRA_COLUMNS:
case BlobSidecarErrorCode.INCORRECT_SIDECAR_COUNT:
case BlobSidecarErrorCode.INCORRECT_BLOCK:
case DataColumnSidecarErrorCode.INCORRECT_SIDECAR_COUNT:
case DataColumnSidecarErrorCode.INCORRECT_BLOCK:
case DownloadByRangeErrorCode.EXTRA_BLOCKS:
case DownloadByRangeErrorCode.OUT_OF_ORDER_BLOCKS:
case DownloadByRangeErrorCode.OUT_OF_RANGE_BLOCKS:
case DownloadByRangeErrorCode.PARENT_ROOT_MISMATCH:
case BlobSidecarErrorCode.INCLUSION_PROOF_INVALID:
case BlobSidecarErrorCode.INVALID_KZG_PROOF_BATCH:
case DataColumnSidecarErrorCode.INCORRECT_KZG_COMMITMENTS_COUNT:
case DataColumnSidecarErrorCode.INCORRECT_KZG_PROOF_COUNT:
case DataColumnSidecarErrorCode.INVALID_KZG_PROOF_BATCH:
case DataColumnSidecarErrorCode.INCLUSION_PROOF_INVALID:
this.reportPeer(peer.peerId, PeerAction.LowToleranceError, res.err.message);
}
this.logger.verbose(
"Batch download error",
{id: this.logId, ...batch.getMetadata(), peer: prettyPrintPeerIdStr(peer.peerId)},
res.err
);
batch.downloadingError(peer.peerId); // Throws after MAX_DOWNLOAD_ATTEMPTS
}
switch (errCode) {
case DownloadByRangeErrorCode.EXTRA_BLOCKS:
case DownloadByRangeErrorCode.OUT_OF_ORDER_BLOCKS:
case DownloadByRangeErrorCode.OUT_OF_RANGE_BLOCKS:
case DownloadByRangeErrorCode.PARENT_ROOT_MISMATCH:
case BlobSidecarErrorCode.INCLUSION_PROOF_INVALID:
case BlobSidecarErrorCode.INVALID_KZG_PROOF_BATCH:
case DataColumnSidecarErrorCode.INCORRECT_KZG_COMMITMENTS_COUNT:
case DataColumnSidecarErrorCode.INCORRECT_KZG_PROOF_COUNT:
case DataColumnSidecarErrorCode.INVALID_KZG_PROOF_BATCH:
case DataColumnSidecarErrorCode.INCLUSION_PROOF_INVALID:
this.reportPeer(peer.peerId, PeerAction.LowToleranceError, res.err.message);
}
this.logger.verbose(
"Batch download error",
{id: this.logId, ...batch.getMetadata(), peer: prettyPrintPeerIdStr(peer.peerId)},
res.err
);
batch.downloadingError(peer.peerId); // Throws after MAX_DOWNLOAD_ATTEMPTS
} else {
this.logger.verbose("Batch download success", {
id: this.logId,
Expand Down
4 changes: 3 additions & 1 deletion packages/beacon-node/src/sync/range/utils/batches.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {Batch, BatchStatus} from "../batch.js";
/**
* Validates that the status and ordering of batches is valid
* ```
* [AwaitingValidation]* [Processing]? [AwaitingDownload,Downloading,AwaitingProcessing]*
* [AwaitingValidation]* [Processing]? [AwaitingDownload,Downloading,RateLimited,AwaitingProcessing]*
* ```
*/
export function validateBatchesStatus(batches: Batch[]): void {
Expand All @@ -29,6 +29,7 @@ export function validateBatchesStatus(batches: Batch[]): void {

case BatchStatus.AwaitingDownload:
case BatchStatus.Downloading:
case BatchStatus.RateLimited:
case BatchStatus.AwaitingProcessing:
preProcessing++;
break;
Expand Down Expand Up @@ -56,6 +57,7 @@ export function getNextBatchToProcess(batches: Batch[]): Batch | null {
// There MUST be no AwaitingProcessing state after AwaitingDownload, Downloading, Processing
case BatchStatus.AwaitingDownload:
case BatchStatus.Downloading:
case BatchStatus.RateLimited:
case BatchStatus.Processing:
return null;
}
Expand Down
18 changes: 18 additions & 0 deletions packages/beacon-node/src/sync/utils/downloadByRange.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {ChainForkConfig} from "@lodestar/config";
import {ForkPostDeneb, ForkPostFulu, ForkPreFulu, isForkPostFulu} from "@lodestar/params";
import {RequestErrorCode} from "@lodestar/reqresp";
import {SignedBeaconBlock, Slot, deneb, fulu, phase0} from "@lodestar/types";
import {LodestarError, Logger, byteArrayEquals, fromHex, prettyPrintIndices, toRootHex} from "@lodestar/utils";
import {
Expand Down Expand Up @@ -207,6 +208,12 @@ export async function downloadByRange({
columnsRequest,
});
} catch (err) {
const errCode = (err as LodestarError<{code: string}>).type?.code ?? (err as {code?: string}).code;
// Let rate-limit errors propagate with original reqresp code so chain.ts
// can detect them via isRateLimitRequestError without code remapping.
if (isRateLimitRequestError(errCode)) {
throw err;
}
throw new DownloadByRangeError({
code: DownloadByRangeErrorCode.REQ_RESP_ERROR,
reason: (err as Error).message,
Expand Down Expand Up @@ -842,6 +849,17 @@ function requestsLogMeta({blocksRequest, blobsRequest, columnsRequest}: Download
return logMeta;
}

/**
 * Check whether a reqresp error code indicates the request was rate-limited.
 */
export function isRateLimitRequestError(code: string | undefined): boolean {
  switch (code) {
    // Peer throttled our outbound request, we throttled ourselves, or the
    // response stream was rate-limited — all indicate throttling, not a fault.
    case RequestErrorCode.REQUEST_RATE_LIMITED:
    case RequestErrorCode.REQUEST_SELF_RATE_LIMITED:
    case RequestErrorCode.RESP_RATE_LIMITED:
      return true;
    default:
      return false;
  }
}

export enum DownloadByRangeErrorCode {
MISSING_BLOCKS_RESPONSE = "DOWNLOAD_BY_RANGE_ERROR_MISSING_BLOCK_RESPONSE",
MISSING_BLOBS_RESPONSE = "DOWNLOAD_BY_RANGE_ERROR_MISSING_BLOBS_RESPONSE",
Expand Down
Loading