Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
808 changes: 808 additions & 0 deletions docs/design-flat-file-storage.md

Large diffs are not rendered by default.

123 changes: 77 additions & 46 deletions packages/beacon-node/src/chain/archiveStore/utils/archiveBlocks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ export async function archiveBlocks(

const logCtx = {currentEpoch, finalizedEpoch: finalizedCheckpoint.epoch, finalizedRoot: finalizedCheckpoint.rootHex};

const flatFileStore = db.flatFileStore;

if (finalizedCanonicalBlockRoots.length > 0) {
await migrateBlocksFromHotToColdDb(db, finalizedCanonicalBlockRoots);
logger.verbose("Migrated blocks from hot DB to cold DB", {
Expand All @@ -84,25 +86,29 @@ export async function archiveBlocks(
size: finalizedCanonicalBlockRoots.length,
});

if (finalizedPostDeneb) {
const migratedEntries = await migrateBlobSidecarsFromHotToColdDb(
config,
db,
finalizedCanonicalBlockRoots,
currentEpoch
);
logger.verbose("Migrated blobSidecars from hot DB to cold DB", {...logCtx, migratedEntries});
}
// When flat file store is enabled, blobs/columns are already in their final location
// — no migration needed
if (!flatFileStore) {
if (finalizedPostDeneb) {
const migratedEntries = await migrateBlobSidecarsFromHotToColdDb(
config,
db,
finalizedCanonicalBlockRoots,
currentEpoch
);
logger.verbose("Migrated blobSidecars from hot DB to cold DB", {...logCtx, migratedEntries});
}

if (finalizedPostFulu) {
const migratedEntries = await migrateDataColumnSidecarsFromHotToColdDb(
config,
db,
logger,
finalizedCanonicalBlockRoots,
currentEpoch
);
logger.verbose("Migrated dataColumnSidecars from hot DB to cold DB", {...logCtx, migratedEntries});
if (finalizedPostFulu) {
const migratedEntries = await migrateDataColumnSidecarsFromHotToColdDb(
config,
db,
logger,
finalizedCanonicalBlockRoots,
currentEpoch
);
logger.verbose("Migrated dataColumnSidecars from hot DB to cold DB", {...logCtx, migratedEntries});
}
}
}

Expand Down Expand Up @@ -136,14 +142,24 @@ export async function archiveBlocks(
slots: finalizedNonCanonicalBlocks.map((summary) => summary.slot).join(","),
});

if (finalizedPostDeneb) {
await db.blobSidecars.batchDelete(nonCanonicalBlockRoots);
logger.verbose("Deleted non canonical blobSidecars from hot DB", logCtx);
}
if (flatFileStore) {
// Delete non-canonical blobs/columns from flat file store
const items = finalizedNonCanonicalBlocks.map((summary) => ({
slot: summary.slot,
blockRoot: summary.blockRoot,
}));
await flatFileStore.deleteNonCanonical(items);
logger.verbose("Deleted non canonical blobs/columns from flat file store", logCtx);
} else {
if (finalizedPostDeneb) {
await db.blobSidecars.batchDelete(nonCanonicalBlockRoots);
logger.verbose("Deleted non canonical blobSidecars from hot DB", logCtx);
}

if (finalizedPostFulu) {
await db.dataColumnSidecar.deleteMany(nonCanonicalBlockRoots);
logger.verbose("Deleted non canonical dataColumnSidecars from hot DB", logCtx);
if (finalizedPostFulu) {
await db.dataColumnSidecar.deleteMany(nonCanonicalBlockRoots);
logger.verbose("Deleted non canonical dataColumnSidecars from hot DB", logCtx);
}
}
}

Expand All @@ -155,12 +171,21 @@ export async function archiveBlocks(
const blobsArchiveWindow = Math.max(config.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS, archiveDataEpochs ?? 0);
const blobSidecarsMinEpoch = currentEpoch - blobsArchiveWindow;
if (blobSidecarsMinEpoch >= config.DENEB_FORK_EPOCH) {
const slotsToDelete = await db.blobSidecarsArchive.keys({lt: computeStartSlotAtEpoch(blobSidecarsMinEpoch)});
if (slotsToDelete.length > 0) {
await db.blobSidecarsArchive.batchDelete(slotsToDelete);
logger.verbose(`blobSidecars prune: batchDelete range ${slotsToDelete[0]}..${slotsToDelete.at(-1)}`, logCtx);
const blobsPruneSlot = computeStartSlotAtEpoch(blobSidecarsMinEpoch);
if (flatFileStore) {
await flatFileStore.pruneBlobsBeforeSlot(blobsPruneSlot);
logger.verbose(`blobSidecars prune (flat file): pruned before slot ${blobsPruneSlot}`, logCtx);
} else {
logger.verbose(`blobSidecars prune: no entries before epoch ${blobSidecarsMinEpoch}`, logCtx);
const slotsToDelete = await db.blobSidecarsArchive.keys({lt: blobsPruneSlot});
if (slotsToDelete.length > 0) {
await db.blobSidecarsArchive.batchDelete(slotsToDelete);
logger.verbose(
`blobSidecars prune: batchDelete range ${slotsToDelete[0]}..${slotsToDelete.at(-1)}`,
logCtx
);
} else {
logger.verbose(`blobSidecars prune: no entries before epoch ${blobSidecarsMinEpoch}`, logCtx);
}
}
}
} else {
Expand All @@ -178,23 +203,29 @@ export async function archiveBlocks(
);
const dataColumnSidecarsMinEpoch = currentEpoch - dataColumnSidecarsArchiveWindow;
if (dataColumnSidecarsMinEpoch >= config.FULU_FORK_EPOCH) {
const prefixedKeys = await db.dataColumnSidecarArchive.keys({
// The `id` value `0` refers to the column index. Using `{prefix: startSlot, id: 0}` as the upper bound fetches every sidecar stored for slots before the first slot of `dataColumnSidecarsMinEpoch`
lt: {prefix: computeStartSlotAtEpoch(dataColumnSidecarsMinEpoch), id: 0},
});
// for each slot there can be multiple dataColumnSidecars, so deduplicate the slot prefixes
const slotsToDelete = [...new Set(prefixedKeys.map(({prefix}) => prefix))].sort((a, b) => a - b);

if (slotsToDelete.length > 0) {
await db.dataColumnSidecarArchive.deleteMany(slotsToDelete);
logger.verbose("dataColumnSidecars prune", {
...logCtx,
slotRange: prettyPrintIndices(slotsToDelete),
numOfSlots: slotsToDelete.length,
totalNumOfSidecars: prefixedKeys.length,
});
const columnsPruneSlot = computeStartSlotAtEpoch(dataColumnSidecarsMinEpoch);
if (flatFileStore) {
await flatFileStore.pruneColumnsBeforeSlot(columnsPruneSlot);
logger.verbose(`dataColumnSidecars prune (flat file): pruned before slot ${columnsPruneSlot}`, logCtx);
} else {
logger.verbose(`dataColumnSidecars prune: no entries before epoch ${dataColumnSidecarsMinEpoch}`, logCtx);
const prefixedKeys = await db.dataColumnSidecarArchive.keys({
// The `id` value `0` refers to the column index. Using `{prefix: columnsPruneSlot, id: 0}` as the upper bound fetches every sidecar stored for slots before `columnsPruneSlot`
lt: {prefix: columnsPruneSlot, id: 0},
});
// for each slot there can be multiple dataColumnSidecars, so deduplicate the slot prefixes
const slotsToDelete = [...new Set(prefixedKeys.map(({prefix}) => prefix))].sort((a, b) => a - b);

if (slotsToDelete.length > 0) {
await db.dataColumnSidecarArchive.deleteMany(slotsToDelete);
logger.verbose("dataColumnSidecars prune", {
...logCtx,
slotRange: prettyPrintIndices(slotsToDelete),
numOfSlots: slotsToDelete.length,
totalNumOfSidecars: prefixedKeys.length,
});
} else {
logger.verbose(`dataColumnSidecars prune: no entries before epoch ${dataColumnSidecarsMinEpoch}`, logCtx);
}
}
} else {
logger.verbose(
Expand Down
44 changes: 31 additions & 13 deletions packages/beacon-node/src/chain/blocks/writeBlockInputToDb.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {fulu} from "@lodestar/types";
import {fulu, ssz} from "@lodestar/types";
import {prettyPrintIndices, toRootHex} from "@lodestar/utils";
import {blobSidecarsWrapperSsz} from "../../db/repositories/blobSidecars.js";
import {BeaconChain} from "../chain.js";
import {IBlockInput, isBlockInputBlobs, isBlockInputColumns} from "./blockInput/index.js";
import {BLOB_AVAILABILITY_TIMEOUT} from "./verifyBlocksDataAvailability.js";
Expand Down Expand Up @@ -69,19 +70,31 @@ export async function writeBlockInputToDb(this: BeaconChain, blocksInputs: IBloc
);
}

const binaryPuts = [];
const nonbinaryPuts = [];
for (const dataColumnSidecar of dataColumnSidecars) {
// skip reserializing column if we already have it
const serialized = this.serializedCache.get(dataColumnSidecar);
if (serialized) {
binaryPuts.push({key: dataColumnSidecar.index, value: serialized});
} else {
nonbinaryPuts.push(dataColumnSidecar);
if (this.db.flatFileStore) {
const binaryColumns: {index: number; data: Uint8Array}[] = [];
for (const dataColumnSidecar of dataColumnSidecars) {
const serialized = this.serializedCache.get(dataColumnSidecar);
binaryColumns.push({
index: dataColumnSidecar.index,
data: serialized ?? ssz.fulu.DataColumnSidecar.serialize(dataColumnSidecar),
});
}
fnPromises.push(this.db.flatFileStore.putDataColumnsBinary(slot, blockRootHex, binaryColumns));
} else {
const binaryPuts = [];
const nonbinaryPuts = [];
for (const dataColumnSidecar of dataColumnSidecars) {
// skip reserializing column if we already have it
const serialized = this.serializedCache.get(dataColumnSidecar);
if (serialized) {
binaryPuts.push({key: dataColumnSidecar.index, value: serialized});
} else {
nonbinaryPuts.push(dataColumnSidecar);
}
}
fnPromises.push(this.db.dataColumnSidecar.putManyBinary(blockRoot, binaryPuts));
fnPromises.push(this.db.dataColumnSidecar.putMany(blockRoot, nonbinaryPuts));
}
fnPromises.push(this.db.dataColumnSidecar.putManyBinary(blockRoot, binaryPuts));
fnPromises.push(this.db.dataColumnSidecar.putMany(blockRoot, nonbinaryPuts));
this.logger.debug("Persisted dataColumnSidecars to hot DB", {
slot: block.message.slot,
root: blockRootHex,
Expand All @@ -91,7 +104,12 @@ export async function writeBlockInputToDb(this: BeaconChain, blocksInputs: IBloc
});
} else if (isBlockInputBlobs(blockInput)) {
const blobSidecars = blockInput.getBlobs();
fnPromises.push(this.db.blobSidecars.add({blockRoot, slot: block.message.slot, blobSidecars}));
if (this.db.flatFileStore) {
const wrapperBytes = blobSidecarsWrapperSsz.serialize({blockRoot, slot: block.message.slot, blobSidecars});
fnPromises.push(this.db.flatFileStore.putBlobSidecars(block.message.slot, blockRootHex, wrapperBytes));
} else {
fnPromises.push(this.db.blobSidecars.add({blockRoot, slot: block.message.slot, blobSidecars}));
}
this.logger.debug("Persisted blobSidecars to hot DB", {
blobsLen: blobSidecars.length,
slot: block.message.slot,
Expand Down
17 changes: 17 additions & 0 deletions packages/beacon-node/src/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -779,6 +779,10 @@ export class BeaconChain implements IBeaconChain {
}
return blockInput.getBlobs();
}
if (this.db.flatFileStore) {
const wrapper = await this.db.flatFileStore.getBlobSidecars(blockSlot, blockRootHex);
return wrapper?.blobSidecars ?? null;
}
const unfinalizedBlobSidecars = (await this.db.blobSidecars.get(fromHex(blockRootHex)))?.blobSidecars ?? null;
if (unfinalizedBlobSidecars) {
return unfinalizedBlobSidecars;
Expand All @@ -797,6 +801,13 @@ export class BeaconChain implements IBeaconChain {
}
return ssz.deneb.BlobSidecars.serialize(blockInput.getBlobs());
}
if (this.db.flatFileStore) {
const wrapper = await this.db.flatFileStore.getBlobSidecarsBinary(blockSlot, blockRootHex);
if (wrapper) {
return wrapper.slice(BLOB_SIDECARS_IN_WRAPPER_INDEX);
}
return null;
Comment on lines +817 to +820

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Add LevelDB fallback when flat file lookup misses

When flatFileStore is enabled, this path returns early and never falls back to blobSidecars/blobSidecarsArchive (and the same pattern is used for data columns), so upgraded nodes with pre-existing sidecars in LevelDB but no flat-file copies will suddenly serve null for historical data. Because flatFileStorage is now default-on, this creates an upgrade regression unless a migration or fallback read path is added.

Useful? React with 👍 / 👎.

}
const unfinalizedBlobSidecarsWrapper = await this.db.blobSidecars.getBinary(fromHex(blockRootHex));
if (unfinalizedBlobSidecarsWrapper) {
return unfinalizedBlobSidecarsWrapper.slice(BLOB_SIDECARS_IN_WRAPPER_INDEX);
Expand All @@ -816,6 +827,9 @@ export class BeaconChain implements IBeaconChain {
}
return blockInput.getAllColumns();
}
if (this.db.flatFileStore) {
return this.db.flatFileStore.getDataColumns(blockSlot, blockRootHex);
}
const sidecarsUnfinalized = await this.db.dataColumnSidecar.values(fromHex(blockRootHex));
if (sidecarsUnfinalized.length > 0) {
return sidecarsUnfinalized;
Expand Down Expand Up @@ -846,6 +860,9 @@ export class BeaconChain implements IBeaconChain {
return ssz.fulu.DataColumnSidecar.serialize(sidecar);
});
}
if (this.db.flatFileStore) {
return this.db.flatFileStore.getDataColumnsBinary(blockSlot, blockRootHex, indices);
}
const sidecarsUnfinalized = await this.db.dataColumnSidecar.getManyBinary(fromHex(blockRootHex), indices);
if (sidecarsUnfinalized.some((sidecar) => sidecar != null)) {
return sidecarsUnfinalized;
Expand Down
3 changes: 3 additions & 0 deletions packages/beacon-node/src/chain/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ export type IChainOptions = BlockProcessOpts &
minSameMessageSignatureSetsToBatch: number;
archiveDateEpochs?: number;
nHistoricalStatesFileDataStore?: boolean;
/** Use flat file storage for blobs and data columns instead of LevelDB */
flatFileStorage?: boolean;
};

export type BlockProcessOpts = {
Expand Down Expand Up @@ -123,6 +125,7 @@ export const defaultChainOptions: IChainOptions = {
// - users can prune the persisted checkpoint state files manually to save disc space
// - it helps debug easier when network is unfinalized
nHistoricalStatesFileDataStore: true,
flatFileStorage: true,
maxBlockStates: DEFAULT_MAX_BLOCK_STATES,
maxCPStateEpochsInMemory: DEFAULT_MAX_CP_STATE_EPOCHS_IN_MEMORY,
maxCPStateEpochsOnDisk: DEFAULT_MAX_CP_STATE_ON_DISK,
Expand Down
24 changes: 21 additions & 3 deletions packages/beacon-node/src/db/beacon.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import {ChainForkConfig} from "@lodestar/config";
import {Db, LevelDbControllerMetrics, encodeKey} from "@lodestar/db";
import {Logger} from "@lodestar/utils";
import {Bucket} from "./buckets.js";
import {FlatFileStore} from "./flatFileStore/flatFileStore.js";
import type {IFlatFileStore} from "./flatFileStore/interface.js";
import {IBeaconDb} from "./interface.js";
import {CheckpointStateRepository} from "./repositories/checkpointState.js";
import {
Expand Down Expand Up @@ -52,6 +55,8 @@ export class BeaconDb implements IBeaconDb {

backfilledRanges: BackfilledRanges;

flatFileStore: IFlatFileStore | null = null;

constructor(
config: ChainForkConfig,
protected readonly db: Db
Expand Down Expand Up @@ -81,7 +86,16 @@ export class BeaconDb implements IBeaconDb {
this.backfilledRanges = new BackfilledRanges(config, db);
}

close(): Promise<void> {
async initFlatFileStore(dataDir: string, logger: Logger): Promise<void> {
const store = new FlatFileStore(dataDir, logger);
await store.init();
this.flatFileStore = store;
}

async close(): Promise<void> {
if (this.flatFileStore) {
await this.flatFileStore.close();
}
return this.db.close();
}

Expand All @@ -90,8 +104,12 @@ export class BeaconDb implements IBeaconDb {
}

async pruneHotDb(): Promise<void> {
// Prune all hot blobs
await this.blobSidecars.batchDelete(await this.blobSidecars.keys());
if (this.flatFileStore) {
await this.flatFileStore.pruneHotBlobs();
} else {
// Prune all hot blobs
await this.blobSidecars.batchDelete(await this.blobSidecars.keys());
}
// Prune all hot blocks
// TODO: Enable once it's deemed safe
// await this.block.batchDelete(await this.block.keys());
Expand Down
Loading
Loading