-
-
Notifications
You must be signed in to change notification settings - Fork 438
Expand file tree
/
Copy patharchiveBlocks.ts
More file actions
479 lines (430 loc) · 18.5 KB
/
archiveBlocks.ts
File metadata and controls
479 lines (430 loc) · 18.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
import path from "node:path";
import {ChainForkConfig} from "@lodestar/config";
import {KeyValue} from "@lodestar/db";
import {IForkChoice} from "@lodestar/fork-choice";
import {ForkSeq, SLOTS_PER_EPOCH} from "@lodestar/params";
import {computeEpochAtSlot, computeStartSlotAtEpoch} from "@lodestar/state-transition";
import {Epoch, RootHex, Slot} from "@lodestar/types";
import {Logger, fromAsync, fromHex, prettyPrintIndices, toRootHex} from "@lodestar/utils";
import {IBeaconDb} from "../../../db/index.js";
import {BlockArchiveBatchPutBinaryItem} from "../../../db/repositories/index.js";
import {ensureDir, writeIfNotExist} from "../../../util/file.js";
import {BlockRootHex} from "../../../util/sszBytes.js";
import {LightClientServer} from "../../lightClient/index.js";
// Process in chunks to avoid OOM
// this number of blocks per chunk is tested in e2e test blockArchive.test.ts
// TODO: Review after merge since the size of blocks will increase significantly
const BLOCK_BATCH_SIZE = 256;
// Smaller batch than blocks — presumably because each blobSidecars value bundles
// all sidecars of a block and is much larger than a block; TODO confirm sizing rationale
const BLOB_SIDECAR_BATCH_SIZE = 32;
// A canonical finalized block identified by slot and raw 32-byte root
type BlockRootSlot = {slot: Slot; root: Uint8Array};
// Checkpoint whose root is a 0x-prefixed hex string, as returned by fork-choice APIs
type CheckpointHex = {epoch: Epoch; rootHex: RootHex};
/**
 * Persist an orphaned (non-canonical) block to disk as `<slot>_<root>.ssz`.
 *
 * @param slot slot of the orphaned block, used in the file name
 * @param blockRoot hex root of the orphaned block, used in the file name
 * @param bytes raw SSZ-serialized block bytes to write
 * @param opts.persistOrphanedBlocksDir target directory; created if missing
 */
async function persistOrphanedBlock(
  slot: Slot,
  blockRoot: BlockRootHex,
  bytes: Uint8Array,
  opts: {
    persistOrphanedBlocksDir: string;
  }
): Promise<void> {
  // `persistOrphanedBlocksDir` is a required field, so no fallback is needed here;
  // the caller (archiveBlocks) already applies the "orphaned_blocks" default.
  // path.join() normalizes the configured path.
  const dirpath = path.join(opts.persistOrphanedBlocksDir);
  const filepath = path.join(dirpath, `${slot}_${blockRoot}.ssz`);
  await ensureDir(dirpath);
  // Idempotent: do not overwrite a file persisted by a previous run
  await writeIfNotExist(filepath, bytes);
}
/**
 * Archives finalized blocks from active bucket to archive bucket.
 *
 * Only archive blocks on the same chain to the finalized checkpoint.
 * Each run should move all finalized blocks to the blockArchive db to make it consistent
 * with stateArchive, so that the node always works well when we restart.
 * Note that the finalized block still stays in forkchoice to check the finalized checkpoint of next onBlock calls;
 * the next run should not reprocess finalized blocks of this run.
 */
