Skip to content

Commit 96dfd77

Browse files
committed
refactor: staged writes in private events
Fourth part of the series started with #19445. This makes the PrivateEventStore work based on staged writes. With this, private events aren't written to persistent storage until PXE decides to commit the job.
1 parent eea7fc1 commit 96dfd77

File tree

9 files changed

+778
-316
lines changed

9 files changed

+778
-316
lines changed

yarn-project/pxe/src/contract_function_simulator/oracle/utility_execution_oracle.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -409,7 +409,7 @@ export class UtilityExecutionOracle implements IMiscOracle, IUtilityExecutionOra
409409
),
410410
);
411411

412-
const eventService = new EventService(this.anchorBlockStore, this.aztecNode, this.privateEventStore);
412+
const eventService = new EventService(this.anchorBlockStore, this.aztecNode, this.privateEventStore, this.jobId);
413413
const eventStorePromises = eventValidationRequests.map(request =>
414414
eventService.storeEvent(
415415
request.contractAddress,

yarn-project/pxe/src/events/event_service.test.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,15 +80,15 @@ describe('storeEvent', () => {
8080

8181
aztecNode.getTxEffect.mockImplementation(() => Promise.resolve(indexedTxEffect));
8282

83-
eventService = new EventService(anchorBlockStore, aztecNode, privateEventStore);
83+
eventService = new EventService(anchorBlockStore, aztecNode, privateEventStore, 'test');
8484
});
8585

86-
function runStoreEvent(
86+
async function runStoreEvent(
8787
overrides: {
8888
eventCommitment?: Fr;
8989
} = {},
9090
) {
91-
return eventService.storeEvent(
91+
await eventService.storeEvent(
9292
contractAddress,
9393
eventSelector,
9494
randomness,
@@ -97,6 +97,8 @@ describe('storeEvent', () => {
9797
txEffect.txHash,
9898
recipient,
9999
);
100+
101+
await privateEventStore.commit('test');
100102
}
101103

102104
it('should throw when tx does not exist or has no effects', async () => {

yarn-project/pxe/src/events/event_service.ts

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ export class EventService {
1313
private readonly anchorBlockStore: AnchorBlockStore,
1414
private readonly aztecNode: AztecNode,
1515
private readonly privateEventStore: PrivateEventStore,
16+
private readonly jobId: string,
1617
) {}
1718

1819
public async storeEvent(
@@ -52,14 +53,21 @@ export class EventService {
5253
);
5354
}
5455

55-
return this.privateEventStore.storePrivateEventLog(selector, randomness, content, siloedEventCommitment, {
56-
contractAddress,
57-
scope,
58-
txHash,
59-
l2BlockNumber: txEffect.l2BlockNumber,
60-
l2BlockHash: txEffect.l2BlockHash,
61-
txIndexInBlock: txEffect.txIndexInBlock,
62-
eventIndexInTx,
63-
});
56+
return this.privateEventStore.storePrivateEventLog(
57+
selector,
58+
randomness,
59+
content,
60+
siloedEventCommitment,
61+
{
62+
contractAddress,
63+
scope,
64+
txHash,
65+
l2BlockNumber: txEffect.l2BlockNumber,
66+
l2BlockHash: txEffect.l2BlockHash,
67+
txIndexInBlock: txEffect.txIndexInBlock,
68+
eventIndexInTx,
69+
},
70+
this.jobId,
71+
);
6472
}
6573
}

yarn-project/pxe/src/events/private_event_filter_validator.test.ts

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -25,17 +25,17 @@ describe('PrivateEventFilterValidator', () => {
2525
anchorBlockStore = mock<AnchorBlockStore>();
2626

2727
anchorBlockStore.getBlockHeader.mockResolvedValue(lastKnownBlock);
28-
validator = new PrivateEventFilterValidator(anchorBlockStore);
28+
validator = new PrivateEventFilterValidator(lastKnownBlockNumber);
2929
});
3030

31-
it('rejects empty scope', async () => {
32-
await expect(validator.validate({ contractAddress, fromBlock: lastKnownBlockNumber, scopes: [] })).rejects.toThrow(
31+
it('rejects empty scope', () => {
32+
expect(() => validator.validate({ contractAddress, fromBlock: lastKnownBlockNumber, scopes: [] })).toThrow(
3333
/At least one scope is required to get private events/,
3434
);
3535
});
3636

37-
it('defaults to whole range', async () => {
38-
const dataProviderFilter = await validator.validate({ contractAddress, scopes: [scope] });
37+
it('defaults to whole range', () => {
38+
const dataProviderFilter = validator.validate({ contractAddress, scopes: [scope] });
3939
expect(dataProviderFilter).toEqual({
4040
contractAddress,
4141
scopes: [scope],
@@ -45,8 +45,8 @@ describe('PrivateEventFilterValidator', () => {
4545
});
4646
});
4747

48-
it('toBlock defaults to lastKnownBlock + 1', async () => {
49-
const dataProviderFilter = await validator.validate({
48+
it('toBlock defaults to lastKnownBlock + 1', () => {
49+
const dataProviderFilter = validator.validate({
5050
contractAddress,
5151
scopes: [scope],
5252
fromBlock: INITIAL_L2_BLOCK_NUM,
@@ -59,8 +59,8 @@ describe('PrivateEventFilterValidator', () => {
5959
});
6060
});
6161

62-
it('toBlock without fromBlock defaults to [INITIAL_L2_BLOCK_NUM, toBlock)', async () => {
63-
const dataProviderFilter = await validator.validate({
62+
it('toBlock without fromBlock defaults to [INITIAL_L2_BLOCK_NUM, toBlock)', () => {
63+
const dataProviderFilter = validator.validate({
6464
contractAddress,
6565
scopes: [scope],
6666
toBlock: BlockNumber(lastKnownBlockNumber + 1),
@@ -73,28 +73,28 @@ describe('PrivateEventFilterValidator', () => {
7373
});
7474
});
7575

76-
it('rejects fromBlock >= toBlock', async () => {
77-
await expect(
76+
it('rejects fromBlock >= toBlock', () => {
77+
expect(() =>
7878
validator.validate({
7979
contractAddress,
8080
scopes: [scope],
8181
fromBlock: lastKnownBlockNumber,
8282
toBlock: lastKnownBlockNumber,
8383
}),
84-
).rejects.toThrow(/toBlock must be strictly greater than fromBlock/);
84+
).toThrow(/toBlock must be strictly greater than fromBlock/);
8585

86-
await expect(
86+
expect(() =>
8787
validator.validate({
8888
contractAddress,
8989
scopes: [scope],
9090
fromBlock: lastKnownBlockNumber,
9191
toBlock: BlockNumber(lastKnownBlockNumber - 1),
9292
}),
93-
).rejects.toThrow(/toBlock must be strictly greater than fromBlock/);
93+
).toThrow(/toBlock must be strictly greater than fromBlock/);
9494
});
9595

96-
it('preserves txHash', async () => {
97-
let dataProviderFilter = await validator.validate({
96+
it('preserves txHash', () => {
97+
let dataProviderFilter = validator.validate({
9898
contractAddress,
9999
scopes: [scope],
100100
fromBlock: INITIAL_L2_BLOCK_NUM,
@@ -108,7 +108,7 @@ describe('PrivateEventFilterValidator', () => {
108108
});
109109

110110
const txHash = TxHash.random();
111-
dataProviderFilter = await validator.validate({
111+
dataProviderFilter = validator.validate({
112112
contractAddress,
113113
scopes: [scope],
114114
fromBlock: INITIAL_L2_BLOCK_NUM,

yarn-project/pxe/src/events/private_event_filter_validator.ts

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,23 +2,21 @@ import type { PrivateEventFilter } from '@aztec/aztec.js/wallet';
22
import { INITIAL_L2_BLOCK_NUM } from '@aztec/constants';
33
import { BlockNumber } from '@aztec/foundation/branded-types';
44

5-
import { AnchorBlockStore } from '../storage/anchor_block_store/anchor_block_store.js';
65
import type { PrivateEventStoreFilter } from '../storage/private_event_store/private_event_store.js';
76

87
export class PrivateEventFilterValidator {
9-
constructor(private anchorBlockStore: AnchorBlockStore) {}
8+
constructor(private lastBlock: BlockNumber) {}
109

11-
async validate(filter: PrivateEventFilter): Promise<PrivateEventStoreFilter> {
10+
validate(filter: PrivateEventFilter): PrivateEventStoreFilter {
1211
let { fromBlock, toBlock } = filter;
1312

1413
// Block range filters in Aztec Node are defined as closed-open intervals [fromBlock, toBlock), so
1514
// we respect that convention here for consistency.
1615
// We then default to [INITIAL_L2_BLOCK_NUM, latestKnownBlock + 1), ie: by default return events from
1716
// the first block to the latest known block.
1817
if (!fromBlock || !toBlock) {
19-
const lastKnownBlock = (await this.anchorBlockStore.getBlockHeader()).getBlockNumber();
2018
fromBlock = fromBlock ?? BlockNumber(INITIAL_L2_BLOCK_NUM);
21-
toBlock = toBlock ?? BlockNumber(lastKnownBlock + 1);
19+
toBlock = toBlock ?? BlockNumber(this.lastBlock + 1);
2220
}
2321

2422
if (filter.scopes.length === 0) {

yarn-project/pxe/src/pxe.test.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,7 @@ describe('PXE', () => {
238238
txIndexInBlock: 0,
239239
eventIndexInTx: eventCounter++,
240240
},
241+
'test',
241242
);
242243

243244
return event;
@@ -247,6 +248,7 @@ describe('PXE', () => {
247248
// Store a couple of events to exercise `getPrivateEvents`
248249
const event1 = await storeEvent();
249250
const event2 = await storeEvent();
251+
await privateEventStore.commit('test');
250252

251253
const events = await pxe.getPrivateEvents(eventSelector, {
252254
contractAddress,
@@ -287,6 +289,8 @@ describe('PXE', () => {
287289
storeEvent(lastKnownBlockNumber + 1),
288290
storeEvent(lastKnownBlockNumber + 1),
289291
]);
292+
293+
await privateEventStore.commit('test');
290294
});
291295

292296
it('filters by txHash', async () => {

yarn-project/pxe/src/pxe.ts

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import type { PrivateEventFilter } from '@aztec/aztec.js/wallet';
2+
import { BlockNumber } from '@aztec/foundation/branded-types';
23
import { Fr } from '@aztec/foundation/curves/bn254';
34
import { type Logger, createLogger } from '@aztec/foundation/log';
45
import { SerialQueue } from '@aztec/foundation/queue';
@@ -157,7 +158,7 @@ export class PXE {
157158
);
158159

159160
const jobCoordinator = new JobCoordinator(store);
160-
jobCoordinator.registerStores([capsuleStore, senderTaggingStore, recipientTaggingStore]);
161+
jobCoordinator.registerStores([capsuleStore, senderTaggingStore, recipientTaggingStore, privateEventStore]);
161162

162163
const debugUtils = new PXEDebugUtils(contractStore, noteStore);
163164

@@ -1009,9 +1010,17 @@ export class PXE {
10091010
* Defaults to the latest known block to PXE + 1.
10101011
* @returns - The packed events with block and tx metadata.
10111012
*/
1012-
public getPrivateEvents(eventSelector: EventSelector, filter: PrivateEventFilter): Promise<PackedPrivateEvent[]> {
1013-
return this.#putInJobQueue(async jobId => {
1013+
public async getPrivateEvents(
1014+
eventSelector: EventSelector,
1015+
filter: PrivateEventFilter,
1016+
): Promise<PackedPrivateEvent[]> {
1017+
let anchorBlockNumber: BlockNumber;
1018+
1019+
await this.#putInJobQueue(async jobId => {
10141020
await this.blockStateSynchronizer.sync();
1021+
1022+
anchorBlockNumber = (await this.anchorBlockStore.getBlockHeader()).getBlockNumber();
1023+
10151024
const contractFunctionSimulator = this.#getSimulatorForTx();
10161025

10171026
await this.contractStore.syncPrivateState(
@@ -1020,15 +1029,16 @@ export class PXE {
10201029
async privateSyncCall =>
10211030
await this.#simulateUtility(contractFunctionSimulator, privateSyncCall, [], undefined, jobId),
10221031
);
1032+
});
10231033

1024-
const sanitizedFilter = await new PrivateEventFilterValidator(this.anchorBlockStore).validate(filter);
1034+
const sanitizedFilter = new PrivateEventFilterValidator(anchorBlockNumber!).validate(filter);
10251035

1026-
this.log.debug(
1027-
`Getting private events for ${sanitizedFilter.contractAddress.toString()} from ${sanitizedFilter.fromBlock} to ${sanitizedFilter.toBlock}`,
1028-
);
1036+
this.log.debug(
1037+
`Getting private events for ${sanitizedFilter.contractAddress.toString()} from ${sanitizedFilter.fromBlock} to ${sanitizedFilter.toBlock}`,
1038+
);
10291039

1030-
return this.privateEventStore.getPrivateEvents(eventSelector, sanitizedFilter);
1031-
});
1040+
// sanitizedFilter is assigned during the synchro
1041+
return this.privateEventStore.getPrivateEvents(eventSelector, sanitizedFilter);
10321042
}
10331043

10341044
/**

0 commit comments

Comments
 (0)