Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions dashboards/lodestar_beacon_chain.json
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,18 @@
"legendFormat": "created_by_blob",
"range": true,
"refId": "F"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "lodestar_seen_block_input_cache_serialized_object_count",
"instant": false,
"legendFormat": "serialized_objects",
"range": true,
"refId": "G"
}
],
"title": "Block Input",
Expand Down
35 changes: 35 additions & 0 deletions packages/beacon-node/src/chain/blocks/blockInput/blockInput.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ abstract class AbstractBlockInput<F extends ForkName = ForkName, TData extends D
}

abstract addBlock(props: AddBlock<F>): void;
abstract getSerializedCacheKeys(): object[];

hasBlock(): boolean {
return this.state.hasBlock;
Expand Down Expand Up @@ -243,6 +244,10 @@ export class BlockInputPreData extends AbstractBlockInput<ForkPreDeneb, null> {
);
}
}

getSerializedCacheKeys(): object[] {
return [this.state.block];
}
}

// Blobs DA
Expand Down Expand Up @@ -526,6 +531,20 @@ export class BlockInputBlobs extends AbstractBlockInput<ForkBlobsDA, deneb.BlobS
getBlobs(): deneb.BlobSidecars {
return this.getAllBlobsWithSource().map(({blobSidecar}) => blobSidecar);
}

getSerializedCacheKeys(): object[] {
const objects: object[] = [];

if (this.state.hasBlock) {
objects.push(this.state.block);
}

for (const {blobSidecar} of this.blobsCache.values()) {
objects.push(blobSidecar);
}

return objects;
}
}

function blockAndBlobArePaired(block: SignedBeaconBlock<ForkBlobsDA>, blobSidecar: deneb.BlobSidecar): boolean {
Expand Down Expand Up @@ -906,6 +925,18 @@ export class BlockInputColumns extends AbstractBlockInput<ForkColumnsDA, fulu.Da
}
return Promise.resolve(this.getSampledColumns());
}

getSerializedCacheKeys(): object[] {
const objects: object[] = [];

if (this.state.hasBlock) {
objects.push(this.state.block);
}

objects.push(...this.getAllColumns());

return objects;
}
}

type BlockInputNoDataState = {
Expand Down Expand Up @@ -967,4 +998,8 @@ export class BlockInputNoData extends AbstractBlockInput<ForkPostGloas, null> {
return (this.state.block.message.body as gloas.BeaconBlockBody).signedExecutionPayloadBid.message
.blobKzgCommitments;
}

getSerializedCacheKeys(): object[] {
return [this.state.block];
}
}
5 changes: 5 additions & 0 deletions packages/beacon-node/src/chain/blocks/blockInput/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,11 @@ export interface IBlockInput<F extends ForkName = ForkName, TData extends DAData

/** Only safe to call when `hasBlockAndAllData` is true */
getTimeComplete(): number;
/**
* Return object references used as keys in `SerializedCache` that can be safely removed
* once this block input lifecycle has completed.
*/
getSerializedCacheKeys(): object[];

waitForBlock(timeout: number, signal?: AbortSignal): Promise<SignedBeaconBlock<F>>;
waitForAllData(timeout: number, signal?: AbortSignal): Promise<TData>;
Expand Down
19 changes: 1 addition & 18 deletions packages/beacon-node/src/chain/blocks/writeBlockInputToDb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,7 @@ import {SignedBeaconBlock} from "@lodestar/types";
import {fromHex, toRootHex} from "@lodestar/utils";
import {getBlobKzgCommitments} from "../../util/dataColumns.js";
import {BeaconChain} from "../chain.js";
import {
IBlockInput,
IDataColumnsInput,
isBlockInputBlobs,
isBlockInputColumns,
isBlockInputNoData,
} from "./blockInput/index.js";
import {IBlockInput, IDataColumnsInput, isBlockInputBlobs, isBlockInputColumns} from "./blockInput/index.js";
import {BLOB_AVAILABILITY_TIMEOUT} from "./verifyBlocksDataAvailability.js";