export async function archiveBlocks(
  config: ChainForkConfig,
  db: IBeaconDb,
  forkChoice: IForkChoice,
  lightclientServer: LightClientServer | undefined,
  logger: Logger,
  finalizedCheckpoint: CheckpointHex,
  currentEpoch: Epoch,
  archiveDataEpochs?: number,
  persistOrphanedBlocks?: boolean,
  persistOrphanedBlocksDir?: string
): Promise<void> {
  // Use fork choice to determine the blocks to archive and delete
  // getAllAncestorBlocks response includes the finalized block, so it's also moved to the cold db
  const {ancestors: finalizedCanonicalBlocks, nonAncestors: finalizedNonCanonicalBlocks} =
    forkChoice.getAllAncestorAndNonAncestorBlocks(finalizedCheckpoint.rootHex);

  // NOTE: The finalized block will be exactly the first block of `epoch` or previous
  const finalizedPostDeneb = finalizedCheckpoint.epoch >= config.DENEB_FORK_EPOCH;
  const finalizedPostFulu = finalizedCheckpoint.epoch >= config.FULU_FORK_EPOCH;

  const finalizedCanonicalBlockRoots: BlockRootSlot[] = finalizedCanonicalBlocks.map((block) => ({
    slot: block.slot,
    root: fromHex(block.blockRoot),
  }));

  const logCtx = {currentEpoch, finalizedEpoch: finalizedCheckpoint.epoch, finalizedRoot: finalizedCheckpoint.rootHex};
  const flatFileStore = db.flatFileStore;

  // Step 1: migrate canonical blocks (and their blobs/columns when applicable) hot -> cold
  if (finalizedCanonicalBlockRoots.length > 0) {
    await migrateBlocksFromHotToColdDb(db, finalizedCanonicalBlockRoots);
    logger.verbose("Migrated blocks from hot DB to cold DB", {
      ...logCtx,
      fromSlot: finalizedCanonicalBlockRoots[0].slot,
      toSlot: finalizedCanonicalBlockRoots.at(-1)?.slot,
      size: finalizedCanonicalBlockRoots.length,
    });

    // When flat file store is enabled, blobs/columns are already in their final location
    // — no migration needed
    if (!flatFileStore) {
      if (finalizedPostDeneb) {
        const migratedEntries = await migrateBlobSidecarsFromHotToColdDb(
          config,
          db,
          finalizedCanonicalBlockRoots,
          currentEpoch
        );
        logger.verbose("Migrated blobSidecars from hot DB to cold DB", {...logCtx, migratedEntries});
      }

      if (finalizedPostFulu) {
        const migratedEntries = await migrateDataColumnSidecarsFromHotToColdDb(
          config,
          db,
          logger,
          finalizedCanonicalBlockRoots,
          currentEpoch
        );
        logger.verbose("Migrated dataColumnSidecars from hot DB to cold DB", {...logCtx, migratedEntries});
      }
    }
  }

  // Step 2: delete non-canonical (orphaned) blocks and their data from the hot db
  // loop through forkchoice a single time
  const nonCanonicalBlockRoots = finalizedNonCanonicalBlocks.map((summary) => fromHex(summary.blockRoot));
  if (nonCanonicalBlockRoots.length > 0) {
    if (persistOrphanedBlocks) {
      // Persist orphaned blocks to disk before deleting them from hot db
      await Promise.all(
        nonCanonicalBlockRoots.map(async (root, index) => {
          // nonCanonicalBlockRoots is index-aligned with finalizedNonCanonicalBlocks
          const block = finalizedNonCanonicalBlocks[index];
          const blockBytes = await db.block.getBinary(root);
          const blockLogCtx = {slot: block.slot, root: block.blockRoot};
          if (blockBytes) {
            await persistOrphanedBlock(block.slot, block.blockRoot, blockBytes, {
              persistOrphanedBlocksDir: persistOrphanedBlocksDir ?? "orphaned_blocks",
            });
            logger.verbose("Persisted orphaned block", {...logCtx, ...blockLogCtx});
          } else {
            logger.warn("Tried to persist orphaned block but no block found", {...logCtx, ...blockLogCtx});
          }
        })
      );
    }

    await db.block.batchDelete(nonCanonicalBlockRoots);
    logger.verbose("Deleted non canonical blocks from hot DB", {
      ...logCtx,
      slots: finalizedNonCanonicalBlocks.map((summary) => summary.slot).join(","),
    });

    if (flatFileStore) {
      // Delete non-canonical blobs/columns from flat file store
      const items = finalizedNonCanonicalBlocks.map((summary) => ({
        slot: summary.slot,
        blockRoot: summary.blockRoot,
      }));
      await flatFileStore.deleteNonCanonical(items);
      logger.verbose("Deleted non canonical blobs/columns from flat file store", logCtx);
    } else {
      if (finalizedPostDeneb) {
        await db.blobSidecars.batchDelete(nonCanonicalBlockRoots);
        logger.verbose("Deleted non canonical blobSidecars from hot DB", logCtx);
      }
      if (finalizedPostFulu) {
        await db.dataColumnSidecar.deleteMany(nonCanonicalBlockRoots);
        logger.verbose("Deleted non canonical dataColumnSidecars from hot DB", logCtx);
      }
    }
  }

  // Step 3: delete expired blobs
  // Keep only `[current_epoch - max(MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS, archiveDataEpochs)]`
  // if archiveDataEpochs set to Infinity do not prune
  if (finalizedPostDeneb) {
    if (archiveDataEpochs !== Infinity) {
      const blobsArchiveWindow = Math.max(config.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS, archiveDataEpochs ?? 0);
      const blobSidecarsMinEpoch = currentEpoch - blobsArchiveWindow;
      if (blobSidecarsMinEpoch >= config.DENEB_FORK_EPOCH) {
        const blobsPruneSlot = computeStartSlotAtEpoch(blobSidecarsMinEpoch);
        if (flatFileStore) {
          await flatFileStore.pruneBlobsBeforeSlot(blobsPruneSlot);
          logger.verbose(`blobSidecars prune (flat file): pruned before slot ${blobsPruneSlot}`, logCtx);
        } else {
          // blobSidecarsArchive is keyed by slot, so a simple `lt` range query suffices
          const slotsToDelete = await db.blobSidecarsArchive.keys({lt: blobsPruneSlot});
          if (slotsToDelete.length > 0) {
            await db.blobSidecarsArchive.batchDelete(slotsToDelete);
            logger.verbose(
              `blobSidecars prune: batchDelete range ${slotsToDelete[0]}..${slotsToDelete.at(-1)}`,
              logCtx
            );
          } else {
            logger.verbose(`blobSidecars prune: no entries before epoch ${blobSidecarsMinEpoch}`, logCtx);
          }
        }
      }
    } else {
      logger.verbose("blobSidecars pruning skipped: archiveDataEpochs set to Infinity", logCtx);
    }
  }

  // Step 4: delete expired data column sidecars
  // Keep only `[current_epoch - max(MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUESTS, archiveDataEpochs)]`
  if (finalizedPostFulu) {
    if (archiveDataEpochs !== Infinity) {
      const dataColumnSidecarsArchiveWindow = Math.max(
        config.MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUESTS,
        archiveDataEpochs ?? 0
      );
      const dataColumnSidecarsMinEpoch = currentEpoch - dataColumnSidecarsArchiveWindow;
      if (dataColumnSidecarsMinEpoch >= config.FULU_FORK_EPOCH) {
        const columnsPruneSlot = computeStartSlotAtEpoch(dataColumnSidecarsMinEpoch);
        if (flatFileStore) {
          await flatFileStore.pruneColumnsBeforeSlot(columnsPruneSlot);
          logger.verbose(`dataColumnSidecars prune (flat file): pruned before slot ${columnsPruneSlot}`, logCtx);
        } else {
          const prefixedKeys = await db.dataColumnSidecarArchive.keys({
            // The `id` value `0` refers to the column index. So we want to fetch all sidecars less than zero column of `dataColumnSidecarsMinEpoch`
            lt: {prefix: columnsPruneSlot, id: 0},
          });
          // for each slot there could be multiple dataColumnSidecar, so we need to deduplicate it
          const slotsToDelete = [...new Set(prefixedKeys.map(({prefix}) => prefix))].sort((a, b) => a - b);
          if (slotsToDelete.length > 0) {
            await db.dataColumnSidecarArchive.deleteMany(slotsToDelete);
            logger.verbose("dataColumnSidecars prune", {
              ...logCtx,
              slotRange: prettyPrintIndices(slotsToDelete),
              numOfSlots: slotsToDelete.length,
              totalNumOfSidecars: prefixedKeys.length,
            });
          } else {
            logger.verbose(`dataColumnSidecars prune: no entries before epoch ${dataColumnSidecarsMinEpoch}`, logCtx);
          }
        }
      } else {
        logger.verbose(
          `dataColumnSidecars pruning skipped: ${dataColumnSidecarsMinEpoch} is before fulu fork epoch ${config.FULU_FORK_EPOCH}`,
          logCtx
        );
      }
    } else {
      logger.verbose("dataColumnSidecars pruning skipped: archiveDataEpochs set to Infinity", logCtx);
    }
  }

  // Step 5: prune light-client data for blocks that can never become checkpoints
  const finalizedCanonicalNonCheckpointBlocks = getNonCheckpointBlocks(finalizedCanonicalBlockRoots);
  const nonCheckpointBlockRoots: Uint8Array[] = [...nonCanonicalBlockRoots];
  for (const block of finalizedCanonicalNonCheckpointBlocks) {
    nonCheckpointBlockRoots.push(block.root);
  }
  if (lightclientServer) {
    await lightclientServer.pruneNonCheckpointData(nonCheckpointBlockRoots);
  }

  logger.verbose("Archiving of finalized blocks complete", {
    ...logCtx,
    totalArchived: finalizedCanonicalBlocks.length,
  });
}
/**
 * Move canonical finalized blocks from the hot `block` repository to the cold `blockArchive`
 * repository, in batches of BLOCK_BATCH_SIZE to bound memory usage.
 *
 * Starts from `i=0`: the 1st block in iterateAncestorBlocks() is the finalized block itself.
 * We move it to blockArchive but forkchoice still has it to check next onBlock calls;
 * the next iterateAncestorBlocks call does not return this block again.
 *
 * @throws if any canonical block is missing from the hot db
 */
