diff --git a/yarn-project/pxe/src/contract_function_simulator/oracle/utility_execution_oracle.ts b/yarn-project/pxe/src/contract_function_simulator/oracle/utility_execution_oracle.ts index 1fcd05d35c6b..9b7384e287fa 100644 --- a/yarn-project/pxe/src/contract_function_simulator/oracle/utility_execution_oracle.ts +++ b/yarn-project/pxe/src/contract_function_simulator/oracle/utility_execution_oracle.ts @@ -409,7 +409,7 @@ export class UtilityExecutionOracle implements IMiscOracle, IUtilityExecutionOra ), ); - const eventService = new EventService(this.anchorBlockStore, this.aztecNode, this.privateEventStore); + const eventService = new EventService(this.anchorBlockStore, this.aztecNode, this.privateEventStore, this.jobId); const eventDeliveries = eventValidationRequests.map(request => eventService.deliverEvent( request.contractAddress, diff --git a/yarn-project/pxe/src/events/event_service.test.ts b/yarn-project/pxe/src/events/event_service.test.ts index 98da1a2132e4..fc4959634dd6 100644 --- a/yarn-project/pxe/src/events/event_service.test.ts +++ b/yarn-project/pxe/src/events/event_service.test.ts @@ -91,7 +91,7 @@ describe('deliverEvent', () => { ]), ); - eventService = new EventService(anchorBlockStore, aztecNode, privateEventStore); + eventService = new EventService(anchorBlockStore, aztecNode, privateEventStore, 'test'); }); function runDeliverEvent( @@ -141,12 +141,16 @@ describe('deliverEvent', () => { await runDeliverEvent(); // I should be able to retrieve the private event I just saved using getPrivateEvents - const result = await privateEventStore.getPrivateEvents(eventSelector, { - contractAddress, - fromBlock: blockNumber, - toBlock: blockNumber + 1, - scopes: [recipient], - }); + const result = await privateEventStore.getPrivateEvents( + eventSelector, + { + contractAddress, + fromBlock: blockNumber, + toBlock: blockNumber + 1, + scopes: [recipient], + }, + 'test', + ); expect(result.length).toEqual(1); expect(result[0].packedEvent).toEqual(eventContent); diff --git a/yarn-project/pxe/src/events/event_service.ts b/yarn-project/pxe/src/events/event_service.ts index 4e71ce70741b..4e990eaf8e23 100644 --- a/yarn-project/pxe/src/events/event_service.ts +++ b/yarn-project/pxe/src/events/event_service.ts @@ -14,6 +14,7 @@ export class EventService { private readonly anchorBlockStore: AnchorBlockStore, private readonly aztecNode: AztecNode, private readonly privateEventStore: PrivateEventStore, + private readonly jobId: string, ) {} public async deliverEvent( @@ -74,6 +75,7 @@ export class EventService { l2BlockNumber: nullifierIndex.l2BlockNumber, // Block number in which the event was emitted l2BlockHash: nullifierIndex.l2BlockHash, // Block hash in which the event was emitted }, + this.jobId, ); } } diff --git a/yarn-project/pxe/src/pxe.test.ts b/yarn-project/pxe/src/pxe.test.ts index 1ead43423678..e3f4bf4d719d 100644 --- a/yarn-project/pxe/src/pxe.test.ts +++ b/yarn-project/pxe/src/pxe.test.ts @@ -220,13 +220,20 @@ describe('PXE', () => { const randomness = Fr.random(); - await privateEventStore.storePrivateEventLog(eventSelector, randomness, event.packedEvent, eventIndex++, { - contractAddress, - scope, - txHash: event.txHash, - l2BlockNumber: event.l2BlockNumber, - l2BlockHash: event.l2BlockHash, - }); + await privateEventStore.storePrivateEventLog( + eventSelector, + randomness, + event.packedEvent, + eventIndex++, + { + contractAddress, + scope, + txHash: event.txHash, + l2BlockNumber: event.l2BlockNumber, + l2BlockHash: event.l2BlockHash, + }, + 'test', + ); return event; } @@ -235,6 +242,7 @@ describe('PXE', () => { // Store a couple of events to exercise `getPrivateEvents` const event1 = await storeEvent(); const event2 = await storeEvent(); + await privateEventStore.commit('test'); const events = await pxe.getPrivateEvents(eventSelector, { contractAddress, @@ -275,6 +283,8 @@ describe('PXE', () => { storeEvent(lastKnownBlockNumber + 1), storeEvent(lastKnownBlockNumber + 1), ]); + + await privateEventStore.commit('test'); }); it('filters by txHash', async () => { diff --git a/yarn-project/pxe/src/pxe.ts b/yarn-project/pxe/src/pxe.ts index 371fd3bf95be..ba6e987194e1 100644 --- a/yarn-project/pxe/src/pxe.ts +++ b/yarn-project/pxe/src/pxe.ts @@ -157,7 +157,7 @@ export class PXE { ); const jobCoordinator = new JobCoordinator(store); - jobCoordinator.registerStores([capsuleStore, senderTaggingStore, recipientTaggingStore]); + jobCoordinator.registerStores([capsuleStore, senderTaggingStore, recipientTaggingStore, privateEventStore]); const debugUtils = new PXEDebugUtils(contractStore, noteStore); @@ -1099,7 +1099,7 @@ export class PXE { `Getting private events for ${sanitizedFilter.contractAddress.toString()} from ${sanitizedFilter.fromBlock} to ${sanitizedFilter.toBlock}`, ); - return this.privateEventStore.getPrivateEvents(eventSelector, sanitizedFilter); + return this.privateEventStore.getPrivateEvents(eventSelector, sanitizedFilter, jobId); }); } diff --git a/yarn-project/pxe/src/storage/private_event_store/private_event_store.test.ts b/yarn-project/pxe/src/storage/private_event_store/private_event_store.test.ts index 4db12f20c395..47fa25f1b9e9 100644 --- a/yarn-project/pxe/src/storage/private_event_store/private_event_store.test.ts +++ b/yarn-project/pxe/src/storage/private_event_store/private_event_store.test.ts @@ -50,37 +50,59 @@ describe('PrivateEventStore', () => { }); it('stores and retrieves private events', async () => { - await privateEventStore.storePrivateEventLog(eventSelector, randomness, msgContent, eventCommitmentIndex, { - contractAddress, - scope, - txHash, - l2BlockNumber, - l2BlockHash, - }); - const events = await privateEventStore.getPrivateEvents(eventSelector, { - contractAddress, - fromBlock: l2BlockNumber, - toBlock: l2BlockNumber + 1, - scopes: [scope], - }); + await privateEventStore.storePrivateEventLog( + eventSelector, + randomness, + msgContent, + eventCommitmentIndex, + { + contractAddress, + scope, + txHash, + l2BlockNumber, + l2BlockHash, + }, + 'test', + ); + const events = await privateEventStore.getPrivateEvents( + eventSelector, + { + contractAddress, + fromBlock: l2BlockNumber, + toBlock: l2BlockNumber + 1, + scopes: [scope], + }, + 'test', + ); expect(events).toEqual([expectedEvent]); }); it('ignores duplicate events with same eventCommitmentIndex', async () => { - await privateEventStore.storePrivateEventLog(eventSelector, randomness, msgContent, eventCommitmentIndex, { - contractAddress, - scope, - txHash, - l2BlockNumber, - l2BlockHash, - }); + await privateEventStore.storePrivateEventLog( + eventSelector, + randomness, + msgContent, + eventCommitmentIndex, + { + contractAddress, + scope, + txHash, + l2BlockNumber, + l2BlockHash, + }, + 'test', + ); - const events = await privateEventStore.getPrivateEvents(eventSelector, { - contractAddress, - fromBlock: l2BlockNumber, - toBlock: l2BlockNumber + 1, - scopes: [scope], - }); + const events = await privateEventStore.getPrivateEvents( + eventSelector, + { + contractAddress, + fromBlock: l2BlockNumber, + toBlock: l2BlockNumber + 1, + scopes: [scope], + }, + 'test', + ); expect(events).toEqual([expectedEvent]); }); @@ -88,27 +110,45 @@ describe('PrivateEventStore', () => { it('allows multiple events with same content but different eventCommitmentIndex', async () => { const otherEventCommitmentIndex = eventCommitmentIndex + 1; - await privateEventStore.storePrivateEventLog(eventSelector, randomness, msgContent, eventCommitmentIndex, { - contractAddress, - scope, - txHash, - l2BlockNumber, - l2BlockHash, - }); - await privateEventStore.storePrivateEventLog(eventSelector, randomness, msgContent, otherEventCommitmentIndex, { - contractAddress, - scope, - txHash, - l2BlockNumber, - l2BlockHash, - }); + await privateEventStore.storePrivateEventLog( + eventSelector, + randomness, + msgContent, + eventCommitmentIndex, + { + contractAddress, + scope, + txHash, + l2BlockNumber, + l2BlockHash, + }, + 'test', + ); + await privateEventStore.storePrivateEventLog( + eventSelector, + randomness, + msgContent, + otherEventCommitmentIndex, + { + contractAddress, + scope, + txHash, + l2BlockNumber, + l2BlockHash, + }, + 'test', + ); - const events = await privateEventStore.getPrivateEvents(eventSelector, { - contractAddress, - fromBlock: l2BlockNumber, - toBlock: l2BlockNumber + 1, - scopes: [scope], - }); + const events = await privateEventStore.getPrivateEvents( + eventSelector, + { + contractAddress, + fromBlock: l2BlockNumber, + toBlock: l2BlockNumber + 1, + scopes: [scope], + }, + 'test', + ); expect(events).toEqual([expectedEvent, expectedEvent]); }); @@ -120,34 +160,59 @@ describe('PrivateEventStore', () => { l2BlockNumber: BlockNumber(200), }; - await privateEventStore.storePrivateEventLog(eventSelector, randomness, getRandomMsgContent(), 0, { - contractAddress, - scope, - txHash: TxHash.random(), - l2BlockNumber: BlockNumber(100), - l2BlockHash, - }); - await privateEventStore.storePrivateEventLog(eventSelector, randomness, msgContent, 1, { - contractAddress, - scope, - txHash: expectedEvent.txHash, - l2BlockNumber: expectedEvent.l2BlockNumber, - l2BlockHash: expectedEvent.l2BlockHash, - }); - await privateEventStore.storePrivateEventLog(eventSelector, randomness, getRandomMsgContent(), 2, { - contractAddress, - scope, - txHash: TxHash.random(), - l2BlockNumber: BlockNumber(300), - l2BlockHash, - }); + await privateEventStore.storePrivateEventLog( + eventSelector, + randomness, + getRandomMsgContent(), + 0, + { + contractAddress, + scope, + txHash: TxHash.random(), + l2BlockNumber: BlockNumber(100), + l2BlockHash, + }, + 'test', + ); + await privateEventStore.storePrivateEventLog( + eventSelector, + randomness, + msgContent, + 1, + { + contractAddress, + scope, + txHash: expectedEvent.txHash, + l2BlockNumber: expectedEvent.l2BlockNumber, + l2BlockHash: expectedEvent.l2BlockHash, + }, + 'test', + ); + await privateEventStore.storePrivateEventLog( + eventSelector, + randomness, + getRandomMsgContent(), + 2, + { + contractAddress, + scope, + txHash: TxHash.random(), + l2BlockNumber: BlockNumber(300), + l2BlockHash, + }, + 'test', + ); - const events = await privateEventStore.getPrivateEvents(eventSelector, { - contractAddress, - fromBlock: 150, - toBlock: 150 + 100, - scopes: [scope], - }); + const events = await privateEventStore.getPrivateEvents( + eventSelector, + { + contractAddress, + fromBlock: 150, + toBlock: 150 + 100, + scopes: [scope], + }, + 'test', + ); expect(events).toEqual([expectedEvent]); // Only includes event from block 200 }); @@ -155,38 +220,60 @@ describe('PrivateEventStore', () => { it('filters events by recipient', async () => { const otherScope = await AztecAddress.random(); - await privateEventStore.storePrivateEventLog(eventSelector, randomness, msgContent, eventCommitmentIndex, { - contractAddress, - scope, - txHash, - l2BlockNumber, - l2BlockHash, - }); - await privateEventStore.storePrivateEventLog(eventSelector, randomness, msgContent, eventCommitmentIndex + 1, { - contractAddress, - scope: otherScope, - txHash: TxHash.random(), - l2BlockNumber, - l2BlockHash, - }); + await privateEventStore.storePrivateEventLog( + eventSelector, + randomness, + msgContent, + eventCommitmentIndex, + { + contractAddress, + scope, + txHash, + l2BlockNumber, + l2BlockHash, + }, + 'test', + ); + await privateEventStore.storePrivateEventLog( + eventSelector, + randomness, + msgContent, + eventCommitmentIndex + 1, + { + contractAddress, + scope: otherScope, + txHash: TxHash.random(), + l2BlockNumber, + l2BlockHash, + }, + 'test', + ); - const events = await privateEventStore.getPrivateEvents(eventSelector, { - contractAddress, - fromBlock: l2BlockNumber, - toBlock: l2BlockNumber + 1, - scopes: [scope], - }); + const events = await privateEventStore.getPrivateEvents( + eventSelector, + { + contractAddress, + fromBlock: l2BlockNumber, + toBlock: l2BlockNumber + 1, + scopes: [scope], + }, + 'test', + ); expect(events).toEqual([expectedEvent]); }); it('returns empty array when no events match criteria', async () => { - const events = await privateEventStore.getPrivateEvents(eventSelector, { - contractAddress, - fromBlock: l2BlockNumber, - toBlock: l2BlockNumber + 1, - scopes: [scope], - }); + const events = await privateEventStore.getPrivateEvents( + eventSelector, + { + contractAddress, + fromBlock: l2BlockNumber, + toBlock: l2BlockNumber + 1, + scopes: [scope], + }, + 'test', + ); expect(events).toEqual([]); }); @@ -203,36 +290,61 @@ describe('PrivateEventStore', () => { }); it('returns events in order by eventCommitmentIndex', async () => { - await privateEventStore.storePrivateEventLog(eventSelector, randomness, msgContent2, 1, { - contractAddress, - scope, - txHash: TxHash.random(), - l2BlockNumber: BlockNumber(200), - l2BlockHash, - }); - - await privateEventStore.storePrivateEventLog(eventSelector, randomness, msgContent1, 0, { - contractAddress, - scope, - txHash: TxHash.random(), - l2BlockNumber: BlockNumber(100), - l2BlockHash, - }); - - await privateEventStore.storePrivateEventLog(eventSelector, randomness, msgContent3, 2, { - contractAddress, - scope, - txHash: TxHash.random(), - l2BlockNumber: BlockNumber(300), - l2BlockHash, - }); - - const events = await privateEventStore.getPrivateEvents(eventSelector, { - contractAddress, - fromBlock: 0, - toBlock: 0 + 1000, - scopes: [scope], - }); + await privateEventStore.storePrivateEventLog( + eventSelector, + randomness, + msgContent2, + 1, + { + contractAddress, + scope, + txHash: TxHash.random(), + l2BlockNumber: BlockNumber(200), + l2BlockHash, + }, + 'test', + ); + + await privateEventStore.storePrivateEventLog( + eventSelector, + randomness, + msgContent1, + 0, + { + contractAddress, + scope, + txHash: TxHash.random(), + l2BlockNumber: BlockNumber(100), + l2BlockHash, + }, + 'test', + ); + + await privateEventStore.storePrivateEventLog( + eventSelector, + randomness, + msgContent3, + 2, + { + contractAddress, + scope, + txHash: TxHash.random(), + l2BlockNumber: BlockNumber(300), + l2BlockHash, + }, + 'test', + ); + + const events = await privateEventStore.getPrivateEvents( + eventSelector, + { + contractAddress, + fromBlock: 0, + toBlock: 0 + 1000, + scopes: [scope], + }, + 'test', + ); expect(events.map(e => e.packedEvent)).toEqual([msgContent1, msgContent2, msgContent3]); }); @@ -253,47 +365,81 @@ describe('PrivateEventStore', () => { it('removes events after rollback block', async () => { // Store events in blocks 100, 200, 300 - await privateEventStore.storePrivateEventLog(eventSelector, randomness, msgContent1, 0, { - contractAddress, - scope, - txHash: TxHash.random(), - l2BlockNumber: BlockNumber(100), - l2BlockHash, - }); + await privateEventStore.storePrivateEventLog( + eventSelector, + randomness, + msgContent1, + 0, + { + contractAddress, + scope, + txHash: TxHash.random(), + l2BlockNumber: BlockNumber(100), + l2BlockHash, + }, + 'before-rollback', + ); // We add another event in the same block to verify that more events per block work. - await privateEventStore.storePrivateEventLog(eventSelector, randomness, msgContent2, 1, { - contractAddress, - scope, - txHash: TxHash.random(), - l2BlockNumber: BlockNumber(100), - l2BlockHash, - }); - - await privateEventStore.storePrivateEventLog(eventSelector, randomness, msgContent3, 2, { - contractAddress, - scope, - txHash: TxHash.random(), - l2BlockNumber: BlockNumber(200), - l2BlockHash, - }); - - await privateEventStore.storePrivateEventLog(eventSelector, randomness, msgContent4, 3, { - contractAddress, - scope, - txHash: TxHash.random(), - l2BlockNumber: BlockNumber(300), - l2BlockHash, - }); + await privateEventStore.storePrivateEventLog( + eventSelector, + randomness, + msgContent2, + 1, + { + contractAddress, + scope, + txHash: TxHash.random(), + l2BlockNumber: BlockNumber(100), + l2BlockHash, + }, + 'before-rollback', + ); + + await privateEventStore.storePrivateEventLog( + eventSelector, + randomness, + msgContent3, + 2, + { + contractAddress, + scope, + txHash: TxHash.random(), + l2BlockNumber: BlockNumber(200), + l2BlockHash, + }, + 'before-rollback', + ); + + await privateEventStore.storePrivateEventLog( + eventSelector, + randomness, + msgContent4, + 3, + { + contractAddress, + scope, + txHash: TxHash.random(), + l2BlockNumber: BlockNumber(300), + l2BlockHash, + }, + 'before-rollback', + ); + + await privateEventStore.commit('before-rollback'); // Rollback to block 150 (should remove events from blocks 200 and 300) await privateEventStore.rollbackEventsAfterBlock(150, 300); - const events = await privateEventStore.getPrivateEvents(eventSelector, { - contractAddress, - fromBlock: 0, - toBlock: 1000, - scopes: [scope], - }); + const events = await privateEventStore.getPrivateEvents( + eventSelector, + { + contractAddress, + fromBlock: 0, + toBlock: 1000, + scopes: [scope], + }, + 'after-rollback', + ); expect(events.length).toBe(2); expect(events[0].packedEvent).toEqual(msgContent1); @@ -306,66 +452,270 @@ describe('PrivateEventStore', () => { const reorgTxHash = TxHash.random(); // Store event at block 200 - await privateEventStore.storePrivateEventLog(eventSelector, randomness, msgContent1, 0, { - contractAddress, - scope, - txHash: reorgTxHash, - l2BlockNumber: BlockNumber(200), - l2BlockHash, - }); + await privateEventStore.storePrivateEventLog( + eventSelector, + randomness, + msgContent1, + 0, + { + contractAddress, + scope, + txHash: reorgTxHash, + l2BlockNumber: BlockNumber(200), + l2BlockHash, + }, + 'before-rollback', + ); + + await privateEventStore.commit('before-rollback'); // Rollback to block 100 await privateEventStore.rollbackEventsAfterBlock(100, 200); // Verify event was removed - let events = await privateEventStore.getPrivateEvents(eventSelector, { - contractAddress, - fromBlock: 0, - toBlock: 1000, - scopes: [scope], - }); + let events = await privateEventStore.getPrivateEvents( + eventSelector, + { + contractAddress, + fromBlock: 0, + toBlock: 1000, + scopes: [scope], + }, + 'after-rollback', + ); expect(events.length).toBe(0); // Re-add the same event (same eventCommitmentIndex and txHash, as happens after a reorg) - await privateEventStore.storePrivateEventLog(eventSelector, randomness, msgContent1, 0, { - contractAddress, - scope, - txHash: reorgTxHash, - l2BlockNumber: BlockNumber(200), - l2BlockHash, - }); + await privateEventStore.storePrivateEventLog( + eventSelector, + randomness, + msgContent1, + 0, + { + contractAddress, + scope, + txHash: reorgTxHash, + l2BlockNumber: BlockNumber(200), + l2BlockHash, + }, + 're-add', + ); + + await privateEventStore.commit('re-add'); // Verify event can be retrieved again - events = await privateEventStore.getPrivateEvents(eventSelector, { - contractAddress, - fromBlock: 0, - toBlock: 1000, - scopes: [scope], - }); + events = await privateEventStore.getPrivateEvents( + eventSelector, + { + contractAddress, + fromBlock: 0, + toBlock: 1000, + scopes: [scope], + }, + 'after-re-add', + ); expect(events.length).toBe(1); }); it('handles rollback with no events to remove', async () => { - await privateEventStore.storePrivateEventLog(eventSelector, randomness, msgContent1, 0, { - contractAddress, - scope, - txHash: TxHash.random(), - l2BlockNumber: BlockNumber(100), - l2BlockHash, - }); + await privateEventStore.storePrivateEventLog( + eventSelector, + randomness, + msgContent1, + 0, + { + contractAddress, + scope, + txHash: TxHash.random(), + l2BlockNumber: BlockNumber(100), + l2BlockHash, + }, + 'before-rollback', + ); + + await privateEventStore.commit('before-rollback'); // Rollback after all existing events await privateEventStore.rollbackEventsAfterBlock(200, 300); - const events = await privateEventStore.getPrivateEvents(eventSelector, { - contractAddress, - fromBlock: 0, - toBlock: 1000, - scopes: [scope], - }); + const events = await privateEventStore.getPrivateEvents( + eventSelector, + { + contractAddress, + fromBlock: 0, + toBlock: 1000, + scopes: [scope], + }, + 'after-rollback', + ); expect(events.length).toBe(1); expect(events[0].packedEvent).toEqual(msgContent1); }); }); + + describe('staging', () => { + it('stages events without affecting committed storage', async () => { + const commitJobId: string = 'commit-job'; + const stagingJobId: string = 'staging-job'; + + const committedEventRandomness = Fr.random(); + const stagedEventRandomness = Fr.random(); + + // Store committed event + await privateEventStore.storePrivateEventLog( + eventSelector, + committedEventRandomness, + msgContent, + eventCommitmentIndex, + { contractAddress, scope, txHash, l2BlockNumber, l2BlockHash }, + commitJobId, + ); + await privateEventStore.commit(commitJobId); + + // Store staged event (not committed) + const stagedMsgContent = getRandomMsgContent(); + await privateEventStore.storePrivateEventLog( + eventSelector, + stagedEventRandomness, + stagedMsgContent, + eventCommitmentIndex + 1, + { contractAddress, scope, txHash: TxHash.random(), l2BlockNumber, l2BlockHash }, + stagingJobId, + ); + + // With a fresh jobId, should only see committed event + const events = await privateEventStore.getPrivateEvents( + eventSelector, + { + contractAddress, + fromBlock: l2BlockNumber, + toBlock: l2BlockNumber + 1, + scopes: [scope], + }, + 'some-job-id', + ); + expect(events).toHaveLength(1); + expect(events[0].packedEvent).toEqual(msgContent); + }); + + it('staged events are visible when reading with jobId', async () => { + const stagingJobId: string = 'staging-job'; + const stagedEventRandomness = Fr.random(); + + const stagedMsgContent = getRandomMsgContent(); + await privateEventStore.storePrivateEventLog( + eventSelector, + stagedEventRandomness, + stagedMsgContent, + eventCommitmentIndex, + { contractAddress, scope, txHash, l2BlockNumber, l2BlockHash }, + stagingJobId, + ); + + // With fresh jobId, should not see the staged event + const eventsWithoutJobId = await privateEventStore.getPrivateEvents( + eventSelector, + { + contractAddress, + fromBlock: l2BlockNumber, + toBlock: l2BlockNumber + 1, + scopes: [scope], + }, + 'some-job-id', + ); + expect(eventsWithoutJobId).toHaveLength(0); + + // With jobId, should see the staged event + const eventsWithJobId = await privateEventStore.getPrivateEvents( + eventSelector, + { + contractAddress, + fromBlock: l2BlockNumber, + toBlock: l2BlockNumber + 1, + scopes: [scope], + }, + stagingJobId, + ); + expect(eventsWithJobId).toHaveLength(1); + expect(eventsWithJobId[0].packedEvent).toEqual(stagedMsgContent); + }); + + it('commit promotes staged events to main storage', async () => { + const stagingJobId: string = 'staging-job'; + const stagedEventRandomness = Fr.random(); + const stagedMsgContent = getRandomMsgContent(); + + await privateEventStore.storePrivateEventLog( + eventSelector, + stagedEventRandomness, + stagedMsgContent, + eventCommitmentIndex, + { contractAddress, scope, txHash, l2BlockNumber, l2BlockHash }, + stagingJobId, + ); + + await privateEventStore.commit(stagingJobId); + + // Now should see the event with a fresh jobId + const events = await privateEventStore.getPrivateEvents( + eventSelector, + { + contractAddress, + fromBlock: l2BlockNumber, + toBlock: l2BlockNumber + 1, + scopes: [scope], + }, + 'some-job-id', + ); + expect(events).toHaveLength(1); + expect(events[0].packedEvent).toEqual(stagedMsgContent); + }); + + it('discardStaged removes staged events without affecting main', async () => { + const commitJobId: string = 'commit-job'; + const stagingJobId: string = 'staging-job'; + const committedEventRandomness = Fr.random(); + const stagedEventRandomness = Fr.random(); + + // Store committed event + await privateEventStore.storePrivateEventLog( + eventSelector, + committedEventRandomness, + msgContent, + eventCommitmentIndex, + { contractAddress, scope, txHash, l2BlockNumber, l2BlockHash }, + commitJobId, + ); + await privateEventStore.commit(commitJobId); + + // Store staged event (not committed) + const stagedMsgContent = getRandomMsgContent(); + await privateEventStore.storePrivateEventLog( + eventSelector, + stagedEventRandomness, + stagedMsgContent, + eventCommitmentIndex + 1, + { contractAddress, scope, txHash: TxHash.random(), l2BlockNumber, l2BlockHash }, + stagingJobId, + ); + + // Discard staging + await privateEventStore.discardStaged(stagingJobId); + + // Should only see committed event + const events = await privateEventStore.getPrivateEvents( + eventSelector, + { + contractAddress, + fromBlock: l2BlockNumber, + toBlock: l2BlockNumber + 1, + scopes: [scope], + }, + 'some-job-id', + ); + expect(events).toHaveLength(1); + expect(events[0].packedEvent).toEqual(msgContent); + }); + }); }); diff --git a/yarn-project/pxe/src/storage/private_event_store/private_event_store.ts b/yarn-project/pxe/src/storage/private_event_store/private_event_store.ts index 3b30354da26c..6ca6c1c78a7a 100644 --- a/yarn-project/pxe/src/storage/private_event_store/private_event_store.ts +++ b/yarn-project/pxe/src/storage/private_event_store/private_event_store.ts @@ -8,6 +8,7 @@ import type { AztecAddress } from '@aztec/stdlib/aztec-address'; import { L2BlockHash } from '@aztec/stdlib/block'; import { type InTx, TxHash } from '@aztec/stdlib/tx'; +import type { StagedStore } from '../../job_coordinator/job_coordinator.js'; import type { PackedPrivateEvent } from '../../pxe.js'; export type PrivateEventStoreFilter = { @@ -37,7 +38,9 @@ type PrivateEventMetadata = InTx & { /** * Stores decrypted private event logs. */ -export class PrivateEventStore { +export class PrivateEventStore implements StagedStore { + readonly storeName: string = 'private_event'; + #store: AztecAsyncKVStore; /** Map storing the actual private event log entries, keyed by eventCommitmentIndex */ #eventLogs: AztecAsyncMap; @@ -48,6 +51,9 @@ export class PrivateEventStore { /** Map from eventCommitmentIndex to boolean indicating if log has been seen. */ #seenLogs: AztecAsyncMap; + /** jobId => eventCommitmentIndex => PrivateEventEntry */ + #eventLogsInJobStage: Map>; + logger = createLogger('private_event_store'); constructor(store: AztecAsyncKVStore) { @@ -56,12 +62,103 @@ export class PrivateEventStore { this.#eventsByContractScopeSelector = this.#store.openMap('events_by_contract_scope_selector'); this.#seenLogs = this.#store.openMap('seen_logs'); this.#eventsByBlockNumber = this.#store.openMap('events_by_block_number'); + + this.#eventLogsInJobStage = new Map(); } #keyFor(contractAddress: AztecAddress, scope: AztecAddress, eventSelector: EventSelector): string { return `${contractAddress.toString()}_${scope.toString()}_${eventSelector.toString()}`; } + async commit(jobId: string): Promise { + for (const [eventCommitmentIndex, eventEntry] of this.#getEventLogsInJobStage(jobId)) { + this.logger.verbose('storing private event log (KV store)', { + eventCommitmentIndex: eventEntry.eventCommitmentIndex, + l2BlockNumber: eventEntry.l2BlockNumber, + l2BlockHash: eventEntry.l2BlockHash, + txHash: eventEntry.txHash, + lookupKey: eventEntry.lookupKey, + }); + + await Promise.all([ + this.#eventLogs.set(eventCommitmentIndex, eventEntry), + (async () => { + const existingIndices = (await this.#eventsByContractScopeSelector.getAsync(eventEntry.lookupKey)) || []; + return this.#eventsByContractScopeSelector.set(eventEntry.lookupKey, [ + ...existingIndices, + eventCommitmentIndex, + ]); + })(), + (async () => { + const existingBlockIndices = (await this.#eventsByBlockNumber.getAsync(eventEntry.l2BlockNumber)) || []; + await this.#eventsByBlockNumber.set(eventEntry.l2BlockNumber, [ + ...existingBlockIndices, + eventCommitmentIndex, + ]); + })(), + this.#seenLogs.set(eventCommitmentIndex, true), + ]); + } + + this.#eventLogsInJobStage.delete(jobId); + } + + discardStaged(jobId: string): Promise { + this.#eventLogsInJobStage.delete(jobId); + return Promise.resolve(); + } + + #getEventLogsInJobStage(jobId: string): Map { + let jobStage = this.#eventLogsInJobStage.get(jobId); + if (jobStage === undefined) { + jobStage = new Map(); + this.#eventLogsInJobStage.set(jobId, jobStage); + } + return jobStage; + } + + async #isSeenLog(jobId: string, eventCommitmentIndex: number): Promise { + // getSeenLogsFromJobStage + const eventLogsInJobStage = this.#getEventLogsInJobStage(jobId).get(eventCommitmentIndex); + + if (eventLogsInJobStage) { + return true; + } + + // if not in job stage, fallback on #seenLogs index + return (await this.#seenLogs.getAsync(eventCommitmentIndex)) ?? false; + } + + #addEventLogToStage(jobId: string, eventCommitmentIndex: number, eventEntry: PrivateEventEntry) { + this.#getEventLogsInJobStage(jobId).set(eventCommitmentIndex, eventEntry); + } + + async #getEventCommitmentIndices( + jobId: string, + contractAddress: AztecAddress, + scope: AztecAddress, + eventSelector: EventSelector, + ): Promise { + const key = this.#keyFor(contractAddress, scope, eventSelector); + const eventCommitmentIndicesInStorage = new Set((await this.#eventsByContractScopeSelector.getAsync(key)) || []); + const eventCommitmentIndicesInJobStage = new Set( + [...this.#getEventLogsInJobStage(jobId).entries()] + .filter(([_, entry]) => entry.lookupKey === key) + .map(([idx, _]) => idx), + ); + return [...eventCommitmentIndicesInStorage.union(eventCommitmentIndicesInJobStage)]; + } + + async #getEventLogByEventCommitmentIndex( + jobId: string, + eventCommitmentIndex: number, + ): Promise { + return ( + this.#getEventLogsInJobStage(jobId).get(eventCommitmentIndex) ?? + (await this.#eventLogs.getAsync(eventCommitmentIndex)) + ); + } + /** * Store a private event log. * @param eventSelector - The event selector of the event. @@ -79,6 +176,7 @@ export class PrivateEventStore { msgContent: Fr[], eventCommitmentIndex: number, metadata: PrivateEventMetadata, + jobId: string, ): Promise { const { contractAddress, scope, txHash, l2BlockNumber, l2BlockHash } = metadata; @@ -86,15 +184,20 @@ export class PrivateEventStore { const key = this.#keyFor(contractAddress, scope, eventSelector); // Check if this exact log has already been stored using eventCommitmentIndex as unique identifier - const hasBeenSeen = await this.#seenLogs.getAsync(eventCommitmentIndex); + const hasBeenSeen = await this.#isSeenLog(jobId, eventCommitmentIndex); if (hasBeenSeen) { this.logger.verbose('Ignoring duplicate event log', { txHash: txHash.toString(), eventCommitmentIndex }); return; } - this.logger.verbose('storing private event log', { contractAddress, scope, msgContent, l2BlockNumber }); + this.logger.verbose('storing private event log (job stage)', { + contractAddress, + scope, + msgContent, + l2BlockNumber, + }); - await this.#eventLogs.set(eventCommitmentIndex, { + this.#addEventLogToStage(jobId, eventCommitmentIndex, { randomness, msgContent: serializeToBuffer(msgContent), l2BlockNumber, @@ -103,15 +206,6 @@ export class PrivateEventStore { txHash: txHash.toBuffer(), lookupKey: key, }); - - const existingIndices = (await this.#eventsByContractScopeSelector.getAsync(key)) || []; - await this.#eventsByContractScopeSelector.set(key, [...existingIndices, eventCommitmentIndex]); - - const existingBlockIndices = (await this.#eventsByBlockNumber.getAsync(l2BlockNumber)) || []; - await this.#eventsByBlockNumber.set(l2BlockNumber, [...existingBlockIndices, eventCommitmentIndex]); - - // Mark this log as seen using eventCommitmentIndex - await this.#seenLogs.set(eventCommitmentIndex, true); }); } @@ -129,15 +223,20 @@ export class PrivateEventStore { public async getPrivateEvents( eventSelector: EventSelector, filter: PrivateEventStoreFilter, + jobId: string, ): Promise { const events: Array<{ eventCommitmentIndex: number; event: PackedPrivateEvent }> = []; for (const scope of filter.scopes) { - const key = this.#keyFor(filter.contractAddress, scope, eventSelector); - const eventCommitmentIndices = (await this.#eventsByContractScopeSelector.getAsync(key)) || []; + const eventCommitmentIndices = await this.#getEventCommitmentIndices( + jobId, + filter.contractAddress, + scope, + eventSelector, + ); for (const eventCommitmentIndex of eventCommitmentIndices) { - const entry = await this.#eventLogs.getAsync(eventCommitmentIndex); + const entry = await this.#getEventLogByEventCommitmentIndex(jobId, eventCommitmentIndex); if (!entry || entry.l2BlockNumber < filter.fromBlock || entry.l2BlockNumber >= filter.toBlock) { continue; } @@ -174,6 +273,14 @@ export class PrivateEventStore { /** * Rolls back private events that were stored after a given `blockNumber` and up to `synchedBlockNumber` (the block * number up to which PXE managed to sync before the reorg happened). + * + * We don't need staged writes for a rollback since it's handled in the context of a blockchain rewind. + * An interruption of that process midway through leaves the store in a state in which PXE still will + * know that it has to rewind, so there's no risk of data integrity corruption in this case. + * + * We could still decide to encase this in staged writes for the sake of symmetry, but that would entail also making + * the rest of the anchor block synchronization components work like that, and given the reasons above it doesn't + * seem necessary. */ public async rollbackEventsAfterBlock(blockNumber: number, synchedBlockNumber: number): Promise { await this.#store.transactionAsync(async () => { diff --git a/yarn-project/txe/src/oracle/txe_oracle_top_level_context.ts b/yarn-project/txe/src/oracle/txe_oracle_top_level_context.ts index 9793e262ecdd..4ae9f5d098bc 100644 --- a/yarn-project/txe/src/oracle/txe_oracle_top_level_context.ts +++ b/yarn-project/txe/src/oracle/txe_oracle_top_level_context.ts @@ -170,12 +170,16 @@ export class TXEOracleTopLevelContext implements IMiscOracle, ITxeExecutionOracl async txeGetPrivateEvents(selector: EventSelector, contractAddress: AztecAddress, scope: AztecAddress) { return ( - await this.privateEventStore.getPrivateEvents(selector, { - contractAddress, - scopes: [scope], - fromBlock: 0, - toBlock: (await this.getLastBlockNumber()) + 1, - }) + await this.privateEventStore.getPrivateEvents( + selector, + { + contractAddress, + scopes: [scope], + fromBlock: 0, + toBlock: (await this.getLastBlockNumber()) + 1, + }, + TXE_JOB_ID, + ) ).map(e => e.packedEvent); }