/**
Expand Down Expand Up @@ -137,17 +131,6 @@ export async function persistBlockInput(this: BeaconChain, blockInput: IBlockInp
})
.finally(() => {
this.seenBlockInputCache.prune(blockInput.blockRootHex);
// Without forcefully clearing this cache, we would rely on WeakMap to evict memory which is not reliable.
// Clear here (after the DB write) so that writeBlockInputToDb can still use the cached serialized bytes.
//
// For Gloas (BlockInputNoData), the execution payload and columns arrive separately after the beacon block.
// Do NOT clear the cache here — it must remain available for writeDataColumnsToDb when the payload arrives.
// The cache is cleared in the Gloas payload persistence path instead.
if (!isBlockInputNoData(blockInput)) {
// TODO: enhance this SerializedCache for Gloas because payload may not come
// see https://github.com/ChainSafe/lodestar/pull/8974#discussion_r2885598229
this.serializedCache.clear();
}
this.logger.debug("Pruned block input", {
slot: blockInput.slot,
root: blockInput.blockRootHex,
Expand Down
4 changes: 2 additions & 2 deletions packages/beacon-node/src/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -320,12 +320,14 @@ export class BeaconChain implements IBeaconChain {

this.beaconProposerCache = new BeaconProposerCache(opts);
this.checkpointBalancesCache = new CheckpointBalancesCache();
this.serializedCache = new SerializedCache();
this.seenBlockInputCache = new SeenBlockInput({
config,
custodyConfig: this.custodyConfig,
clock,
chainEvents: emitter,
signal,
serializedCache: this.serializedCache,
metrics,
logger,
});
Expand Down Expand Up @@ -412,8 +414,6 @@ export class BeaconChain implements IBeaconChain {
this.bls = bls;
this.emitter = emitter;

this.serializedCache = new SerializedCache();

this.getBlobsTracker = new GetBlobsTracker({
logger,
executionEngine: this.executionEngine,
Expand Down
48 changes: 38 additions & 10 deletions packages/beacon-node/src/chain/seenCache/seenGossipBlockInput.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {Metrics} from "../../metrics/metrics.js";
import {MAX_LOOK_AHEAD_EPOCHS} from "../../sync/constants.js";
import {IClock} from "../../util/clock.js";
import {CustodyConfig} from "../../util/dataColumns.js";
import {SerializedCache} from "../../util/serializedCache.js";
import {
BlockInput,
BlockInputBlobs,
Expand Down Expand Up @@ -55,6 +56,7 @@ export type SeenBlockInputCacheModules = {
chainEvents: ChainEventEmitter;
signal: AbortSignal;
custodyConfig: CustodyConfig;
serializedCache: SerializedCache;
metrics: Metrics | null;
logger?: Logger;
};
Expand Down Expand Up @@ -101,6 +103,7 @@ export class SeenBlockInput {
private readonly clock: IClock;
private readonly chainEvents: ChainEventEmitter;
private readonly signal: AbortSignal;
private readonly serializedCache: SerializedCache;
private readonly metrics: Metrics | null;
private readonly logger?: Logger;
private blockInputs = new Map<RootHex, IBlockInput>();
Expand All @@ -109,19 +112,35 @@ export class SeenBlockInput {
// and the signature to ensure we only skip verification if both match
private verifiedProposerSignatures = new Map<Slot, Map<RootHex, BLSSignature>>();

constructor({config, custodyConfig, clock, chainEvents, signal, metrics, logger}: SeenBlockInputCacheModules) {
constructor({
config,
custodyConfig,
clock,
chainEvents,
signal,
serializedCache,
metrics,
logger,
}: SeenBlockInputCacheModules) {
this.config = config;
this.custodyConfig = custodyConfig;
this.clock = clock;
this.chainEvents = chainEvents;
this.signal = signal;
this.serializedCache = serializedCache;
this.metrics = metrics;
this.logger = logger;

if (metrics) {
metrics.seenCache.blockInput.blockInputCount.addCollect(() =>
metrics.seenCache.blockInput.blockInputCount.set(this.blockInputs.size)
);
metrics.seenCache.blockInput.blockInputCount.addCollect(() => {
metrics.seenCache.blockInput.blockInputCount.set(this.blockInputs.size);
metrics.seenCache.blockInput.serializedObjectCount.set(
Array.from(this.blockInputs.values()).reduce(
(count, blockInput) => count + blockInput.getSerializedCacheKeys().length,
0
)
);
});
}

this.chainEvents.on(ChainEvent.forkChoiceFinalized, this.onFinalized);
Expand All @@ -142,7 +161,10 @@ export class SeenBlockInput {
* Removes the single BlockInput from the cache
*/
remove(rootHex: RootHex): void {
this.blockInputs.delete(rootHex);
const blockInput = this.blockInputs.get(rootHex);
if (blockInput) {
this.evictBlockInput(blockInput);
}
}