async function migrateBlocksFromHotToColdDb(db: IBeaconDb, blocks: BlockRootSlot[]): Promise<void> {
  for (let i = 0; i < blocks.length; i += BLOCK_BATCH_SIZE) {
    // `i < blocks.length` guarantees a non-empty slice; slice() clamps the end index itself
    const canonicalBlocks = blocks.slice(i, i + BLOCK_BATCH_SIZE);

    // load Buffer instead of SignedBeaconBlock to improve performance
    const canonicalBlockEntries: BlockArchiveBatchPutBinaryItem[] = await Promise.all(
      canonicalBlocks.map(async (block) => {
        // Here we assume the blocks are already in the hot db
        const blockBuffer = await db.block.getBinary(block.root);
        if (!blockBuffer) {
          throw Error(`Block not found for slot ${block.slot} root ${toRootHex(block.root)}`);
        }
        return {
          key: block.slot,
          value: blockBuffer,
          slot: block.slot,
          blockRoot: block.root,
          // TODO: Benchmark if faster to slice Buffer or fromHex()
          parentRoot: getParentRootFromSignedBlock(blockBuffer),
        };
      })
    );

    // put to blockArchive db and delete from block db
    await Promise.all([
      db.blockArchive.batchPutBinary(canonicalBlockEntries),
      db.block.batchDelete(canonicalBlocks.map((block) => block.root)),
    ]);
  }
}
/**
* Migrate blobSidecars from hot db to cold db.
* @returns true if we do that, false if block is out of range data.
*/
async function migrateBlobSidecarsFromHotToColdDb(
config: ChainForkConfig,
db: IBeaconDb,
blocks: BlockRootSlot[],
currentEpoch: Epoch
): Promise<number> {
let migratedWrappedBlobSidecars = 0;
for (let i = 0; i < blocks.length; i += BLOB_SIDECAR_BATCH_SIZE) {
const toIdx = Math.min(i + BLOB_SIDECAR_BATCH_SIZE, blocks.length);
const canonicalBlocks = blocks.slice(i, toIdx);
// processCanonicalBlocks
if (canonicalBlocks.length === 0) break;
// load Buffer instead of ssz deserialized to improve performance
const canonicalBlobSidecarsEntries: KeyValue<Slot, Uint8Array>[] = await Promise.all(
canonicalBlocks
.filter((block) => {
const blockSlot = block.slot;
const blockEpoch = computeEpochAtSlot(blockSlot);
const forkSeq = config.getForkSeq(blockSlot);
return (
forkSeq >= ForkSeq.deneb &&
forkSeq < ForkSeq.fulu &&
// if block is out of ${config.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS}, skip this step
blockEpoch >= currentEpoch - config.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS
);
})
.map(async (block) => {
// Here we assume the blob sidecars are already in the hot db
// instead of checking first the block input cache
const bytes = await db.blobSidecars.getBinary(block.root);
if (!bytes) {
throw Error(`No blobSidecars found for slot ${block.slot} root ${toRootHex(block.root)}`);
}
return {key: block.slot, value: bytes};
})
);
// put to blockArchive db and delete block db
await Promise.all([
db.blobSidecarsArchive.batchPutBinary(canonicalBlobSidecarsEntries),
db.blobSidecars.batchDelete(canonicalBlocks.map((block) => block.root)),
]);
migratedWrappedBlobSidecars += canonicalBlobSidecarsEntries.length;
}
return migratedWrappedBlobSidecars;
}
// TODO: This function can be simplified further by reducing layers of promises in a loop
/**
 * Migrate dataColumnSidecars of canonical blocks from the hot db to the cold (archive) db,
 * batching BLOB_SIDECAR_BATCH_SIZE blocks at a time to bound memory usage.
 *
 * Pre-Fulu blocks and blocks older than the column retention window are skipped entirely
 * (their hot-db entries are still deleted by the per-batch deleteMany below).
 *
 * @returns total number of dataColumnSidecar entries migrated to the archive
 */
