Skip to content

Commit aa4a3ae

Browse files
committed
Blob filestores
1 parent 4099eac commit aa4a3ae

File tree

25 files changed

+1702
-50
lines changed

25 files changed

+1702
-50
lines changed

yarn-project/archiver/src/archiver/archiver.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -882,6 +882,7 @@ export class Archiver extends (EventEmitter as new () => ArchiverEmitter) implem
882882
this.l1Addresses,
883883
this.instrumentation,
884884
this.log,
885+
!this.initialSyncComplete, // isHistoricalSync
885886
);
886887

887888
if (retrievedCheckpoints.length === 0) {

yarn-project/archiver/src/archiver/l1/data_retrieval.ts

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -138,13 +138,17 @@ export async function retrievedToPublishedCheckpoint({
138138

139139
/**
140140
* Fetches new checkpoints.
141+
* @param rollup - The rollup contract instance.
141142
* @param publicClient - The viem public client to use for transaction retrieval.
142143
* @param debugClient - The viem debug client to use for trace/debug RPC methods (optional).
143-
* @param rollupAddress - The address of the rollup contract.
144+
* @param blobSinkClient - The blob sink client for fetching blob data.
144145
* @param searchStartBlock - The block number to use for starting the search.
145146
* @param searchEndBlock - The highest block number that we should search up to.
146-
* @param expectedNextL2BlockNum - The next L2 block number that we expect to find.
147-
* @returns An array of block; as well as the next eth block to search from.
147+
* @param contractAddresses - The contract addresses (governanceProposerAddress, slashFactoryAddress, slashingProposerAddress).
148+
* @param instrumentation - The archiver instrumentation instance.
149+
* @param logger - The logger instance.
150+
* @param isHistoricalSync - Whether this is a historical sync.
151+
* @returns An array of retrieved checkpoints.
148152
*/
149153
export async function retrieveCheckpointsFromRollup(
150154
rollup: GetContractReturnType<typeof RollupAbi, ViemPublicClient>,
@@ -160,6 +164,7 @@ export async function retrieveCheckpointsFromRollup(
160164
},
161165
instrumentation: ArchiverInstrumentation,
162166
logger: Logger = createLogger('archiver'),
167+
isHistoricalSync: boolean = false,
163168
): Promise<RetrievedCheckpoint[]> {
164169
const retrievedCheckpoints: RetrievedCheckpoint[] = [];
165170

@@ -211,6 +216,7 @@ export async function retrieveCheckpointsFromRollup(
211216
contractAddresses,
212217
instrumentation,
213218
logger,
219+
isHistoricalSync,
214220
);
215221
retrievedCheckpoints.push(...newCheckpoints);
216222
searchStartBlock = lastLog.blockNumber! + 1n;
@@ -222,11 +228,17 @@ export async function retrieveCheckpointsFromRollup(
222228

223229
/**
224230
* Processes newly received CheckpointProposed logs.
225-
* @param rollup - The rollup contract
231+
* @param rollup - The rollup contract instance.
226232
* @param publicClient - The viem public client to use for transaction retrieval.
227233
* @param debugClient - The viem debug client to use for trace/debug RPC methods (optional).
234+
* @param blobSinkClient - The blob sink client for fetching blob data.
228235
* @param logs - CheckpointProposed logs.
229-
* @returns - An array of checkpoints.
236+
* @param rollupConstants - The rollup constants (chainId, version, targetCommitteeSize).
237+
* @param contractAddresses - The contract addresses (governanceProposerAddress, slashFactoryAddress, slashingProposerAddress).
238+
* @param instrumentation - The archiver instrumentation instance.
239+
* @param logger - The logger instance.
240+
* @param isHistoricalSync - Whether this is a historical sync.
241+
* @returns An array of retrieved checkpoints.
230242
*/
231243
async function processCheckpointProposedLogs(
232244
rollup: GetContractReturnType<typeof RollupAbi, ViemPublicClient>,
@@ -242,6 +254,7 @@ async function processCheckpointProposedLogs(
242254
},
243255
instrumentation: ArchiverInstrumentation,
244256
logger: Logger,
257+
isHistoricalSync: boolean,
245258
): Promise<RetrievedCheckpoint[]> {
246259
const retrievedCheckpoints: RetrievedCheckpoint[] = [];
247260
const calldataRetriever = new CalldataRetriever(
@@ -272,6 +285,7 @@ async function processCheckpointProposedLogs(
272285
blobHashes,
273286
checkpointNumber,
274287
logger,
288+
isHistoricalSync,
275289
);
276290

277291
const l1: L1PublishedData = {
@@ -309,8 +323,9 @@ export async function getCheckpointBlobDataFromBlobs(
309323
blobHashes: Buffer<ArrayBufferLike>[],
310324
checkpointNumber: CheckpointNumber,
311325
logger: Logger,
326+
isHistoricalSync: boolean,
312327
): Promise<CheckpointBlobData> {
313-
const blobBodies = await blobSinkClient.getBlobSidecar(blockHash, blobHashes);
328+
const blobBodies = await blobSinkClient.getBlobSidecar(blockHash, blobHashes, undefined, { isHistoricalSync });
314329
if (blobBodies.length === 0) {
315330
throw new NoBlobBodiesFoundError(checkpointNumber);
316331
}

yarn-project/aztec-node/src/aztec-node/server.ts

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
import { Archiver, createArchiver } from '@aztec/archiver';
22
import { BBCircuitVerifier, QueuedIVCVerifier, TestCircuitVerifier } from '@aztec/bb-prover';
33
import { type BlobSinkClientInterface, createBlobSinkClient } from '@aztec/blob-sink/client';
4+
import {
5+
type BlobFileStoreMetadata,
6+
createReadOnlyFileStoreBlobClients,
7+
createWritableFileStoreBlobClient,
8+
} from '@aztec/blob-sink/filestore';
49
import {
510
ARCHIVE_HEIGHT,
611
INITIAL_L2_BLOCK_NUM,
@@ -197,8 +202,6 @@ export class AztecNodeService implements AztecNode, AztecNodeAdmin, Traceable {
197202
const packageVersion = getPackageVersion() ?? '';
198203
const telemetry = deps.telemetry ?? getTelemetryClient();
199204
const dateProvider = deps.dateProvider ?? new DateProvider();
200-
const blobSinkClient =
201-
deps.blobSinkClient ?? createBlobSinkClient(config, { logger: createLogger('node:blob-sink:client') });
202205
const ethereumChain = createEthereumChain(config.l1RpcUrls, config.l1ChainId);
203206

204207
// Build a key store from file if given or from environment otherwise
@@ -266,6 +269,25 @@ export class AztecNodeService implements AztecNode, AztecNodeAdmin, Traceable {
266269
);
267270
}
268271

272+
const blobFileStoreMetadata: BlobFileStoreMetadata = {
273+
l1ChainId: config.l1ChainId,
274+
rollupVersion: config.rollupVersion,
275+
rollupAddress: config.l1Contracts.rollupAddress.toString(),
276+
};
277+
278+
const [fileStoreClients, fileStoreUploadClient] = await Promise.all([
279+
createReadOnlyFileStoreBlobClients(config.blobFileStoreUrls, blobFileStoreMetadata, log),
280+
createWritableFileStoreBlobClient(config.blobFileStoreUploadUrl, blobFileStoreMetadata, log),
281+
]);
282+
283+
const blobSinkClient =
284+
deps.blobSinkClient ??
285+
createBlobSinkClient(config, {
286+
logger: createLogger('node:blob-sink:client'),
287+
fileStoreClients,
288+
fileStoreUploadClient,
289+
});
290+
269291
// attempt snapshot sync if possible
270292
await trySnapshotSync(config, log);
271293

@@ -331,6 +353,7 @@ export class AztecNodeService implements AztecNode, AztecNodeAdmin, Traceable {
331353
blockSource: archiver,
332354
l1ToL2MessageSource: archiver,
333355
keyStoreManager,
356+
fileStoreBlobUploadClient: fileStoreUploadClient,
334357
});
335358

336359
// If we have a validator client, register it as a source of offenses for the slasher,

yarn-project/blob-sink/package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@
99
"./server": "./dest/server/index.js",
1010
"./client": "./dest/client/index.js",
1111
"./encoding": "./dest/encoding/index.js",
12-
"./types": "./dest/types/index.js"
12+
"./types": "./dest/types/index.js",
13+
"./filestore": "./dest/filestore/index.js"
1314
},
1415
"inherits": [
1516
"../package.common.json"

yarn-project/blob-sink/src/client/config.ts

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,16 @@ export interface BlobSinkConfig extends BlobSinkArchiveApiConfig {
4545
* Whether to allow having no blob sources configured during startup
4646
*/
4747
blobAllowEmptySources?: boolean;
48+
49+
/**
50+
* URLs for reading blobs from filestore (s3://, gs://, file://, https://). Tried in order until blobs are found.
51+
*/
52+
blobFileStoreUrls?: string[];
53+
54+
/**
55+
* URL for uploading blobs to filestore (s3://, gs://, file://)
56+
*/
57+
blobFileStoreUploadUrl?: string;
4858
}
4959

5060
export const blobSinkConfigMapping: ConfigMappingsType<BlobSinkConfig> = {
@@ -84,6 +94,19 @@ export const blobSinkConfigMapping: ConfigMappingsType<BlobSinkConfig> = {
8494
description: 'Whether to allow having no blob sources configured during startup',
8595
...booleanConfigHelper(false),
8696
},
97+
blobFileStoreUrls: {
98+
env: 'BLOB_FILE_STORE_URLS',
99+
description: 'URLs for filestore blob archive, comma-separated. Tried in order until blobs are found.',
100+
parseEnv: (val: string) =>
101+
val
102+
.split(',')
103+
.map(url => url.trim())
104+
.filter(url => url.length > 0),
105+
},
106+
blobFileStoreUploadUrl: {
107+
env: 'BLOB_FILE_STORE_UPLOAD_URL',
108+
description: 'URL for uploading blobs to filestore (s3://, gs://, file://)',
109+
},
87110
...blobSinkArchiveApiConfigMappings,
88111
};
89112

@@ -99,5 +122,10 @@ export function getBlobSinkConfigFromEnv(): BlobSinkConfig {
99122
* Returns whether the given blob sink config has any remote sources defined.
100123
*/
101124
export function hasRemoteBlobSinkSources(config: BlobSinkConfig = {}): boolean {
102-
return !!(config.blobSinkUrl || config.l1ConsensusHostUrls?.length || config.archiveApiUrl);
125+
return !!(
126+
config.blobSinkUrl ||
127+
config.l1ConsensusHostUrls?.length ||
128+
config.archiveApiUrl ||
129+
config.blobFileStoreUrls?.length
130+
);
103131
}

yarn-project/blob-sink/src/client/factory.ts

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,24 @@
11
import { type Logger, createLogger } from '@aztec/foundation/log';
22

33
import { MemoryBlobStore } from '../blobstore/memory_blob_store.js';
4+
import type { FileStoreBlobClient } from '../filestore/filestore_blob_client.js';
45
import { type BlobSinkConfig, hasRemoteBlobSinkSources } from './config.js';
56
import { HttpBlobSinkClient } from './http.js';
67
import type { BlobSinkClientInterface } from './interface.js';
78
import { LocalBlobSinkClient } from './local.js';
89

9-
export function createBlobSinkClient(config?: BlobSinkConfig, deps?: { logger: Logger }): BlobSinkClientInterface {
10+
export interface CreateBlobSinkClientDeps {
11+
logger?: Logger;
12+
/** FileStore clients for reading blobs */
13+
fileStoreClients?: FileStoreBlobClient[];
14+
/** FileStore client for uploading blobs */
15+
fileStoreUploadClient?: FileStoreBlobClient;
16+
}
17+
18+
export function createBlobSinkClient(
19+
config?: BlobSinkConfig,
20+
deps?: CreateBlobSinkClientDeps,
21+
): BlobSinkClientInterface {
1022
const log = deps?.logger ?? createLogger('blob-sink:client');
1123
if (!hasRemoteBlobSinkSources(config)) {
1224
log.info(`Creating local blob sink client.`);
@@ -18,6 +30,12 @@ export function createBlobSinkClient(config?: BlobSinkConfig, deps?: { logger: L
1830
blobSinkUrl: config?.blobSinkUrl,
1931
l1ConsensusHostUrls: config?.l1ConsensusHostUrls,
2032
archiveApiUrl: config?.archiveApiUrl,
33+
fileStoreCount: deps?.fileStoreClients?.length ?? 0,
34+
hasFileStoreUpload: !!deps?.fileStoreUploadClient,
35+
});
36+
return new HttpBlobSinkClient(config, {
37+
logger: log,
38+
fileStoreClients: deps?.fileStoreClients,
39+
fileStoreUploadClient: deps?.fileStoreUploadClient,
2140
});
22-
return new HttpBlobSinkClient(config, deps);
2341
}

0 commit comments

Comments
 (0)