/**
Expand All @@ -154,7 +176,7 @@ export class SeenBlockInput {
let deletedCount = 0;
while (blockInput) {
deletedCount++;
this.blockInputs.delete(blockInput.blockRootHex);
this.evictBlockInput(blockInput);
blockInput = this.blockInputs.get(parentRootHex ?? "");
parentRootHex = blockInput?.parentRootHex;
}
Expand All @@ -165,10 +187,10 @@ export class SeenBlockInput {
onFinalized = (checkpoint: CheckpointWithHex) => {
let deletedCount = 0;
const cutoffSlot = computeStartSlotAtEpoch(checkpoint.epoch);
for (const [rootHex, blockInput] of this.blockInputs) {
for (const [, blockInput] of this.blockInputs) {
if (blockInput.slot < cutoffSlot) {
deletedCount++;
this.blockInputs.delete(rootHex);
this.evictBlockInput(blockInput);
}
}
this.logger?.debug(`BlockInputCache.onFinalized deleted ${deletedCount} cached BlockInputs`);
Expand Down Expand Up @@ -408,14 +430,20 @@ export class SeenBlockInput {

if (itemsToDelete > 0) {
const sorted = [...this.blockInputs.entries()].sort((a, b) => a[1].slot - b[1].slot);
for (const [rootHex] of sorted) {
this.blockInputs.delete(rootHex);
for (const [, blockInput] of sorted) {
this.evictBlockInput(blockInput);
itemsToDelete--;
if (itemsToDelete <= 0) return;
}
}
pruneSetToMax(this.verifiedProposerSignatures, MAX_BLOCK_INPUT_CACHE_SIZE);
}

private evictBlockInput(blockInput: IBlockInput): void {
// Without forcefully clearing this cache, we would rely on WeakMap to evict memory which is not reliable
this.serializedCache.delete(blockInput.getSerializedCacheKeys());
this.blockInputs.delete(blockInput.blockRootHex);
}
}

enum SeenBlockInputCacheErrorCode {
Expand Down
4 changes: 4 additions & 0 deletions packages/beacon-node/src/metrics/metrics/lodestar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1461,6 +1461,10 @@ export function createLodestarMetrics(
name: "lodestar_seen_block_input_cache_size",
help: "Number of cached BlockInputs",
}),
serializedObjectCount: register.gauge({
name: "lodestar_seen_block_input_cache_serialized_object_count",
help: "Number of serialized objects retained by cached BlockInputs",
}),
duplicateBlockCount: register.gauge<{source: BlockInputSource}>({
name: "lodestar_seen_block_input_cache_duplicate_block_count",
help: "Total number of duplicate blocks that pass validation and attempt to be cached but are known",
Expand Down
3 changes: 2 additions & 1 deletion packages/beacon-node/src/network/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,8 @@ export class Network implements INetwork {
this.config.getForkSeq(this.clock.currentSlot) >= ForkSeq.altair ? [Version.V2] : [Version.V2, Version.V1],
request
),
request
request,
this.chain.serializedCache
);
}

Expand Down
12 changes: 7 additions & 5 deletions packages/beacon-node/src/util/serializedCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
* This is a thin wrapper around WeakMap
*/
export class SerializedCache {
map: WeakMap<object, Uint8Array> = new WeakMap();
private map: WeakMap<object, Uint8Array> = new WeakMap();

get(obj: object): Uint8Array | undefined {
return this.map.get(obj);
Expand All @@ -15,11 +15,13 @@ export class SerializedCache {
}

/**
* Replace the internal WeakMap to force GC of all cached entries.
* Must only be called after all DB writes that may read from this cache have completed,
* Delete cached serialized entries for the provided object references.
* Must only be called after all DB writes that read from this cache for these objects have completed,
* otherwise cached serialized bytes will be unavailable and data will be re-serialized unnecessarily.
*/
clear(): void {
this.map = new WeakMap();
delete(objs: object[]): void {
for (const obj of objs) {
this.map.delete(obj);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also check if this.map.size === 0 then create a new WeakMap

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this isn't possible, there is no reliable way to get the size of WeakMap, we can only track entries manually but this would be inaccurate since we wouldn't deduct dangling entries that are dropped automatically

there is also no place in the code where we could do the previous clear() and create a new WeakMap but I don't really think this is necessary, if we want more manual control over the stored items we should just use a normal Map

I would rather keep track of how many items the BlockInput's added to SerializedCache and add more explicit clean up there if entries are pruned automatically to make sure we explicitly clean up dangling BlockInput's

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see 427cb03, I coupled the serialized cache pruning more closely to block input so there is less of a chance to have dangling entries

}
}
Loading
Loading