async function migrateDataColumnSidecarsFromHotToColdDb(
  config: ChainForkConfig,
  db: IBeaconDb,
  logger: Logger,
  blocks: BlockRootSlot[],
  currentEpoch: Epoch
): Promise<number> {
  let migratedWrappedDataColumns = 0;
  for (let i = 0; i < blocks.length; i += BLOB_SIDECAR_BATCH_SIZE) {
    const toIdx = Math.min(i + BLOB_SIDECAR_BATCH_SIZE, blocks.length);
    const canonicalBlocks = blocks.slice(i, toIdx);

    // processCanonicalBlocks
    if (canonicalBlocks.length === 0) break;

    // Reads are sequential (await in loop); only the archive writes are batched into `promises`
    const promises = [];
    // load Buffer instead of ssz deserialized to improve performance
    for (const block of canonicalBlocks) {
      const blockSlot = block.slot;
      const blockEpoch = computeEpochAtSlot(blockSlot);
      if (
        config.getForkSeq(blockSlot) < ForkSeq.fulu ||
        // if block is out of ${config.MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUESTS}, skip this step
        blockEpoch < currentEpoch - config.MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUESTS
      ) {
        continue;
      }

      // Here we assume the data column sidecars are already in the hot db
      const dataColumnSidecarBytes = await fromAsync(db.dataColumnSidecar.valuesStreamBinary(block.root));
      // there could be 0 dataColumnSidecarBytes if block has no blob
      logger.verbose("migrateDataColumnSidecarsFromHotToColdDb", {
        currentEpoch,
        slot: block.slot,
        root: toRootHex(block.root),
        numSidecars: dataColumnSidecarBytes.length,
      });
      promises.push(
        db.dataColumnSidecarArchive.putManyBinary(
          block.slot,
          dataColumnSidecarBytes.map((p) => ({key: p.id, value: p.value}))
        )
      );
      migratedWrappedDataColumns += dataColumnSidecarBytes.length;
    }

    // Delete ALL batch blocks from the hot db, including those skipped above
    promises.push(db.dataColumnSidecar.deleteMany(canonicalBlocks.map((block) => block.root)));

    // put to dataColumnSidecarArchive db and delete from hot dataColumnSidecar db
    await Promise.all(promises);
  }
  return migratedWrappedDataColumns;
}
/**
 * Extract `parent_root` directly from serialized SignedBeaconBlock bytes without
 * a full SSZ deserialization.
 *
 * ```
 * class SignedBeaconBlock(Container):
 *   message: BeaconBlock       [offset - 4 bytes]
 *   signature: BLSSignature    [fixed - 96 bytes]
 *
 * class BeaconBlock(Container):
 *   slot: Slot                 [fixed - 8 bytes]
 *   proposer_index: ValidatorIndex [fixed - 8 bytes]
 *   parent_root: Root          [fixed - 32 bytes]
 *   state_root: Root
 *   body: BeaconBlockBody
 * ```
 *
 * @returns a 32-byte copy of the parent root (slice, not a view into `bytes`)
 */
export function getParentRootFromSignedBlock(bytes: Uint8Array): Uint8Array {
  // message offset (4) + signature (96) + slot (8) + proposer_index (8) = 116
  const parentRootStart = 4 + 96 + 8 + 8;
  const parentRootEnd = parentRootStart + 32;
  return bytes.slice(parentRootStart, parentRootEnd);
}
/**
 * Return the subset of `blocks` that are guaranteed to never be checkpoint blocks.
 *
 * A block at `epoch * SLOTS_PER_EPOCH` is a checkpoint. If an epoch skips its first
 * slot, the last block of the previous epoch is the checkpoint for that epoch instead.
 *
 * @param blocks sequence of linear blocks, from child to ancestor (decreasing slot).
 * In ProtoArray.getAllAncestorNodes child nodes are pushed first to the returned array.
 */
export function getNonCheckpointBlocks<T extends {slot: Slot}>(blocks: T[]): T[] {
  if (blocks.length < 1) {
    return [];
  }

  const nonCheckpointBlocks: T[] = [];
  // Infinity guarantees the first block always enters the "new (lower) epoch" branch.
  let trackedEpoch = Infinity;
  // Assume the worst case for the epoch after the newest block: it is unknown whether a
  // future epoch skips its first slot, and this function must only return blocks that can
  // never become checkpoints.
  let trackedEpochHasFirstSlot = false;

  // Walk child -> ancestor, i.e. decreasing slot
  for (const block of blocks) {
    const blockEpoch = computeEpochAtSlot(block.slot);
    let isCheckpoint = false;

    if (blockEpoch < trackedEpoch) {
      // Crossed an epoch boundary going backwards. If the more recent epoch had no block
      // in its first slot, this block (the last of its epoch) is that epoch's checkpoint.
      if (!trackedEpochHasFirstSlot) {
        isCheckpoint = true;
      }
      trackedEpoch = blockEpoch;
      trackedEpochHasFirstSlot = false;
    }

    // A block in the first slot of an epoch is always a checkpoint
    if (block.slot % SLOTS_PER_EPOCH === 0) {
      trackedEpochHasFirstSlot = true;
      isCheckpoint = true;
    }

    if (!isCheckpoint) {
      nonCheckpointBlocks.push(block);
    }
  }

  return nonCheckpointBlocks;
}