diff --git a/yarn-project/pxe/src/contract_function_simulator/oracle/private_execution_oracle.ts b/yarn-project/pxe/src/contract_function_simulator/oracle/private_execution_oracle.ts index a19c34d72991..64186b31ce30 100644 --- a/yarn-project/pxe/src/contract_function_simulator/oracle/private_execution_oracle.ts +++ b/yarn-project/pxe/src/contract_function_simulator/oracle/private_execution_oracle.ts @@ -356,7 +356,7 @@ export class PrivateExecutionOracle extends UtilityExecutionOracle implements IP const pendingNullifiers = this.noteCache.getNullifiers(this.callContext.contractAddress); - const noteService = new NoteService(this.noteStore, this.aztecNode, this.anchorBlockStore); + const noteService = new NoteService(this.noteStore, this.aztecNode, this.anchorBlockStore, this.jobId); const dbNotes = await noteService.getNotes( this.callContext.contractAddress, owner, diff --git a/yarn-project/pxe/src/contract_function_simulator/oracle/utility_execution_oracle.ts b/yarn-project/pxe/src/contract_function_simulator/oracle/utility_execution_oracle.ts index 1fcd05d35c6b..4a560e6310ef 100644 --- a/yarn-project/pxe/src/contract_function_simulator/oracle/utility_execution_oracle.ts +++ b/yarn-project/pxe/src/contract_function_simulator/oracle/utility_execution_oracle.ts @@ -257,7 +257,7 @@ export class UtilityExecutionOracle implements IMiscOracle, IUtilityExecutionOra offset: number, status: NoteStatus, ): Promise { - const noteService = new NoteService(this.noteStore, this.aztecNode, this.anchorBlockStore); + const noteService = new NoteService(this.noteStore, this.aztecNode, this.anchorBlockStore, this.jobId); const dbNotes = await noteService.getNotes(this.contractAddress, owner, storageSlot, status, this.scopes); return pickNotes(dbNotes, { @@ -359,7 +359,7 @@ export class UtilityExecutionOracle implements IMiscOracle, IUtilityExecutionOra await logService.syncTaggedLogs(this.contractAddress, pendingTaggedLogArrayBaseSlot, this.scopes); - const noteService = new 
NoteService(this.noteStore, this.aztecNode, this.anchorBlockStore); + const noteService = new NoteService(this.noteStore, this.aztecNode, this.anchorBlockStore, this.jobId); await noteService.syncNoteNullifiers(this.contractAddress); } @@ -393,7 +393,7 @@ export class UtilityExecutionOracle implements IMiscOracle, IUtilityExecutionOra await this.capsuleStore.readCapsuleArray(contractAddress, eventValidationRequestsArrayBaseSlot, this.jobId) ).map(EventValidationRequest.fromFields); - const noteService = new NoteService(this.noteStore, this.aztecNode, this.anchorBlockStore); + const noteService = new NoteService(this.noteStore, this.aztecNode, this.anchorBlockStore, this.jobId); const noteDeliveries = noteValidationRequests.map(request => noteService.deliverNote( request.contractAddress, @@ -409,7 +409,7 @@ export class UtilityExecutionOracle implements IMiscOracle, IUtilityExecutionOra ), ); - const eventService = new EventService(this.anchorBlockStore, this.aztecNode, this.privateEventStore); + const eventService = new EventService(this.anchorBlockStore, this.aztecNode, this.privateEventStore, this.jobId); const eventDeliveries = eventValidationRequests.map(request => eventService.deliverEvent( request.contractAddress, diff --git a/yarn-project/pxe/src/debug/pxe_debug_utils.ts b/yarn-project/pxe/src/debug/pxe_debug_utils.ts index ce107a17c073..2d6d32443e0b 100644 --- a/yarn-project/pxe/src/debug/pxe_debug_utils.ts +++ b/yarn-project/pxe/src/debug/pxe_debug_utils.ts @@ -1,3 +1,4 @@ +import { randomBytes } from '@aztec/foundation/crypto/random'; import type { NoteDao, NotesFilter } from '@aztec/stdlib/note'; import type { PXE } from '../pxe.js'; @@ -43,6 +44,6 @@ export class PXEDebugUtils { const call = await this.contractStore.getFunctionCall('sync_private_state', [], filter.contractAddress); await this.#pxe.simulateUtility(call); - return this.noteStore.getNotes(filter); + return this.noteStore.getNotes(filter, randomBytes(8).toString('hex')); } } diff --git 
a/yarn-project/pxe/src/events/event_service.test.ts b/yarn-project/pxe/src/events/event_service.test.ts index ca675dc6df70..572a39a77aa0 100644 --- a/yarn-project/pxe/src/events/event_service.test.ts +++ b/yarn-project/pxe/src/events/event_service.test.ts @@ -80,15 +80,15 @@ describe('deliverEvent', () => { aztecNode.getTxEffect.mockImplementation(() => Promise.resolve(indexedTxEffect)); - eventService = new EventService(anchorBlockStore, aztecNode, privateEventStore); + eventService = new EventService(anchorBlockStore, aztecNode, privateEventStore, 'test'); }); - function runDeliverEvent( + async function runDeliverEvent( overrides: { eventCommitment?: Fr; } = {}, ) { - return eventService.deliverEvent( + await eventService.deliverEvent( contractAddress, eventSelector, randomness, @@ -97,6 +97,8 @@ describe('deliverEvent', () => { txEffect.txHash, recipient, ); + + await privateEventStore.commit('test'); } it('should throw when tx does not exist or has no effects', async () => { diff --git a/yarn-project/pxe/src/events/event_service.ts b/yarn-project/pxe/src/events/event_service.ts index f3ebd2ff7727..68363e035958 100644 --- a/yarn-project/pxe/src/events/event_service.ts +++ b/yarn-project/pxe/src/events/event_service.ts @@ -13,6 +13,7 @@ export class EventService { private readonly anchorBlockStore: AnchorBlockStore, private readonly aztecNode: AztecNode, private readonly privateEventStore: PrivateEventStore, + private readonly jobId: string, ) {} public async deliverEvent( @@ -52,14 +53,21 @@ export class EventService { ); } - return this.privateEventStore.storePrivateEventLog(selector, randomness, content, siloedEventCommitment, { - contractAddress, - scope, - txHash, - l2BlockNumber: txEffect.l2BlockNumber, - l2BlockHash: txEffect.l2BlockHash, - txIndexInBlock: txEffect.txIndexInBlock, - eventIndexInTx, - }); + return this.privateEventStore.storePrivateEventLog( + selector, + randomness, + content, + siloedEventCommitment, + { + contractAddress, + scope, + 
txHash, + l2BlockNumber: txEffect.l2BlockNumber, + l2BlockHash: txEffect.l2BlockHash, + txIndexInBlock: txEffect.txIndexInBlock, + eventIndexInTx, + }, + this.jobId, + ); } } diff --git a/yarn-project/pxe/src/events/private_event_filter_validator.test.ts b/yarn-project/pxe/src/events/private_event_filter_validator.test.ts index 58f6e9585d03..f98afd3d7638 100644 --- a/yarn-project/pxe/src/events/private_event_filter_validator.test.ts +++ b/yarn-project/pxe/src/events/private_event_filter_validator.test.ts @@ -25,17 +25,17 @@ describe('PrivateEventFilterValidator', () => { anchorBlockStore = mock(); anchorBlockStore.getBlockHeader.mockResolvedValue(lastKnownBlock); - validator = new PrivateEventFilterValidator(anchorBlockStore); + validator = new PrivateEventFilterValidator(lastKnownBlockNumber); }); - it('rejects empty scope', async () => { - await expect(validator.validate({ contractAddress, fromBlock: lastKnownBlockNumber, scopes: [] })).rejects.toThrow( + it('rejects empty scope', () => { + expect(() => validator.validate({ contractAddress, fromBlock: lastKnownBlockNumber, scopes: [] })).toThrow( /At least one scope is required to get private events/, ); }); - it('defaults to whole range', async () => { - const dataProviderFilter = await validator.validate({ contractAddress, scopes: [scope] }); + it('defaults to whole range', () => { + const dataProviderFilter = validator.validate({ contractAddress, scopes: [scope] }); expect(dataProviderFilter).toEqual({ contractAddress, scopes: [scope], @@ -45,8 +45,8 @@ describe('PrivateEventFilterValidator', () => { }); }); - it('toBlock defaults to lastKnownBlock + 1', async () => { - const dataProviderFilter = await validator.validate({ + it('toBlock defaults to lastKnownBlock + 1', () => { + const dataProviderFilter = validator.validate({ contractAddress, scopes: [scope], fromBlock: INITIAL_L2_BLOCK_NUM, @@ -59,8 +59,8 @@ describe('PrivateEventFilterValidator', () => { }); }); - it('toBlock without fromBlock defaults to 
[INITIAL_L2_BLOCK_NUM, toBlock)', async () => { - const dataProviderFilter = await validator.validate({ + it('toBlock without fromBlock defaults to [INITIAL_L2_BLOCK_NUM, toBlock)', () => { + const dataProviderFilter = validator.validate({ contractAddress, scopes: [scope], toBlock: BlockNumber(lastKnownBlockNumber + 1), @@ -73,28 +73,28 @@ describe('PrivateEventFilterValidator', () => { }); }); - it('rejects fromBlock >= toBlock', async () => { - await expect( + it('rejects fromBlock >= toBlock', () => { + expect(() => validator.validate({ contractAddress, scopes: [scope], fromBlock: lastKnownBlockNumber, toBlock: lastKnownBlockNumber, }), - ).rejects.toThrow(/toBlock must be strictly greater than fromBlock/); + ).toThrow(/toBlock must be strictly greater than fromBlock/); - await expect( + expect(() => validator.validate({ contractAddress, scopes: [scope], fromBlock: lastKnownBlockNumber, toBlock: BlockNumber(lastKnownBlockNumber - 1), }), - ).rejects.toThrow(/toBlock must be strictly greater than fromBlock/); + ).toThrow(/toBlock must be strictly greater than fromBlock/); }); - it('preserves txHash', async () => { - let dataProviderFilter = await validator.validate({ + it('preserves txHash', () => { + let dataProviderFilter = validator.validate({ contractAddress, scopes: [scope], fromBlock: INITIAL_L2_BLOCK_NUM, @@ -108,7 +108,7 @@ describe('PrivateEventFilterValidator', () => { }); const txHash = TxHash.random(); - dataProviderFilter = await validator.validate({ + dataProviderFilter = validator.validate({ contractAddress, scopes: [scope], fromBlock: INITIAL_L2_BLOCK_NUM, diff --git a/yarn-project/pxe/src/events/private_event_filter_validator.ts b/yarn-project/pxe/src/events/private_event_filter_validator.ts index dda3ca856db3..b7805675f83e 100644 --- a/yarn-project/pxe/src/events/private_event_filter_validator.ts +++ b/yarn-project/pxe/src/events/private_event_filter_validator.ts @@ -2,13 +2,12 @@ import type { PrivateEventFilter } from '@aztec/aztec.js/wallet'; 
import { INITIAL_L2_BLOCK_NUM } from '@aztec/constants'; import { BlockNumber } from '@aztec/foundation/branded-types'; -import { AnchorBlockStore } from '../storage/anchor_block_store/anchor_block_store.js'; import type { PrivateEventStoreFilter } from '../storage/private_event_store/private_event_store.js'; export class PrivateEventFilterValidator { - constructor(private anchorBlockStore: AnchorBlockStore) {} + constructor(private lastBlock: BlockNumber) {} - async validate(filter: PrivateEventFilter): Promise { + validate(filter: PrivateEventFilter): PrivateEventStoreFilter { let { fromBlock, toBlock } = filter; // Block range filters in Aztec Node are defined as closed-open intervals [fromBlock, toBlock), so @@ -16,9 +15,8 @@ export class PrivateEventFilterValidator { // We then default to [INITIAL_L2_BLOCK_NUM, latestKnownBlock + 1), ie: by default return events from // the first block to the latest known block. if (!fromBlock || !toBlock) { - const lastKnownBlock = (await this.anchorBlockStore.getBlockHeader()).getBlockNumber(); fromBlock = fromBlock ?? BlockNumber(INITIAL_L2_BLOCK_NUM); - toBlock = toBlock ?? BlockNumber(lastKnownBlock + 1); + toBlock = toBlock ?? 
BlockNumber(this.lastBlock + 1); } if (filter.scopes.length === 0) { diff --git a/yarn-project/pxe/src/notes/note_service.test.ts b/yarn-project/pxe/src/notes/note_service.test.ts index 31d08af92dc9..671fdfbf2d13 100644 --- a/yarn-project/pxe/src/notes/note_service.test.ts +++ b/yarn-project/pxe/src/notes/note_service.test.ts @@ -44,81 +44,104 @@ describe('NoteService', () => { contractAddress = await AztecAddress.random(); - // Check that there are no notes in the database - const notes = await noteStore.getNotes({ contractAddress }); + const notes = await noteStore.getNotes({ contractAddress }, 'test'); expect(notes).toHaveLength(0); - // Check that the expected number of accounts is present const accounts = await keyStore.getAccounts(); expect(accounts).toHaveLength(0); recipient = await keyStore.addAccount(new Fr(69), Fr.random()); - noteService = new NoteService(noteStore, aztecNode, anchorBlockStore); + noteService = new NoteService(noteStore, aztecNode, anchorBlockStore, 'test'); }); it('should remove notes that have been nullified', async () => { - // Set up initial state with a note const noteDao = await NoteDao.random({ contractAddress }); // Spy on the noteStore.applyNullifiers to later on have additional guarantee that we really removed // the note. 
jest.spyOn(noteStore, 'applyNullifiers'); - // Add the note to storage - await noteStore.addNotes([noteDao], recipient.address); + await noteStore.addNotes([noteDao], recipient.address, 'test'); // Set up the nullifier in the merkle tree const nullifierIndex = randomDataInBlock(123n); aztecNode.findLeavesIndexes.mockResolvedValue([nullifierIndex]); - // Call the function under test await noteService.syncNoteNullifiers(contractAddress); - // Verify the note was removed by checking storage - const remainingNotes = await noteStore.getNotes({ - contractAddress, - status: NoteStatus.ACTIVE, - scopes: [recipient.address], - }); + const remainingNotes = await noteStore.getNotes( + { + contractAddress, + status: NoteStatus.ACTIVE, + scopes: [recipient.address], + }, + 'test', + ); expect(remainingNotes).toHaveLength(0); // Verify the note was removed by checking the spy expect(noteStore.applyNullifiers).toHaveBeenCalledTimes(1); + + // Verify that the changes persist after job completion + { + await noteStore.commit('test'); + const remainingNotes = await noteStore.getNotes( + { + contractAddress, + status: NoteStatus.ACTIVE, + scopes: [recipient.address], + }, + 'fresh-job', + ); + expect(remainingNotes).toHaveLength(0); + } }); it('should keep notes that have not been nullified', async () => { - // Set up initial state with a note const noteDao = await NoteDao.random({ contractAddress }); - // Add the note to storage - await noteStore.addNotes([noteDao], recipient.address); + await noteStore.addNotes([noteDao], recipient.address, 'test'); // No nullifier found in merkle tree aztecNode.findLeavesIndexes.mockResolvedValue([undefined]); - // Call the function under test await noteService.syncNoteNullifiers(contractAddress); - // Verify note still exists - const remainingNotes = await noteStore.getNotes({ - contractAddress, - status: NoteStatus.ACTIVE, - scopes: [recipient.address], - }); + const remainingNotes = await noteStore.getNotes( + { + contractAddress, + status: 
NoteStatus.ACTIVE, + scopes: [recipient.address], + }, + 'test', + ); expect(remainingNotes).toHaveLength(1); expect(remainingNotes[0]).toEqual(noteDao); + + // Verify that the changes persist after job completion + { + await noteStore.commit('test'); + const remainingNotes = await noteStore.getNotes( + { + contractAddress, + status: NoteStatus.ACTIVE, + scopes: [recipient.address], + }, + 'fresh-job', + ); + expect(remainingNotes).toHaveLength(1); + expect(remainingNotes[0]).toEqual(noteDao); + } }); // Verifies that notes are not marked as nullified when their nullifier only exists in blocks that haven't been // synced yet. We mock the nullifier to only exist in blocks beyond our current sync point, then verify the note // is not removed by applyNullifiers. it('should not remove notes if nullifier is in unsynced blocks', async () => { - // Set up initial state with a note const noteDao = await NoteDao.random({ contractAddress }); - // Add the note to storage - await noteStore.addNotes([noteDao], recipient.address); + await noteStore.addNotes([noteDao], recipient.address, 'test'); // Mock nullifier to only exist after synced block aztecNode.findLeavesIndexes.mockImplementation(blockNum => { @@ -128,37 +151,51 @@ describe('NoteService', () => { return Promise.resolve([undefined]); }); - // Call the function under test await noteService.syncNoteNullifiers(contractAddress); // Verify note still exists - const remainingNotes = await noteStore.getNotes({ - contractAddress, - status: NoteStatus.ACTIVE, - scopes: [recipient.address], - }); + const remainingNotes = await noteStore.getNotes( + { + contractAddress, + status: NoteStatus.ACTIVE, + scopes: [recipient.address], + }, + 'test', + ); expect(remainingNotes).toHaveLength(1); expect(remainingNotes[0]).toEqual(noteDao); + + // Verify that the changes persist after job completion + { + await noteStore.commit('test'); + const remainingNotes = await noteStore.getNotes( + { + contractAddress, + status: NoteStatus.ACTIVE, 
+ scopes: [recipient.address], + }, + 'fresh-job', + ); + expect(remainingNotes).toHaveLength(1); + expect(remainingNotes[0]).toEqual(noteDao); + } }); it('should search for notes from all accounts', async () => { - // Add multiple accounts to keystore await keyStore.addAccount(Fr.random(), Fr.random()); await keyStore.addAccount(Fr.random(), Fr.random()); expect(await keyStore.getAccounts()).toHaveLength(3); - // Spy on the noteStore.getNotesSpy const getNotesSpy = jest.spyOn(noteStore, 'getNotes'); - // Call the function under test await noteService.syncNoteNullifiers(contractAddress); // Verify applyNullifiers was called once for all accounts expect(getNotesSpy).toHaveBeenCalledTimes(1); - // Verify getNotes was called with the correct contract address - expect(getNotesSpy).toHaveBeenCalledWith(expect.objectContaining({ contractAddress })); + // Verify getNotes was called with the correct contract address and jobId + expect(getNotesSpy).toHaveBeenCalledWith(expect.objectContaining({ contractAddress }), 'test'); }); describe('deliverNote', () => { @@ -248,10 +285,20 @@ describe('NoteService', () => { ); // Verify note was stored - const notes = await noteStore.getNotes({ contractAddress, scopes: [recipient.address] }); + const notes = await noteStore.getNotes({ contractAddress, scopes: [recipient.address] }, 'test'); expect(notes).toHaveLength(1); expect(notes[0].noteHash.equals(noteHash)).toBe(true); + + // Verify note is still stored after committing job + { + await noteStore.commit('test'); + + const notes = await noteStore.getNotes({ contractAddress, scopes: [recipient.address] }, 'fresh-job'); + + expect(notes).toHaveLength(1); + expect(notes[0].noteHash.equals(noteHash)).toBe(true); + } }); it('should throw if tx hash does not exist', async () => { @@ -332,22 +379,35 @@ describe('NoteService', () => { recipient.address, ); - // Now we verify that the note is stored as nullified by checking it can be retrieved only with - // the ACTIVE_OR_NULLIFIED status on 
the input. - const allNotes = await noteStore.getNotes({ - contractAddress, - scopes: [recipient.address], - status: NoteStatus.ACTIVE_OR_NULLIFIED, - }); - expect(allNotes).toHaveLength(1); - expect(allNotes[0].noteHash.equals(noteHash)).toBe(true); + const verifyInJobContext = async (jobId: string) => { + // Now we verify that the note is stored as nullified by checking it can be retrieved only with + // the ACTIVE_OR_NULLIFIED status on the input. + const allNotes = await noteStore.getNotes( + { + contractAddress, + scopes: [recipient.address], + status: NoteStatus.ACTIVE_OR_NULLIFIED, + }, + jobId, + ); + expect(allNotes).toHaveLength(1); + expect(allNotes[0].noteHash.equals(noteHash)).toBe(true); + + const activeNotes = await noteStore.getNotes( + { + contractAddress, + scopes: [recipient.address], + status: NoteStatus.ACTIVE, + }, + jobId, + ); + expect(activeNotes).toHaveLength(0); + }; - const activeNotes = await noteStore.getNotes({ - contractAddress, - scopes: [recipient.address], - status: NoteStatus.ACTIVE, - }); - expect(activeNotes).toHaveLength(0); + // Verify store behaves correctly pre and post commit + await verifyInJobContext('test'); + await noteStore.commit('test'); + await verifyInJobContext('fresh-job'); }); }); }); diff --git a/yarn-project/pxe/src/notes/note_service.ts b/yarn-project/pxe/src/notes/note_service.ts index 13f903a6f290..18f3eddabff6 100644 --- a/yarn-project/pxe/src/notes/note_service.ts +++ b/yarn-project/pxe/src/notes/note_service.ts @@ -15,6 +15,7 @@ export class NoteService { private readonly noteStore: NoteStore, private readonly aztecNode: AztecNode, private readonly anchorBlockStore: AnchorBlockStore, + private readonly jobId: string, ) {} /** @@ -33,13 +34,16 @@ export class NoteService { status: NoteStatus, scopes?: AztecAddress[], ) { - const noteDaos = await this.noteStore.getNotes({ - contractAddress, - owner, - storageSlot, - status, - scopes, - }); + const noteDaos = await this.noteStore.getNotes( + { + 
contractAddress, + owner, + storageSlot, + status, + scopes, + }, + this.jobId, + ); return noteDaos.map( ({ contractAddress, owner, storageSlot, randomness, noteNonce, note, noteHash, siloedNullifier }) => ({ contractAddress, @@ -70,7 +74,7 @@ export class NoteService { public async syncNoteNullifiers(contractAddress: AztecAddress): Promise { const syncedBlockNumber = (await this.anchorBlockStore.getBlockHeader()).getBlockNumber(); - const contractNotes = await this.noteStore.getNotes({ contractAddress }); + const contractNotes = await this.noteStore.getNotes({ contractAddress }, this.jobId); if (contractNotes.length === 0) { return; @@ -104,7 +108,7 @@ export class NoteService { }) .filter(nullifier => nullifier !== undefined) as DataInBlock[]; - await this.noteStore.applyNullifiers(foundNullifiers); + await this.noteStore.applyNullifiers(foundNullifiers, this.jobId); } public async deliverNote( @@ -180,12 +184,17 @@ export class NoteService { ); // The note was found by `recipient`, so we use that as the scope when storing the note. - await this.noteStore.addNotes([noteDao], recipient); + await this.noteStore.addNotes([noteDao], recipient, this.jobId); if (nullifierIndex !== undefined) { // We found nullifier index which implies that the note has already been nullified. 
- const { data: _, ...blockHashAndNum } = nullifierIndex; - await this.noteStore.applyNullifiers([{ data: siloedNullifier, ...blockHashAndNum }]); + // Only apply nullifier if the note isn't already nullified + // (it might have been nullified by a previous sync operation in this job) + const alreadyNullified = await this.noteStore.isNoteNullified(noteDao, this.jobId); + if (!alreadyNullified) { + const { data: _, ...blockHashAndNum } = nullifierIndex; + await this.noteStore.applyNullifiers([{ data: siloedNullifier, ...blockHashAndNum }], this.jobId); + } } } } diff --git a/yarn-project/pxe/src/pxe.test.ts b/yarn-project/pxe/src/pxe.test.ts index 31fea2a5399a..fed85e0d86e4 100644 --- a/yarn-project/pxe/src/pxe.test.ts +++ b/yarn-project/pxe/src/pxe.test.ts @@ -238,6 +238,7 @@ describe('PXE', () => { txIndexInBlock: 0, eventIndexInTx: eventCounter++, }, + 'test', ); return event; @@ -247,6 +248,7 @@ describe('PXE', () => { // Store a couple of events to exercise `getPrivateEvents` const event1 = await storeEvent(); const event2 = await storeEvent(); + await privateEventStore.commit('test'); const events = await pxe.getPrivateEvents(eventSelector, { contractAddress, @@ -287,6 +289,8 @@ describe('PXE', () => { storeEvent(lastKnownBlockNumber + 1), storeEvent(lastKnownBlockNumber + 1), ]); + + await privateEventStore.commit('test'); }); it('filters by txHash', async () => { diff --git a/yarn-project/pxe/src/pxe.ts b/yarn-project/pxe/src/pxe.ts index e5b5f8cab472..50b0730f92e9 100644 --- a/yarn-project/pxe/src/pxe.ts +++ b/yarn-project/pxe/src/pxe.ts @@ -1,4 +1,5 @@ import type { PrivateEventFilter } from '@aztec/aztec.js/wallet'; +import { BlockNumber } from '@aztec/foundation/branded-types'; import { Fr } from '@aztec/foundation/curves/bn254'; import { type Logger, createLogger } from '@aztec/foundation/log'; import { SerialQueue } from '@aztec/foundation/queue'; @@ -157,7 +158,13 @@ export class PXE { ); const jobCoordinator = new JobCoordinator(store); - 
jobCoordinator.registerStores([capsuleStore, senderTaggingStore, recipientTaggingStore]); + jobCoordinator.registerStores([ + capsuleStore, + senderTaggingStore, + recipientTaggingStore, + privateEventStore, + noteStore, + ]); const debugUtils = new PXEDebugUtils(contractStore, noteStore); @@ -1009,9 +1016,17 @@ export class PXE { * Defaults to the latest known block to PXE + 1. * @returns - The packed events with block and tx metadata. */ - public getPrivateEvents(eventSelector: EventSelector, filter: PrivateEventFilter): Promise { - return this.#putInJobQueue(async jobId => { + public async getPrivateEvents( + eventSelector: EventSelector, + filter: PrivateEventFilter, + ): Promise { + let anchorBlockNumber: BlockNumber; + + await this.#putInJobQueue(async jobId => { await this.blockStateSynchronizer.sync(); + + anchorBlockNumber = (await this.anchorBlockStore.getBlockHeader()).getBlockNumber(); + const contractFunctionSimulator = this.#getSimulatorForTx(); await this.contractStore.syncPrivateState( @@ -1020,15 +1035,16 @@ export class PXE { async privateSyncCall => await this.#simulateUtility(contractFunctionSimulator, privateSyncCall, [], undefined, jobId), ); + }); - const sanitizedFilter = await new PrivateEventFilterValidator(this.anchorBlockStore).validate(filter); + const sanitizedFilter = new PrivateEventFilterValidator(anchorBlockNumber!).validate(filter); - this.log.debug( - `Getting private events for ${sanitizedFilter.contractAddress.toString()} from ${sanitizedFilter.fromBlock} to ${sanitizedFilter.toBlock}`, - ); + this.log.debug( - `Getting private events for ${sanitizedFilter.contractAddress.toString()} from ${sanitizedFilter.fromBlock} to ${sanitizedFilter.toBlock}`, + ); - return this.privateEventStore.getPrivateEvents(eventSelector, sanitizedFilter); - }); + // anchorBlockNumber is guaranteed to be assigned once the sync job above has completed, justifying the non-null assertion + return this.privateEventStore.getPrivateEvents(eventSelector, sanitizedFilter); } /** diff --git 
a/yarn-project/pxe/src/storage/note_store/note_store.test.ts b/yarn-project/pxe/src/storage/note_store/note_store.test.ts index 9e4b2538341f..27cfe4e6a8f9 100644 --- a/yarn-project/pxe/src/storage/note_store/note_store.test.ts +++ b/yarn-project/pxe/src/storage/note_store/note_store.test.ts @@ -38,10 +38,10 @@ describe('NoteStore', () => { // Sets up a fresh NoteStore with two scopes and three notes. async function setupProviderWithNotes(storeName: string) { const store = await openTmpStore(storeName); - const provider = await NoteStore.create(store); + const noteStore = await NoteStore.create(store); - await provider.addScope(SCOPE_1); - await provider.addScope(SCOPE_2); + await noteStore.addScope(SCOPE_1); + await noteStore.addScope(SCOPE_2); const note1 = await mkNote({ contractAddress: CONTRACT_A, @@ -59,10 +59,11 @@ describe('NoteStore', () => { siloedNullifier: SILOED_NULLIFIER_3, }); - await provider.addNotes([note1, note2], SCOPE_1); - await provider.addNotes([note3], SCOPE_2); + await noteStore.addNotes([note1, note2], SCOPE_1, 'before-each-test-job'); + await noteStore.addNotes([note3], SCOPE_2, 'before-each-test-job'); + await noteStore.commit('before-each-test-job'); - return { store, provider, note1, note2, note3 }; + return { store, noteStore, note1, note2, note3 }; } // Helper to create a nullifier object matching a given note. 
@@ -85,7 +86,7 @@ describe('NoteStore', () => { const store = await openTmpStore('note_store_fresh_store'); const provider = await NoteStore.create(store); - const res = await provider.getNotes({ contractAddress: CONTRACT_A }); + const res = await provider.getNotes({ contractAddress: CONTRACT_A }, 'test'); expect(Array.isArray(res)).toBe(true); expect(res).toHaveLength(0); @@ -95,20 +96,21 @@ describe('NoteStore', () => { it('re-initializes from an existing store and restores previously added notes', async () => { const store = await openTmpStore('note_store_re-init_test'); - // First provider populates the store; second reopens it to verify persistence - const provider1 = await NoteStore.create(store); + // First note store populates the KV store; second reopens it to verify persistence + const noteStore1 = await NoteStore.create(store); - await provider1.addScope(SCOPE_1); - await provider1.addScope(SCOPE_2); + await noteStore1.addScope(SCOPE_1); + await noteStore1.addScope(SCOPE_2); const noteA = await mkNote({ contractAddress: CONTRACT_A, siloedNullifier: SILOED_NULLIFIER_1 }); const noteB = await mkNote({ contractAddress: CONTRACT_B, siloedNullifier: SILOED_NULLIFIER_2 }); - await provider1.addNotes([noteA, noteB], FAKE_ADDRESS); + await noteStore1.addNotes([noteA, noteB], FAKE_ADDRESS, 'first-store'); + await noteStore1.commit('first-store'); - const provider2 = await NoteStore.create(store); + const noteStore2 = await NoteStore.create(store); - const notesA = await provider2.getNotes({ contractAddress: CONTRACT_A }); - const notesB = await provider2.getNotes({ contractAddress: CONTRACT_B }); + const notesA = await noteStore2.getNotes({ contractAddress: CONTRACT_A }, 'second-store'); + const notesB = await noteStore2.getNotes({ contractAddress: CONTRACT_B }, 'second-store'); expect(new Set(getNullifiers(notesA))).toEqual(new Set([SILOED_NULLIFIER_1.toBigInt()])); expect(new Set(getNullifiers(notesB))).toEqual(new Set([SILOED_NULLIFIER_2.toBigInt()])); @@ 
-119,13 +121,13 @@ describe('NoteStore', () => { describe('NoteStore.getNotes filtering happy path', () => { let store: AztecLMDBStoreV2; - let provider: NoteStore; + let noteStore: NoteStore; let note1: NoteDao; let note2: NoteDao; let note3: NoteDao; beforeEach(async () => { - ({ store, provider, note1, note2, note3 } = await setupProviderWithNotes('note_store_get_notes_happy')); + ({ store, noteStore, note1, note2, note3 } = await setupProviderWithNotes('note_store_get_notes_happy')); }); afterEach(async () => { @@ -133,7 +135,7 @@ describe('NoteStore', () => { }); it('filters notes matching only the contractAddress', async () => { - const res = await provider.getNotes({ contractAddress: CONTRACT_A }); + const res = await noteStore.getNotes({ contractAddress: CONTRACT_A }, 'test'); // note1 and note2 match CONTRACT_A expect(new Set(getNullifiers(res))).toEqual( new Set([note1.siloedNullifier.toBigInt(), note2.siloedNullifier.toBigInt()]), @@ -141,12 +143,12 @@ describe('NoteStore', () => { }); it('filters notes matching contractAddress and storageSlot', async () => { - const res = await provider.getNotes({ contractAddress: CONTRACT_A, storageSlot: SLOT_Y }); + const res = await noteStore.getNotes({ contractAddress: CONTRACT_A, storageSlot: SLOT_Y }, 'test'); expect(new Set(getNullifiers(res))).toEqual(new Set([note2.siloedNullifier.toBigInt()])); }); it('filters notes matching contractAddress in the specified scope', async () => { - const res = await provider.getNotes({ contractAddress: CONTRACT_B, scopes: [SCOPE_2] }); + const res = await noteStore.getNotes({ contractAddress: CONTRACT_B, scopes: [SCOPE_2] }, 'test'); expect(new Set(getNullifiers(res))).toEqual(new Set([note3.siloedNullifier.toBigInt()])); }); @@ -158,12 +160,15 @@ describe('NoteStore', () => { storageSlot: SLOT_X, siloedNullifier: note4Nullifier, }); - await provider.addNotes([note4], SCOPE_2); + await noteStore.addNotes([note4], SCOPE_2, 'test'); - const res = await provider.getNotes({ - 
contractAddress: CONTRACT_A, - scopes: [SCOPE_1, SCOPE_2], - }); + const res = await noteStore.getNotes( + { + contractAddress: CONTRACT_A, + scopes: [SCOPE_1, SCOPE_2], + }, + 'test', + ); expect(new Set(getNullifiers(res))).toEqual( new Set([note1.siloedNullifier.toBigInt(), note2.siloedNullifier.toBigInt(), note4Nullifier.toBigInt()]), @@ -172,12 +177,15 @@ describe('NoteStore', () => { it('deduplicates notes that appear in multiple scopes', async () => { // note 1 has been added to scope 1 in setup so we add it to scope 2 to then be able to test deduplication - await provider.addNotes([note1], SCOPE_2); + await noteStore.addNotes([note1], SCOPE_2, 'test'); - const res = await provider.getNotes({ - contractAddress: CONTRACT_A, - scopes: [SCOPE_1, SCOPE_2], - }); + const res = await noteStore.getNotes( + { + contractAddress: CONTRACT_A, + scopes: [SCOPE_1, SCOPE_2], + }, + 'test', + ); // Note 1 should be present exactly once in the result const note1Matches = res.filter(n => n.equals(note1)); @@ -186,49 +194,61 @@ describe('NoteStore', () => { it('filters notes by status, returning ACTIVE by default and both ACTIVE and NULLIFIED when requested', async () => { const nullifiers = [mkNullifier(note2)]; - await expect(provider.applyNullifiers(nullifiers)).resolves.toEqual([note2]); + await expect(noteStore.applyNullifiers(nullifiers, 'test')).resolves.toEqual([note2]); - const resActive = await provider.getNotes({ contractAddress: CONTRACT_A }); + const resActive = await noteStore.getNotes({ contractAddress: CONTRACT_A }, 'test'); expect(new Set(getNullifiers(resActive))).toEqual(new Set([note1.siloedNullifier.toBigInt()])); - const resAll = await provider.getNotes({ - contractAddress: CONTRACT_A, - status: NoteStatus.ACTIVE_OR_NULLIFIED, - }); + const resAll = await noteStore.getNotes( + { + contractAddress: CONTRACT_A, + status: NoteStatus.ACTIVE_OR_NULLIFIED, + }, + 'test', + ); expect(new Set(getNullifiers(resAll))).toEqual( new 
Set([note1.siloedNullifier.toBigInt(), note2.siloedNullifier.toBigInt()]), ); }); it('returns only notes that match all provided filters', async () => { - const res = await provider.getNotes({ - contractAddress: CONTRACT_A, - storageSlot: SLOT_X, - scopes: [SCOPE_1], - }); + const res = await noteStore.getNotes( + { + contractAddress: CONTRACT_A, + storageSlot: SLOT_X, + scopes: [SCOPE_1], + }, + 'test', + ); expect(new Set(getNullifiers(res))).toEqual(new Set([note1.siloedNullifier.toBigInt()])); }); it('applies scope filtering to nullified notes', async () => { const nullifiers = [mkNullifier(note3)]; - await expect(provider.applyNullifiers(nullifiers)).resolves.toEqual([note3]); + await expect(noteStore.applyNullifiers(nullifiers, 'test')).resolves.toEqual([note3]); // Query for contractB, but with the wrong scope (scope1) - const res = await provider.getNotes({ - contractAddress: CONTRACT_B, - scopes: [SCOPE_1], - status: NoteStatus.ACTIVE_OR_NULLIFIED, - }); + const res = await noteStore.getNotes( + { + contractAddress: CONTRACT_B, + scopes: [SCOPE_1], + status: NoteStatus.ACTIVE_OR_NULLIFIED, + }, + 'test', + ); expect(res).toHaveLength(0); // Query for contractB with the correct scope (scope2) - const res2 = await provider.getNotes({ - contractAddress: CONTRACT_B, - scopes: [SCOPE_2], - status: NoteStatus.ACTIVE_OR_NULLIFIED, - }); + const res2 = await noteStore.getNotes( + { + contractAddress: CONTRACT_B, + scopes: [SCOPE_2], + status: NoteStatus.ACTIVE_OR_NULLIFIED, + }, + 'test', + ); expect(new Set(getNullifiers(res2))).toEqual(new Set([note3.siloedNullifier.toBigInt()])); }); @@ -239,25 +259,28 @@ describe('NoteStore', () => { siloedNullifier: note1.siloedNullifier, }; - const res = await provider.getNotes(filter); + const res = await noteStore.getNotes(filter, 'test'); expect(new Set(getNullifiers(res))).toEqual(new Set([note1.siloedNullifier.toBigInt()])); // Test with a different note's siloedNullifier - const res2 = await provider.getNotes({ - 
contractAddress: CONTRACT_A, - siloedNullifier: note2.siloedNullifier, - }); + const res2 = await noteStore.getNotes( + { + contractAddress: CONTRACT_A, + siloedNullifier: note2.siloedNullifier, + }, + 'test', + ); expect(new Set(getNullifiers(res2))).toEqual(new Set([note2.siloedNullifier.toBigInt()])); }); }); describe('NoteStore.getNotes filtering edge cases', () => { let store: AztecLMDBStoreV2; - let provider: NoteStore; + let noteStore: NoteStore; let note2: NoteDao; beforeEach(async () => { - ({ store, provider, note2 } = await setupProviderWithNotes('note_store_get_notes_edge')); + ({ store, noteStore, note2 } = await setupProviderWithNotes('note_store_get_notes_edge')); }); afterEach(async () => { @@ -265,28 +288,28 @@ describe('NoteStore', () => { }); it('returns no notes when filtering by non-existing contractAddress', async () => { - const res = await provider.getNotes({ contractAddress: FAKE_ADDRESS }); + const res = await noteStore.getNotes({ contractAddress: FAKE_ADDRESS }, 'test'); expect(getNullifiers(res)).toHaveLength(0); }); it('returns no notes when filtering by non-existing storageSlot', async () => { - const res = await provider.getNotes({ contractAddress: CONTRACT_A, storageSlot: NON_EXISTING_SLOT }); + const res = await noteStore.getNotes({ contractAddress: CONTRACT_A, storageSlot: NON_EXISTING_SLOT }, 'test'); expect(res).toHaveLength(0); }); it('filters notes matching contractAddress in the specified scope', async () => { - const res = await provider.getNotes({ contractAddress: CONTRACT_A, scopes: [SCOPE_2] }); + const res = await noteStore.getNotes({ contractAddress: CONTRACT_A, scopes: [SCOPE_2] }, 'test'); expect(res).toHaveLength(0); }); it('throws when filtering with a scope not present in the PXE database', async () => { - await expect(provider.getNotes({ contractAddress: CONTRACT_A, scopes: [FAKE_ADDRESS] })).rejects.toThrow( + await expect(noteStore.getNotes({ contractAddress: CONTRACT_A, scopes: [FAKE_ADDRESS] }, 
'test')).rejects.toThrow( 'Trying to get incoming notes of a scope that is not in the PXE database', ); }); it('throws when called with an empty scopes array', async () => { - await expect(provider.getNotes({ contractAddress: CONTRACT_A, scopes: [] })).rejects.toThrow( + await expect(noteStore.getNotes({ contractAddress: CONTRACT_A, scopes: [] }, 'test')).rejects.toThrow( 'Trying to get notes with an empty scopes array', ); }); @@ -297,7 +320,7 @@ describe('NoteStore', () => { siloedNullifier: NON_EXISTING_SLOT, }; - const res = await provider.getNotes(filter); + const res = await noteStore.getNotes(filter, 'test'); expect(res).toHaveLength(0); }); @@ -307,20 +330,20 @@ describe('NoteStore', () => { siloedNullifier: note2.siloedNullifier, }; - const res = await provider.getNotes(filter); + const res = await noteStore.getNotes(filter, 'test'); expect(res).toHaveLength(0); }); }); describe('NoteStore.applyNullifiers happy path', () => { let store: AztecLMDBStoreV2; - let provider: NoteStore; + let noteStore: NoteStore; let note1: NoteDao; let note2: NoteDao; let note3: NoteDao; beforeEach(async () => { - ({ store, provider, note1, note2, note3 } = await setupProviderWithNotes('note_store_apply_nullifiers_happy')); + ({ store, noteStore, note1, note2, note3 } = await setupProviderWithNotes('note_store_apply_nullifiers_happy')); }); afterEach(async () => { @@ -328,19 +351,22 @@ describe('NoteStore', () => { }); it('returns empty array when given empty nullifiers array', async () => { - const result = await provider.applyNullifiers([]); + const result = await noteStore.applyNullifiers([], 'test'); expect(result).toEqual([]); }); it('nullifies a single note and moves it from active to nullified', async () => { - const result = await provider.applyNullifiers([mkNullifier(note1)]); + const result = await noteStore.applyNullifiers([mkNullifier(note1)], 'test'); expect(result).toEqual([note1]); - const active = await provider.getNotes({ contractAddress: CONTRACT_A }); - 
const all = await provider.getNotes({ - contractAddress: CONTRACT_A, - status: NoteStatus.ACTIVE_OR_NULLIFIED, - }); + const active = await noteStore.getNotes({ contractAddress: CONTRACT_A }, 'test'); + const all = await noteStore.getNotes( + { + contractAddress: CONTRACT_A, + status: NoteStatus.ACTIVE_OR_NULLIFIED, + }, + 'test', + ); expect(new Set(getNullifiers(active))).toEqual(new Set([note2.siloedNullifier.toBigInt()])); expect(new Set(getNullifiers(all))).toEqual( @@ -350,10 +376,10 @@ describe('NoteStore', () => { it('nullifies multiple notes and returns them', async () => { const nullifiers = [mkNullifier(note1), mkNullifier(note3)]; - const result = await provider.applyNullifiers(nullifiers); + const result = await noteStore.applyNullifiers(nullifiers, 'test'); - const activeA = await provider.getNotes({ contractAddress: CONTRACT_A }); - const activeB = await provider.getNotes({ contractAddress: CONTRACT_B }); + const activeA = await noteStore.getNotes({ contractAddress: CONTRACT_A }, 'test'); + const activeB = await noteStore.getNotes({ contractAddress: CONTRACT_B }, 'test'); expect(result).toEqual([note1, note3]); // returned nullified notes expect(new Set(getNullifiers(activeA))).toEqual(new Set([note2.siloedNullifier.toBigInt()])); // note2 remains active @@ -361,7 +387,7 @@ describe('NoteStore', () => { }); it('retrieves a nullified note by its siloedNullifier when status is ACTIVE_OR_NULLIFIED', async () => { - await provider.applyNullifiers([mkNullifier(note2)]); + await noteStore.applyNullifiers([mkNullifier(note2)], 'test'); const filter = { contractAddress: CONTRACT_A, @@ -369,133 +395,273 @@ describe('NoteStore', () => { status: NoteStatus.ACTIVE_OR_NULLIFIED, }; - const res = await provider.getNotes(filter); + const res = await noteStore.getNotes(filter, 'test'); expect(new Set(getNullifiers(res))).toEqual(new Set([note2.siloedNullifier.toBigInt()])); }); }); describe('NoteStore.applyNullifiers edge cases', () => { let store: AztecLMDBStoreV2; 
- let provider: NoteStore; + let noteStore: NoteStore; let note1: NoteDao; let note2: NoteDao; beforeEach(async () => { - ({ store, provider, note1, note2 } = await setupProviderWithNotes('note_store_apply_nullifiers_edge')); + ({ store, noteStore, note1, note2 } = await setupProviderWithNotes('note_store_apply_nullifiers_edge')); }); afterEach(async () => { await store.close(); }); - it('throws error when nullifier is not found', async () => { + it('silently skips unknown nullifiers', async () => { const fakeNullifier = { data: Fr.random(), l2BlockNumber: BlockNumber(999), l2BlockHash: L2BlockHash.random(), }; - await expect(provider.applyNullifiers([fakeNullifier])).rejects.toThrow('Nullifier not found in applyNullifiers'); + // Unknown nullifiers are silently skipped (idempotent behavior) + const result = await noteStore.applyNullifiers([fakeNullifier], 'test'); + expect(result).toEqual([]); }); it('preserves scope information when nullifying notes', async () => { const nullifiers = [mkNullifier(note1)]; - await provider.applyNullifiers(nullifiers); + await noteStore.applyNullifiers(nullifiers, 'test'); // Verify nullified note remains visible only within its original scope - const wrongScopeNotes = await provider.getNotes({ - contractAddress: CONTRACT_A, - scopes: [SCOPE_2], - status: NoteStatus.ACTIVE_OR_NULLIFIED, - }); + const wrongScopeNotes = await noteStore.getNotes( + { + contractAddress: CONTRACT_A, + scopes: [SCOPE_2], + status: NoteStatus.ACTIVE_OR_NULLIFIED, + }, + 'test', + ); expect(getNullifiers(wrongScopeNotes)).not.toContain(note1.siloedNullifier.toBigInt()); - const correctScopeNotes = await provider.getNotes({ - contractAddress: CONTRACT_A, - scopes: [SCOPE_1], - status: NoteStatus.ACTIVE_OR_NULLIFIED, - }); + const correctScopeNotes = await noteStore.getNotes( + { + contractAddress: CONTRACT_A, + scopes: [SCOPE_1], + status: NoteStatus.ACTIVE_OR_NULLIFIED, + }, + 'test', + ); 
expect(getNullifiers(correctScopeNotes)).toContain(note1.siloedNullifier.toBigInt()); }); - it('is atomic - fails entirely if any nullifier is invalid', async () => { - // Should fail entirely: note1 remains active because transaction is atomic. + it('skips invalid nullifiers and processes valid ones', async () => { + // Invalid nullifiers are skipped, valid ones are processed (idempotent behavior) const nullifiers = [ mkNullifier(note2), { - data: Fr.random(), // Invalid + data: Fr.random(), // Unknown - will be skipped l2BlockNumber: BlockNumber(999), l2BlockHash: L2BlockHash.random(), }, ]; - await expect(provider.applyNullifiers(nullifiers)).rejects.toThrow(); + const result = await noteStore.applyNullifiers(nullifiers, 'test'); - // Verify note1 is still active (transaction rolled back) - const activeNotes = await provider.getNotes({ contractAddress: CONTRACT_A }); - expect(new Set(getNullifiers(activeNotes))).toEqual( - new Set([note1.siloedNullifier.toBigInt(), note2.siloedNullifier.toBigInt()]), - ); + // Only the valid nullifier (note2) was processed + expect(result).toEqual([note2]); + + // Verify note2 is now nullified while note1 is still active + const activeNotes = await noteStore.getNotes({ contractAddress: CONTRACT_A }, 'test'); + expect(new Set(getNullifiers(activeNotes))).toEqual(new Set([note1.siloedNullifier.toBigInt()])); }); it('updates all relevant indexes when nullifying notes', async () => { const nullifiers = [mkNullifier(note1)]; - await provider.applyNullifiers(nullifiers); + await noteStore.applyNullifiers(nullifiers, 'test'); // Test various filter combinations still work - const byContract = await provider.getNotes({ - contractAddress: CONTRACT_A, - status: NoteStatus.ACTIVE_OR_NULLIFIED, - }); + const byContract = await noteStore.getNotes( + { + contractAddress: CONTRACT_A, + status: NoteStatus.ACTIVE_OR_NULLIFIED, + }, + 'test', + ); expect(new Set(getNullifiers(byContract))).toEqual( new Set([note1.siloedNullifier.toBigInt(), 
note2.siloedNullifier.toBigInt()]), ); - const bySlot = await provider.getNotes({ - contractAddress: CONTRACT_A, - storageSlot: note1.storageSlot, - status: NoteStatus.ACTIVE_OR_NULLIFIED, - }); + const bySlot = await noteStore.getNotes( + { + contractAddress: CONTRACT_A, + storageSlot: note1.storageSlot, + status: NoteStatus.ACTIVE_OR_NULLIFIED, + }, + 'test', + ); expect(new Set(getNullifiers(bySlot))).toEqual(new Set([note1.siloedNullifier.toBigInt()])); - const byScope = await provider.getNotes({ - contractAddress: CONTRACT_A, - scopes: [SCOPE_1], - status: NoteStatus.ACTIVE_OR_NULLIFIED, - }); + const byScope = await noteStore.getNotes( + { + contractAddress: CONTRACT_A, + scopes: [SCOPE_1], + status: NoteStatus.ACTIVE_OR_NULLIFIED, + }, + 'test', + ); expect(new Set(getNullifiers(byScope))).toEqual( new Set([note1.siloedNullifier.toBigInt(), note2.siloedNullifier.toBigInt()]), ); }); - it('attempts to nullify the same note twice in succession results in error', async () => { - await provider.applyNullifiers([mkNullifier(note1)]); // First application should succeed - const activeNotes = await provider.getNotes({ contractAddress: CONTRACT_A }); + it('silently skips already nullified notes when called twice in succession', async () => { + await noteStore.applyNullifiers([mkNullifier(note1)], 'test'); // First application should succeed + const activeNotes = await noteStore.getNotes({ contractAddress: CONTRACT_A }, 'test'); expect(new Set(getNullifiers(activeNotes))).toEqual(new Set([note2.siloedNullifier.toBigInt()])); - // should throw on second attempt as note1 is already nullified - await expect(provider.applyNullifiers([mkNullifier(note1)])).rejects.toThrow( - 'Nullifier already applied in applyNullifiers', - ); + // Second attempt is silently skipped (idempotent behavior) + const result = await noteStore.applyNullifiers([mkNullifier(note1)], 'test'); + expect(result).toEqual([]); }); - it('attempts to nullify the same note twice in same call results in 
error', async () => { + it('deduplicates nullifiers in the same call', async () => { const nullifiers = [mkNullifier(note1), mkNullifier(note1)]; - await expect(provider.applyNullifiers(nullifiers)).rejects.toThrow( - 'Nullifier already applied in applyNullifiers', + // Only the first occurrence is processed, duplicates are skipped + const result = await noteStore.applyNullifiers(nullifiers, 'test'); + expect(result).toEqual([note1]); + + // Verify the note is nullified + const activeNotes = await noteStore.getNotes({ contractAddress: CONTRACT_A }, 'test'); + expect(new Set(getNullifiers(activeNotes))).toEqual(new Set([note2.siloedNullifier.toBigInt()])); + }); + + it('can nullify a freshly added note in the same job without committing first', async () => { + // This test simulates the deliverNote flow where a note is added and immediately nullified + // without committing first (when the note is discovered to already be nullified on-chain) + const freshNullifier = Fr.random(); + const freshNote = await mkNote({ + contractAddress: CONTRACT_A, + storageSlot: SLOT_X, + siloedNullifier: freshNullifier, + }); + + // Add note to stage without committing + await noteStore.addNotes([freshNote], SCOPE_1, 'fresh-job'); + + // Immediately nullify it in the same job (simulating deliverNote when nullifier exists on-chain) + const nullifiers = [mkNullifier(freshNote)]; + await expect(noteStore.applyNullifiers(nullifiers, 'fresh-job')).resolves.toEqual([freshNote]); + + // Verify note is now in nullified stage + const activeNotes = await noteStore.getNotes({ contractAddress: CONTRACT_A }, 'fresh-job'); + expect(getNullifiers(activeNotes)).not.toContain(freshNullifier.toBigInt()); + + const allNotes = await noteStore.getNotes( + { contractAddress: CONTRACT_A, status: NoteStatus.ACTIVE_OR_NULLIFIED }, + 'fresh-job', + ); + expect(getNullifiers(allNotes)).toContain(freshNullifier.toBigInt()); + }); + + it('can handle parallel note additions and nullifications (simulating 
Promise.all in deliverNote)', async () => { + // This test simulates the scenario in utilityValidateEnqueuedNotesAndEvents where + // multiple deliverNote calls run in parallel via Promise.all + const noteNullifiers = [new Fr(1000n), new Fr(1001n), new Fr(1002n)]; + const notes = await Promise.all([ + mkNote({ contractAddress: CONTRACT_A, storageSlot: SLOT_X, siloedNullifier: noteNullifiers[0] }), + mkNote({ contractAddress: CONTRACT_A, storageSlot: SLOT_X, siloedNullifier: noteNullifiers[1] }), + mkNote({ contractAddress: CONTRACT_A, storageSlot: SLOT_X, siloedNullifier: noteNullifiers[2] }), + ]); + + // Simulate parallel deliverNote calls where each note is added and immediately nullified + const parallelDeliveries = notes.map(async note => { + await noteStore.addNotes([note], SCOPE_1, 'parallel-job'); + const nullifiers = [mkNullifier(note)]; + await noteStore.applyNullifiers(nullifiers, 'parallel-job'); + return note; + }); + + await expect(Promise.all(parallelDeliveries)).resolves.toEqual(notes); + + // Verify all notes are nullified + const activeNotes = await noteStore.getNotes({ contractAddress: CONTRACT_A }, 'parallel-job'); + expect(getNullifiers(activeNotes)).not.toContain(noteNullifiers[0].toBigInt()); + expect(getNullifiers(activeNotes)).not.toContain(noteNullifiers[1].toBigInt()); + expect(getNullifiers(activeNotes)).not.toContain(noteNullifiers[2].toBigInt()); + + const allNotes = await noteStore.getNotes( + { contractAddress: CONTRACT_A, status: NoteStatus.ACTIVE_OR_NULLIFIED }, + 'parallel-job', + ); + expect(new Set(getNullifiers(allNotes))).toEqual( + new Set([ + note1.siloedNullifier.toBigInt(), + note2.siloedNullifier.toBigInt(), + ...noteNullifiers.map(n => n.toBigInt()), + ]), ); }); + + it('handles nullification of a KV store note in a new job', async () => { + // Scenario: A note exists in KV store from a previous job, and we want to nullify it in a new job + // This is the syncNoteNullifiers flow where existing notes are checked for 
nullification + + // note1 is from setup and committed - it's in the KV store + // We should be able to nullify it in a new job + const nullifiers = [mkNullifier(note1)]; + await expect(noteStore.applyNullifiers(nullifiers, 'new-job')).resolves.toEqual([note1]); + + // After committing, the note should be in nullified state + await noteStore.commit('new-job'); + + const activeNotes = await noteStore.getNotes({ contractAddress: CONTRACT_A }, 'another-job'); + expect(getNullifiers(activeNotes)).not.toContain(note1.siloedNullifier.toBigInt()); + + const allNotes = await noteStore.getNotes( + { contractAddress: CONTRACT_A, status: NoteStatus.ACTIVE_OR_NULLIFIED }, + 'another-job', + ); + expect(getNullifiers(allNotes)).toContain(note1.siloedNullifier.toBigInt()); + }); + + it('handles duplicate note delivery gracefully (same note added and nullified twice)', async () => { + // This scenario can happen during parallel note delivery via Promise.all in deliverNote + // when the same note is somehow processed twice (e.g., duplicate log entries) + const duplicateNullifier = new Fr(9999n); + const duplicateNote = await mkNote({ + contractAddress: CONTRACT_A, + storageSlot: SLOT_X, + siloedNullifier: duplicateNullifier, + }); + + // First delivery: add and nullify the note + await noteStore.addNotes([duplicateNote], SCOPE_1, 'duplicate-job'); + await noteStore.applyNullifiers([mkNullifier(duplicateNote)], 'duplicate-job'); + + // Second delivery (duplicate): try to add the same note again - should not throw + // This simulates what happens in parallel deliverNote when the same note is processed twice + await noteStore.addNotes([duplicateNote], SCOPE_2, 'duplicate-job'); + + // The second applyNullifiers is silently skipped since the note is already nullified (idempotent) + const result = await noteStore.applyNullifiers([mkNullifier(duplicateNote)], 'duplicate-job'); + expect(result).toEqual([]); + + // Verify the note is nullified and has both scopes + const allNotes = await 
noteStore.getNotes( + { contractAddress: CONTRACT_A, status: NoteStatus.ACTIVE_OR_NULLIFIED }, + 'duplicate-job', + ); + expect(getNullifiers(allNotes)).toContain(duplicateNullifier.toBigInt()); + }); }); describe('NoteStore.rollback', () => { - let provider: NoteStore; + let noteStore: NoteStore; let store: AztecLMDBStoreV2; beforeEach(async () => { store = await openTmpStore('note_store_rollback_test'); - provider = await NoteStore.create(store); - await provider.addScope(SCOPE_1); - await provider.addScope(SCOPE_2); + noteStore = await NoteStore.create(store); + await noteStore.addScope(SCOPE_1); + await noteStore.addScope(SCOPE_2); }); afterEach(async () => { @@ -515,7 +681,7 @@ describe('NoteStore', () => { const noteBlock5Nullifier = Fr.random(); noteBlock5 = await mkNote({ siloedNullifier: noteBlock5Nullifier, l2BlockNumber: BlockNumber(5) }); // Created after rollback block 3 - await provider.addNotes([noteBlock1, noteBlock2, noteBlock3, noteBlock5], SCOPE_1); + await noteStore.addNotes([noteBlock1, noteBlock2, noteBlock3, noteBlock5], SCOPE_1, 'rollback-scenario-setup'); const nullifiers = [ mkNullifier(noteBlock1, BlockNumber(2)), @@ -525,8 +691,10 @@ describe('NoteStore', () => { // Apply nullifiers and rollback to block 3 // - should restore noteBlock3 (nullified at block 4) and preserve noteBlock1 (nullified at block 2) - await provider.applyNullifiers(nullifiers); - await provider.rollback(3, 6); + await noteStore.applyNullifiers(nullifiers, 'rollback-scenario-setup'); + await noteStore.commit('rollback-scenario-setup'); + + await noteStore.rollback(3, 6); } beforeEach(async () => { @@ -535,17 +703,20 @@ describe('NoteStore', () => { it('restores notes that were nullified after the rollback block', async () => { // noteBlock2 remains active, noteBlock3 was nullified at block 4 should be restored - const activeNotes = await provider.getNotes({ contractAddress: CONTRACT_A }); + const activeNotes = await noteStore.getNotes({ contractAddress: CONTRACT_A 
}, 'test'); expect(new Set(getNullifiers(activeNotes))).toEqual( new Set([noteBlock2.siloedNullifier.toBigInt(), noteBlock3.siloedNullifier.toBigInt()]), ); }); it('preserves nullification of notes nullified at or before the rollback block', async () => { - const allNotes = await provider.getNotes({ - contractAddress: CONTRACT_A, - status: NoteStatus.ACTIVE_OR_NULLIFIED, - }); + const allNotes = await noteStore.getNotes( + { + contractAddress: CONTRACT_A, + status: NoteStatus.ACTIVE_OR_NULLIFIED, + }, + 'test', + ); // Should contain noteBlock1 (nullified), noteBlock2 (active), and noteBlock3 (restored) expect(new Set(getNullifiers(allNotes))).toEqual( @@ -557,24 +728,27 @@ describe('NoteStore', () => { ); // Verify noteBlock1 is not in active notes - const activeNotes = await provider.getNotes({ contractAddress: CONTRACT_A }); + const activeNotes = await noteStore.getNotes({ contractAddress: CONTRACT_A }, 'test'); const activeIndexes = getNullifiers(activeNotes); expect(activeIndexes).not.toEqual(expect.arrayContaining([noteBlock1.siloedNullifier.toBigInt()])); }); it('preserves active notes created before the rollback block that were never nullified', async () => { // noteBlock2 was created at block 2 (before rollback block 3) and never nullified - const activeNotes = await provider.getNotes({ contractAddress: CONTRACT_A }); + const activeNotes = await noteStore.getNotes({ contractAddress: CONTRACT_A }, 'test'); expect(new Set(getNullifiers(activeNotes))).toEqual( new Set([noteBlock2.siloedNullifier.toBigInt(), noteBlock3.siloedNullifier.toBigInt()]), ); }); it('deletes notes created after the rollback block', async () => { - const allNotes = await provider.getNotes({ - contractAddress: CONTRACT_A, - status: NoteStatus.ACTIVE_OR_NULLIFIED, - }); + const allNotes = await noteStore.getNotes( + { + contractAddress: CONTRACT_A, + status: NoteStatus.ACTIVE_OR_NULLIFIED, + }, + 'test', + ); // noteBlock5 was created at block 5, which is after rollback block 3, should 
be deleted const indexes = getNullifiers(allNotes); @@ -593,7 +767,7 @@ describe('NoteStore', () => { it('handles rollback when blockNumber equals synchedBlockNumber', async () => { const noteNullifier = Fr.random(); const note = await mkNote({ siloedNullifier: noteNullifier, l2BlockNumber: BlockNumber(5) }); - await provider.addNotes([note], SCOPE_1); + await noteStore.addNotes([note], SCOPE_1, 'test'); const nullifiers = [ { @@ -602,26 +776,30 @@ describe('NoteStore', () => { l2BlockHash: L2BlockHash.fromString(note.l2BlockHash), }, ]; - await provider.applyNullifiers(nullifiers); + await noteStore.applyNullifiers(nullifiers, 'test'); // Since nullification happened at block 5 (not after), it should stay nullified // The rewind loop processes blocks (blockNumber+1) to synchedBlockNumber = 6 to 5 = no iterations - await provider.rollback(5, 5); + await noteStore.commit('test'); + await noteStore.rollback(5, 5); - const activeNotes = await provider.getNotes({ contractAddress: CONTRACT_A }); + const activeNotes = await noteStore.getNotes({ contractAddress: CONTRACT_A }, 'test'); expect(activeNotes).toHaveLength(0); - const allNotes = await provider.getNotes({ - contractAddress: CONTRACT_A, - status: NoteStatus.ACTIVE_OR_NULLIFIED, - }); + const allNotes = await noteStore.getNotes( + { + contractAddress: CONTRACT_A, + status: NoteStatus.ACTIVE_OR_NULLIFIED, + }, + 'test', + ); expect(new Set(getNullifiers(allNotes))).toEqual(new Set([noteNullifier.toBigInt()])); }); it('handles rollback when synchedBlockNumber < blockNumber', async () => { const noteNullifier = Fr.random(); const note = await mkNote({ siloedNullifier: noteNullifier, l2BlockNumber: BlockNumber(3) }); - await provider.addNotes([note], SCOPE_1); + await noteStore.addNotes([note], SCOPE_1, 'test'); const nullifiers = [ { @@ -630,18 +808,22 @@ describe('NoteStore', () => { l2BlockHash: L2BlockHash.fromString(note.l2BlockHash), }, ]; - await provider.applyNullifiers(nullifiers); + await 
noteStore.applyNullifiers(nullifiers, 'test'); // blockNumber=6, synchedBlockNumber=4 therefore no nullifications to rewind - await provider.rollback(6, 4); + await noteStore.commit('test'); + await noteStore.rollback(6, 4); - const activeNotes = await provider.getNotes({ contractAddress: CONTRACT_A }); + const activeNotes = await noteStore.getNotes({ contractAddress: CONTRACT_A }, 'test'); expect(activeNotes).toHaveLength(0); - const allNotes = await provider.getNotes({ - contractAddress: CONTRACT_A, - status: NoteStatus.ACTIVE_OR_NULLIFIED, - }); + const allNotes = await noteStore.getNotes( + { + contractAddress: CONTRACT_A, + status: NoteStatus.ACTIVE_OR_NULLIFIED, + }, + 'test', + ); expect(new Set(getNullifiers(allNotes))).toEqual(new Set([noteNullifier.toBigInt()])); }); @@ -650,7 +832,7 @@ describe('NoteStore', () => { const note2Nullifier = Fr.random(); const note1 = await mkNote({ siloedNullifier: note1Nullifier, l2BlockNumber: BlockNumber(5) }); const note2 = await mkNote({ siloedNullifier: note2Nullifier, l2BlockNumber: BlockNumber(10) }); - await provider.addNotes([note1, note2], SCOPE_1); + await noteStore.addNotes([note1, note2], SCOPE_1, 'test'); const nullifiers = [ { @@ -659,18 +841,19 @@ describe('NoteStore', () => { l2BlockHash: L2BlockHash.fromString(note1.l2BlockHash), }, ]; - await provider.applyNullifiers(nullifiers); - await provider.rollback(5, 100); + await noteStore.applyNullifiers(nullifiers, 'test'); + await noteStore.commit('test'); + await noteStore.rollback(5, 100); // note1 should be restored (nullified at block 7 > rollback block 5) // note2 should be deleted (created at block 10 > rollback block 5) - const activeNotes = await provider.getNotes({ contractAddress: CONTRACT_A }); + const activeNotes = await noteStore.getNotes({ contractAddress: CONTRACT_A }, 'test'); expect(new Set(getNullifiers(activeNotes))).toEqual(new Set([note1Nullifier.toBigInt()])); }); it('handles rollback on empty PXE database gracefully', async () => { - 
await expect(provider.rollback(10, 20)).resolves.not.toThrow(); - const notes = await provider.getNotes({ contractAddress: CONTRACT_A }); + await expect(noteStore.rollback(10, 20)).resolves.not.toThrow(); + const notes = await noteStore.getNotes({ contractAddress: CONTRACT_A }, 'test'); expect(notes).toHaveLength(0); }); }); diff --git a/yarn-project/pxe/src/storage/note_store/note_store.ts b/yarn-project/pxe/src/storage/note_store/note_store.ts index a45404eacff4..47433149830e 100644 --- a/yarn-project/pxe/src/storage/note_store/note_store.ts +++ b/yarn-project/pxe/src/storage/note_store/note_store.ts @@ -1,18 +1,23 @@ import type { Fr } from '@aztec/foundation/curves/bn254'; import { toArray } from '@aztec/foundation/iterable'; +import { Semaphore } from '@aztec/foundation/queue'; import type { AztecAsyncKVStore, AztecAsyncMap, AztecAsyncMultiMap } from '@aztec/kv-store'; import { AztecAddress } from '@aztec/stdlib/aztec-address'; import type { DataInBlock } from '@aztec/stdlib/block'; import { NoteStatus, type NotesFilter } from '@aztec/stdlib/note'; import { NoteDao } from '@aztec/stdlib/note'; +import type { StagedStore } from '../../job_coordinator/job_coordinator.js'; + /** * NoteStore manages the storage and retrieval of notes. * * Notes can be active or nullified. This class processes new notes, nullifications, * and performs rollback handling in the case of a reorg. **/ -export class NoteStore { +export class NoteStore implements StagedStore { + readonly storeName: string = 'note'; + #store: AztecAsyncKVStore; // Note that we use the siloedNullifier as the note id in the store as it's guaranteed to be unique. 
@@ -40,6 +45,18 @@ export class NoteStore { /** scope -> MultiMap(storageSlot -> noteId) */ #notesByStorageSlotAndScope: Map>; + // jobId => noteIndex => { note, scopes } + #notesForJob: Map }>>; + + // jobId => noteIndex => { note, scopes } (for nullified notes) + #nullifiedNotesForJob: Map>; + + // jobId => scope => true + #scopesForJob: Map>; + + // Per-job locks to serialize operations within the same job + #jobLocks: Map = new Map(); + private constructor(store: AztecAsyncKVStore) { this.#store = store; this.#notes = store.openMap('notes'); @@ -54,6 +71,267 @@ export class NoteStore { this.#notesToScope = store.openMultiMap('notes_to_scope'); this.#notesByContractAndScope = new Map>(); this.#notesByStorageSlotAndScope = new Map>(); + + this.#notesForJob = new Map(); + this.#nullifiedNotesForJob = new Map(); + this.#scopesForJob = new Map(); + } + + #getNotesForJob(jobId: string): Map }> { + let jobInStage = this.#notesForJob.get(jobId); + if (!jobInStage) { + jobInStage = new Map(); + this.#notesForJob.set(jobId, jobInStage); + } + return jobInStage; + } + + #getScopesForJob(jobId: string): Map { + let scopesInJobStage = this.#scopesForJob.get(jobId); + if (!scopesInJobStage) { + scopesInJobStage = new Map(); + this.#scopesForJob.set(jobId, scopesInJobStage); + } + return scopesInJobStage; + } + + #getNullifiedNotesForJob(jobId: string): Map { + let jobInStage = this.#nullifiedNotesForJob.get(jobId); + if (!jobInStage) { + jobInStage = new Map(); + this.#nullifiedNotesForJob.set(jobId, jobInStage); + } + return jobInStage; + } + + async #withJobLock(jobId: string, fn: () => Promise): Promise { + let lock = this.#jobLocks.get(jobId); + if (!lock) { + lock = new Semaphore(1); + this.#jobLocks.set(jobId, lock); + } + await lock.acquire(); + try { + return await fn(); + } finally { + lock.release(); + } + } + + async #addNote(jobId: string, noteIndex: string, noteDao: NoteDao, scope: string) { + // If this note was already nullified in this job, just add the scope 
to it + const nullifiedEntry = this.#getNullifiedNotesForJob(jobId).get(noteIndex); + if (nullifiedEntry) { + if (!nullifiedEntry.scopes.includes(scope)) { + nullifiedEntry.scopes.push(scope); + } + return; + } + + // If this note was already nullified in a previous job (committed to permanent storage), skip it + // We don't want to re-add nullified notes to the active notes + const permanentlyNullified = await this.#nullifiedNotes.getAsync(noteIndex); + if (permanentlyNullified) { + return; + } + + const staged = this.#getNotesForJob(jobId); + const existing = staged.get(noteIndex); + if (existing) { + existing.scopes.add(scope); + } else { + staged.set(noteIndex, { note: noteDao, scopes: new Set([scope]) }); + } + } + + #addNullifiedNote(jobId: string, noteIndex: string, noteDao: NoteDao, scopes: string[], blockNumber: number) { + this.#getNullifiedNotesForJob(jobId).set(noteIndex, { note: noteDao, scopes, blockNumber }); + } + + #deleteNote(jobId: string, noteIndex: string) { + this.#getNotesForJob(jobId).delete(noteIndex); + } + + async #readNoteBySiloedNullifier(jobId: string, noteIndex: string): Promise { + if (this.#getNullifiedNotesForJob(jobId).has(noteIndex)) { + return undefined; + } + const noteInStage = this.#getNotesForJob(jobId).get(noteIndex); + if (noteInStage) { + return noteInStage.note; + } + const storedNote = await this.#notes.getAsync(noteIndex); + return storedNote ? NoteDao.fromBuffer(storedNote) : undefined; + } + + async #readNullifiedNoteById(jobId: string, noteIndex: string): Promise { + const nullifiedNoteForJob = this.#getNullifiedNotesForJob(jobId).get(noteIndex); + if (nullifiedNoteForJob) { + return nullifiedNoteForJob.note; + } + const persistedNote = await this.#nullifiedNotes.getAsync(noteIndex); + return persistedNote ? 
NoteDao.fromBuffer(persistedNote) : undefined; + } + + #readScopesByNote(jobId: string, noteIndex: string): Promise { + if (this.#getNullifiedNotesForJob(jobId).has(noteIndex)) { + return Promise.resolve([]); + } + const noteForJob = this.#getNotesForJob(jobId).get(noteIndex); + if (noteForJob) { + return Promise.resolve([...noteForJob.scopes]); + } + return toArray(this.#notesToScope.getValuesAsync(noteIndex)); + } + + async #readScopes(jobId: string): Promise> { + const persistedScopes = await toArray(this.#scopes.keysAsync()); + const scopesForJob = this.#getScopesForJob(jobId); + return new Set([...(scopesForJob.keys() ?? []), ...persistedScopes]); + } + + async #addScope(jobId: string, scope: string) { + if (!(await this.#readScopes(jobId)).has(scope)) { + this.#getScopesForJob(jobId).set(scope, true); + } + } + + // TODO: this should return a Set + async #readNotesByStorageSlotAndScope(jobId: string, scope: string, slot: Fr): Promise { + const nullifiedNotesForJob = this.#getNullifiedNotesForJob(jobId); + const persistedNotesBySlotAndScope = this.#notesByStorageSlotAndScope.get(scope); + + // Read matching persisted notes, but remove the ones nullified during the current job + const foundNotes = persistedNotesBySlotAndScope + ? 
(await toArray(persistedNotesBySlotAndScope.getValuesAsync(slot.toString()))).filter( + noteIndex => !nullifiedNotesForJob.has(noteIndex), + ) + : []; + for (const [noteIndex, { note, scopes }] of this.#getNotesForJob(jobId)) { + if (scopes.has(scope) && note.storageSlot.equals(slot)) { + foundNotes.push(noteIndex); + } + } + return foundNotes; + } + + // TODO: this should return a Set + async #readNotesByContractAndScope(jobId: string, scope: string, contractAddress: AztecAddress): Promise { + const nullifiedNotesForJob = this.#getNullifiedNotesForJob(jobId); + const persistedNotesByContractAndScope = this.#notesByContractAndScope.get(scope); + + // Read matching persisted notes, but remove the ones nullified during the current job + const foundNotes = persistedNotesByContractAndScope + ? (await toArray(persistedNotesByContractAndScope.getValuesAsync(contractAddress.toString()))).filter( + noteIndex => !nullifiedNotesForJob.has(noteIndex), + ) + : []; + for (const [noteIndex, { note, scopes }] of this.#getNotesForJob(jobId)) { + if (scopes.has(scope) && note.contractAddress.equals(contractAddress)) { + foundNotes.push(noteIndex); + } + } + return foundNotes; + } + + // TODO: this should return a Set + async #readNullifiedNotesByStorageSlotAndScope(jobId: string, scope: string, storageSlot: Fr): Promise { + const foundNotes: string[] = []; + + // Get from storage and filter by scope + for await (const noteIndex of this.#nullifiedNotesByStorageSlot.getValuesAsync(storageSlot.toString())) { + const scopeList = await toArray(this.#nullifiedNotesToScope.getValuesAsync(noteIndex)); + if (scopeList.includes(scope)) { + foundNotes.push(noteIndex); + } + } + + // Check staged nullified notes + for (const [noteIndex, { note, scopes }] of this.#getNullifiedNotesForJob(jobId)) { + if (note.storageSlot.equals(storageSlot) && scopes.includes(scope)) { + foundNotes.push(noteIndex); + } + } + + return foundNotes; + } + + // TODO: this should return a set + async 
#readNullifiedNotesByContractAndScope( + jobId: string, + scope: string, + contractAddress: AztecAddress, + ): Promise { + const foundNotes: string[] = []; + + // Persisted nullified notes + for await (const noteIndex of this.#nullifiedNotesByContract.getValuesAsync(contractAddress.toString())) { + const scopeList = await toArray(this.#nullifiedNotesToScope.getValuesAsync(noteIndex)); + if (scopeList.includes(scope)) { + foundNotes.push(noteIndex); + } + } + + // Notes nullified during current job + for (const [noteIndex, { note, scopes }] of this.#getNullifiedNotesForJob(jobId)) { + if (note.contractAddress.equals(contractAddress) && scopes.includes(scope)) { + foundNotes.push(noteIndex); + } + } + + return foundNotes; + } + + /// TODO: promise-all-ify + async commit(jobId: string): Promise { + await this.#withJobLock(jobId, async () => { + // Commit scopes + for (const scope of this.#getScopesForJob(jobId).keys()) { + await this.addScope(AztecAddress.fromString(scope)); + } + + // Commit notes + for (const [noteId, { note, scopes }] of this.#getNotesForJob(jobId)) { + await this.#notes.set(noteId, note.toBuffer()); + for (const scope of scopes) { + await this.#notesToScope.set(noteId, scope); + await this.#notesByContractAndScope.get(scope)!.set(note.contractAddress.toString(), noteId); + await this.#notesByStorageSlotAndScope.get(scope)!.set(note.storageSlot.toString(), noteId); + } + } + + // Delete nullified notes from active notes store and add to nullified indexes + for (const [noteId, { note, scopes, blockNumber }] of this.#getNullifiedNotesForJob(jobId)) { + await this.#notes.delete(noteId); + await this.#notesToScope.delete(noteId); + for (const scope of this.#notesByContractAndScope.keys()) { + await this.#notesByContractAndScope.get(scope)!.deleteValue(note.contractAddress.toString(), noteId); + await this.#notesByStorageSlotAndScope.get(scope)!.deleteValue(note.storageSlot.toString(), noteId); + } + await this.#nullifiedNotes.set(noteId, 
note.toBuffer()); + await this.#nullifiersByBlockNumber.set(blockNumber, noteId); + await this.#nullifiedNotesByContract.set(note.contractAddress.toString(), noteId); + await this.#nullifiedNotesByStorageSlot.set(note.storageSlot.toString(), noteId); + for (const scope of scopes) { + await this.#nullifiedNotesToScope.set(noteId, scope); + } + } + + this.#notesForJob.delete(jobId); + this.#nullifiedNotesForJob.delete(jobId); + this.#scopesForJob.delete(jobId); + }); + this.#jobLocks.delete(jobId); + } + + async discardStaged(jobId: string): Promise { + await this.#withJobLock(jobId, () => { + this.#notesForJob.delete(jobId); + this.#nullifiedNotesForJob.delete(jobId); + this.#scopesForJob.delete(jobId); + return Promise.resolve(); + }); + this.#jobLocks.delete(jobId); } /** @@ -105,22 +383,25 @@ export class NoteStore { * * @param notes - Notes to store * @param scope - The scope (user/account) under which to store the notes + * @param jobId - The job context for staged writes */ - addNotes(notes: NoteDao[], scope: AztecAddress): Promise { - return this.#store.transactionAsync(async () => { - if (!(await this.#scopes.hasAsync(scope.toString()))) { - await this.addScope(scope); - } - - for (const dao of notes) { - const noteId = dao.siloedNullifier.toString(); - await this.#notes.set(noteId, dao.toBuffer()); - await this.#notesToScope.set(noteId, scope.toString()); + addNotes(notes: NoteDao[], scope: AztecAddress, jobId: string): Promise { + return this.#withJobLock(jobId, () => + this.#store.transactionAsync(async () => { + await this.#addScope(jobId, scope.toString()); + + for (const dao of notes) { + const noteId = dao.siloedNullifier.toString(); + await this.#addNote(jobId, noteId, dao, scope.toString()); + } + }), + ); + } - await this.#notesByContractAndScope.get(scope.toString())!.set(dao.contractAddress.toString(), noteId); - await this.#notesByStorageSlotAndScope.get(scope.toString())!.set(dao.storageSlot.toString(), noteId); - } - }); + async 
isNoteNullified(note: NoteDao, jobId: string): Promise { + const noteId = note.siloedNullifier.toString(); + const nullifiedEntry = this.#getNullifiedNotesForJob(jobId).get(noteId); + return !!nullifiedEntry || !!(await this.#nullifiedNotes.getAsync(noteId)); } /** @@ -219,6 +500,41 @@ export class NoteStore { } } + async #scopesFilter(jobId: string, filter: NotesFilter): Promise> { + // throw if scopes is an empty array + if (filter.scopes !== undefined && filter.scopes.length === 0) { + throw new Error( + 'Trying to get notes with an empty scopes array. Scopes have to be set to undefined if intending on not filtering by scopes.', + ); + } + + return new Set(filter?.scopes?.map(scope => scope.toString()) ?? (await this.#readScopes(jobId))); + } + + #doesNoteMatchFilter(note: NoteDao, filter: NotesFilter): boolean { + // Match contract address + if (!note?.contractAddress.equals(filter.contractAddress)) { + return false; + } + + // Match owner + if (filter.owner && !note.owner.equals(filter.owner)) { + return false; + } + + // Match storage slot + if (filter.storageSlot && !note.storageSlot.equals(filter.storageSlot!)) { + return false; + } + + // Match nullifier + if (filter.siloedNullifier && !note.siloedNullifier.equals(filter.siloedNullifier)) { + return false; + } + + return true; + } + /** * Retrieves notes based on the provided filter criteria. * @@ -227,105 +543,68 @@ export class NoteStore { * * @param filter - Filter criteria including contractAddress (required), and optional * owner, storageSlot, status, scopes, and siloedNullifier. + * @params jobId - the job context to read from. * @returns Filtered and deduplicated notes (a note might be present in multiple scopes - we ensure it is only * returned once if this is the case) * @throws If filtering by an empty scopes array. Scopes have to be set to undefined or to a non-empty array. */ - async getNotes(filter: NotesFilter): Promise { - filter.status = filter.status ?? 
NoteStatus.ACTIVE; - - // throw early if scopes is an empty array - if (filter.scopes !== undefined && filter.scopes.length === 0) { - throw new Error( - 'Trying to get notes with an empty scopes array. Scopes have to be set to undefined if intending on not filtering by scopes.', - ); - } - - const candidateNoteSources = []; - - filter.scopes ??= (await toArray(this.#scopes.keysAsync())).map(addressString => - AztecAddress.fromString(addressString), - ); + async getNotes(filter: NotesFilter, jobId: string): Promise { + const statusFilter = filter.status ?? NoteStatus.ACTIVE; + const scopesFilter = await this.#scopesFilter(jobId, filter); + // Query specific note indexes depending on given filters const activeNoteIdsPerScope: string[][] = []; - - for (const scope of new Set(filter.scopes)) { - const formattedScopeString = scope.toString(); - if (!(await this.#scopes.hasAsync(formattedScopeString))) { + for (const scope of scopesFilter) { + // TODO: move this to #scopesFilter + if (!(await this.#readScopes(jobId)).has(scope)) { throw new Error('Trying to get incoming notes of a scope that is not in the PXE database'); } - activeNoteIdsPerScope.push( - filter.storageSlot - ? await toArray( - this.#notesByStorageSlotAndScope.get(formattedScopeString)!.getValuesAsync(filter.storageSlot.toString()), - ) - : await toArray( - this.#notesByContractAndScope - .get(formattedScopeString)! 
- .getValuesAsync(filter.contractAddress.toString()), - ), - ); + if (filter.storageSlot) { + activeNoteIdsPerScope.push(await this.#readNotesByStorageSlotAndScope(jobId, scope, filter.storageSlot)); + } else { + activeNoteIdsPerScope.push(await this.#readNotesByContractAndScope(jobId, scope, filter.contractAddress)); + } } - candidateNoteSources.push({ - ids: new Set(activeNoteIdsPerScope.flat()), - notes: this.#notes, - }); + // Collect all the discovered note id's and look them up in notes and nullifiers collections + const candidateNoteIds = new Set(activeNoteIdsPerScope.flat()); - // If status is ACTIVE_OR_NULLIFIED we add nullified notes as candidates on top of the default active ones. - if (filter.status === NoteStatus.ACTIVE_OR_NULLIFIED) { - const nullifiedIds = filter.storageSlot - ? await toArray(this.#nullifiedNotesByStorageSlot.getValuesAsync(filter.storageSlot.toString())) - : await toArray(this.#nullifiedNotesByContract.getValuesAsync(filter.contractAddress.toString())); - - const setOfScopes = new Set(filter.scopes.map(s => s.toString() as string)); - const filteredNullifiedIds = new Set(); - - for (const noteId of nullifiedIds) { - const scopeList = await toArray(this.#nullifiedNotesToScope.getValuesAsync(noteId)); - if (scopeList.some(scope => setOfScopes.has(scope))) { - filteredNullifiedIds.add(noteId); - } - } + const result: NoteDao[] = []; + for (const noteId of candidateNoteIds) { + const note = await this.#readNoteBySiloedNullifier(jobId, noteId); - if (filteredNullifiedIds.size > 0) { - candidateNoteSources.push({ - ids: filteredNullifiedIds, - notes: this.#nullifiedNotes, - }); + if (note && this.#doesNoteMatchFilter(note, filter)) { + result.push(note); } } - const result: NoteDao[] = []; - for (const { ids, notes } of candidateNoteSources) { - for (const id of ids) { - const serializedNote = await notes.getAsync(id); - if (!serializedNote) { - continue; - } - - const note = NoteDao.fromBuffer(serializedNote); - if 
  /**
   * Stages the nullification of the notes matching the given nullifiers within the job.
   *
   * Unknown, duplicate, and already-nullified nullifiers are skipped silently (this can happen
   * legitimately with concurrent operations); nothing is written to permanent storage until
   * `commit(jobId)` is called.
   *
   * @param nullifiers - Block-scoped nullifier values (note ids are the siloed nullifiers)
   * @param jobId - The job context for staged writes
   * @returns The notes that were staged as nullified by this call
   * @throws Error if a matched active note has no scopes recorded (invariant violation)
   */
  applyNullifiers(nullifiers: DataInBlock<Fr>[], jobId: string): Promise<NoteDao[]> {
    return this.#withJobLock(jobId, () =>
      this.#store.transactionAsync(async () => {
        if (nullifiers.length === 0) {
          return Promise.resolve([]);
        }

        // Collect all changes before applying to ensure atomicity
        const pendingChanges: { noteIndex: string; note: NoteDao; scopes: string[]; blockNumber: number }[] = [];

        for (const blockScopedNullifier of nullifiers) {
          const { data: nullifier } = blockScopedNullifier;
          const noteId = nullifier.toString();

          // Check if the note is already nullified (in current job stage or permanently)
          if (this.#getNullifiedNotesForJob(jobId).has(noteId)) {
            // Already nullified in this job - skip silently
            continue;
          }
          if (await this.#nullifiedNotes.hasAsync(noteId)) {
            // Already nullified permanently - skip silently
            continue;
          }

          // Check if note exists (either in stage or in KV store); the KV read is only
          // performed when the stage misses, to avoid a redundant lookup.
          const noteInStage = this.#getNotesForJob(jobId).get(noteId);
          const noteInKVStore = !noteInStage ? await this.#notes.getAsync(noteId) : undefined;

          if (!noteInStage && !noteInKVStore) {
            // Nullifier not found in active notes - unknown nullifier
            // Skip it silently as this can happen legitimately with concurrent operations.
            continue;
          }

          // Check if we're trying to nullify the same note twice in this batch - skip duplicates
          if (pendingChanges.some(change => change.noteIndex === noteId)) {
            continue;
          }

          const note = noteInStage?.note ?? (noteInKVStore ? NoteDao.fromBuffer(noteInKVStore) : undefined);
          if (!note) {
            // Note was nullified between the lookup and here - skip it
            continue;
          }

          const noteScopes = await this.#readScopesByNote(jobId, noteId);
          if (noteScopes.length === 0) {
            // We should never run into this error because notes always have a scope assigned to them - either on initial
            // insertion via `addNotes` or when removing their nullifiers.
            throw new Error('Note scopes are missing in applyNullifiers');
          }

          pendingChanges.push({
            noteIndex: noteId,
            note,
            scopes: noteScopes,
            blockNumber: blockScopedNullifier.l2BlockNumber,
          });
        }

        // Apply all changes after validation passes
        for (const { noteIndex, note, scopes, blockNumber } of pendingChanges) {
          this.#addNullifiedNote(jobId, noteIndex, note, scopes, blockNumber);
          this.#deleteNote(jobId, noteIndex);
        }

        return pendingChanges.map(change => change.note);
      }),
    );
  }
privateEventStore.storePrivateEventLog(eventSelector, randomness, msgContent, siloedEventCommitment, { - contractAddress, - scope, - txHash, - l2BlockNumber, - l2BlockHash, - txIndexInBlock: 0, - eventIndexInTx: 0, - }); + { + await privateEventStore.storePrivateEventLog( + eventSelector, + randomness, + msgContent, + siloedEventCommitment, + { + contractAddress, + scope, + txHash, + l2BlockNumber, + l2BlockHash, + txIndexInBlock: 0, + eventIndexInTx: 0, + }, + 'test', + ); + await privateEventStore.commit('test'); + } + const events = await privateEventStore.getPrivateEvents(eventSelector, { contractAddress, fromBlock: l2BlockNumber, toBlock: l2BlockNumber + 1, scopes: [scope], }); + expect(events).toEqual([expectedEvent]); }); it('ignores duplicate events with same siloedEventCommitment', async () => { - await privateEventStore.storePrivateEventLog(eventSelector, randomness, msgContent, siloedEventCommitment, { - contractAddress, - scope, - txHash, - l2BlockNumber, - l2BlockHash, - txIndexInBlock: 0, - eventIndexInTx: 0, - }); + { + await privateEventStore.storePrivateEventLog( + eventSelector, + randomness, + msgContent, + siloedEventCommitment, + { + contractAddress, + scope, + txHash, + l2BlockNumber, + l2BlockHash, + txIndexInBlock: 0, + eventIndexInTx: 0, + }, + 'test', + ); + await privateEventStore.commit('test'); + } const events = await privateEventStore.getPrivateEvents(eventSelector, { contractAddress, @@ -91,24 +114,41 @@ describe('PrivateEventStore', () => { it('allows multiple events with same content but different siloedEventCommitment', async () => { const otherSiloedEventCommitment = Fr.random(); - await privateEventStore.storePrivateEventLog(eventSelector, randomness, msgContent, siloedEventCommitment, { - contractAddress, - scope, - txHash, - l2BlockNumber, - l2BlockHash, - txIndexInBlock: 0, - eventIndexInTx: 0, - }); - await privateEventStore.storePrivateEventLog(eventSelector, randomness, msgContent, otherSiloedEventCommitment, { - 
contractAddress, - scope, - txHash, - l2BlockNumber, - l2BlockHash, - txIndexInBlock: 0, - eventIndexInTx: 1, - }); + { + await privateEventStore.storePrivateEventLog( + eventSelector, + randomness, + msgContent, + siloedEventCommitment, + { + contractAddress, + scope, + txHash, + l2BlockNumber, + l2BlockHash, + txIndexInBlock: 0, + eventIndexInTx: 0, + }, + 'test', + ); + await privateEventStore.storePrivateEventLog( + eventSelector, + randomness, + msgContent, + otherSiloedEventCommitment, + { + contractAddress, + scope, + txHash, + l2BlockNumber, + l2BlockHash, + txIndexInBlock: 0, + eventIndexInTx: 1, + }, + 'test', + ); + await privateEventStore.commit('test'); + } const events = await privateEventStore.getPrivateEvents(eventSelector, { contractAddress, @@ -127,33 +167,57 @@ describe('PrivateEventStore', () => { l2BlockNumber: BlockNumber(200), }; - await privateEventStore.storePrivateEventLog(eventSelector, randomness, getRandomMsgContent(), Fr.random(), { - contractAddress, - scope, - txHash: TxHash.random(), - l2BlockNumber: BlockNumber(100), - l2BlockHash, - txIndexInBlock: 0, - eventIndexInTx: 0, - }); - await privateEventStore.storePrivateEventLog(eventSelector, randomness, msgContent, Fr.random(), { - contractAddress, - scope, - txHash: expectedEvent.txHash, - l2BlockNumber: expectedEvent.l2BlockNumber, - l2BlockHash: expectedEvent.l2BlockHash, - txIndexInBlock: 0, - eventIndexInTx: 0, - }); - await privateEventStore.storePrivateEventLog(eventSelector, randomness, getRandomMsgContent(), Fr.random(), { - contractAddress, - scope, - txHash: TxHash.random(), - l2BlockNumber: BlockNumber(300), - l2BlockHash, - txIndexInBlock: 0, - eventIndexInTx: 0, - }); + { + await privateEventStore.storePrivateEventLog( + eventSelector, + randomness, + getRandomMsgContent(), + Fr.random(), + { + contractAddress, + scope, + txHash: TxHash.random(), + l2BlockNumber: BlockNumber(100), + l2BlockHash, + txIndexInBlock: 0, + eventIndexInTx: 0, + }, + 'test', + ); + await 
privateEventStore.storePrivateEventLog( + eventSelector, + randomness, + msgContent, + Fr.random(), + { + contractAddress, + scope, + txHash: expectedEvent.txHash, + l2BlockNumber: expectedEvent.l2BlockNumber, + l2BlockHash: expectedEvent.l2BlockHash, + txIndexInBlock: 0, + eventIndexInTx: 0, + }, + 'test', + ); + await privateEventStore.storePrivateEventLog( + eventSelector, + randomness, + getRandomMsgContent(), + Fr.random(), + { + contractAddress, + scope, + txHash: TxHash.random(), + l2BlockNumber: BlockNumber(300), + l2BlockHash, + txIndexInBlock: 0, + eventIndexInTx: 0, + }, + 'test', + ); + await privateEventStore.commit('test'); + } const events = await privateEventStore.getPrivateEvents(eventSelector, { contractAddress, @@ -168,24 +232,41 @@ describe('PrivateEventStore', () => { it('filters events by recipient', async () => { const otherScope = await AztecAddress.random(); - await privateEventStore.storePrivateEventLog(eventSelector, randomness, msgContent, siloedEventCommitment, { - contractAddress, - scope, - txHash, - l2BlockNumber, - l2BlockHash, - txIndexInBlock: 0, - eventIndexInTx: 0, - }); - await privateEventStore.storePrivateEventLog(eventSelector, randomness, msgContent, Fr.random(), { - contractAddress, - scope: otherScope, - txHash: TxHash.random(), - l2BlockNumber, - l2BlockHash, - txIndexInBlock: 0, - eventIndexInTx: 0, - }); + { + await privateEventStore.storePrivateEventLog( + eventSelector, + randomness, + msgContent, + siloedEventCommitment, + { + contractAddress, + scope, + txHash, + l2BlockNumber, + l2BlockHash, + txIndexInBlock: 0, + eventIndexInTx: 0, + }, + 'test', + ); + await privateEventStore.storePrivateEventLog( + eventSelector, + randomness, + msgContent, + Fr.random(), + { + contractAddress, + scope: otherScope, + txHash: TxHash.random(), + l2BlockNumber, + l2BlockHash, + txIndexInBlock: 0, + eventIndexInTx: 0, + }, + 'test', + ); + await privateEventStore.commit('test'); + } const events = await 
privateEventStore.getPrivateEvents(eventSelector, { contractAddress, @@ -220,35 +301,59 @@ describe('PrivateEventStore', () => { }); it('returns events in order by block number', async () => { - await privateEventStore.storePrivateEventLog(eventSelector, randomness, msgContent2, Fr.random(), { - contractAddress, - scope, - txHash: TxHash.random(), - l2BlockNumber: BlockNumber(200), - l2BlockHash, - txIndexInBlock: 0, - eventIndexInTx: 0, - }); - - await privateEventStore.storePrivateEventLog(eventSelector, randomness, msgContent1, Fr.random(), { - contractAddress, - scope, - txHash: TxHash.random(), - l2BlockNumber: BlockNumber(100), - l2BlockHash, - txIndexInBlock: 0, - eventIndexInTx: 0, - }); - - await privateEventStore.storePrivateEventLog(eventSelector, randomness, msgContent3, Fr.random(), { - contractAddress, - scope, - txHash: TxHash.random(), - l2BlockNumber: BlockNumber(300), - l2BlockHash, - txIndexInBlock: 0, - eventIndexInTx: 0, - }); + { + await privateEventStore.storePrivateEventLog( + eventSelector, + randomness, + msgContent2, + Fr.random(), + { + contractAddress, + scope, + txHash: TxHash.random(), + l2BlockNumber: BlockNumber(200), + l2BlockHash, + txIndexInBlock: 0, + eventIndexInTx: 0, + }, + 'test', + ); + + await privateEventStore.storePrivateEventLog( + eventSelector, + randomness, + msgContent1, + Fr.random(), + { + contractAddress, + scope, + txHash: TxHash.random(), + l2BlockNumber: BlockNumber(100), + l2BlockHash, + txIndexInBlock: 0, + eventIndexInTx: 0, + }, + 'test', + ); + + await privateEventStore.storePrivateEventLog( + eventSelector, + randomness, + msgContent3, + Fr.random(), + { + contractAddress, + scope, + txHash: TxHash.random(), + l2BlockNumber: BlockNumber(300), + l2BlockHash, + txIndexInBlock: 0, + eventIndexInTx: 0, + }, + 'test', + ); + await privateEventStore.commit('test'); + } const events = await privateEventStore.getPrivateEvents(eventSelector, { contractAddress, @@ -263,36 +368,60 @@ describe('PrivateEventStore', 
() => { it('returns events in order by tx index within the same block', async () => { const l2BlockNumber = BlockNumber(100); - // Store events in the same block but different tx indexes (store them out of order) - await privateEventStore.storePrivateEventLog(eventSelector, randomness, msgContent2, Fr.random(), { - contractAddress, - scope, - txHash: TxHash.random(), - l2BlockNumber, - l2BlockHash, - txIndexInBlock: 1, - eventIndexInTx: 0, - }); - - await privateEventStore.storePrivateEventLog(eventSelector, randomness, msgContent1, Fr.random(), { - contractAddress, - scope, - txHash: TxHash.random(), - l2BlockNumber, - l2BlockHash, - txIndexInBlock: 0, - eventIndexInTx: 0, - }); - - await privateEventStore.storePrivateEventLog(eventSelector, randomness, msgContent3, Fr.random(), { - contractAddress, - scope, - txHash: TxHash.random(), - l2BlockNumber, - l2BlockHash, - txIndexInBlock: 2, - eventIndexInTx: 0, - }); + { + // Store events in the same block but different tx indexes (store them out of order) + await privateEventStore.storePrivateEventLog( + eventSelector, + randomness, + msgContent2, + Fr.random(), + { + contractAddress, + scope, + txHash: TxHash.random(), + l2BlockNumber, + l2BlockHash, + txIndexInBlock: 1, + eventIndexInTx: 0, + }, + 'test', + ); + + await privateEventStore.storePrivateEventLog( + eventSelector, + randomness, + msgContent1, + Fr.random(), + { + contractAddress, + scope, + txHash: TxHash.random(), + l2BlockNumber, + l2BlockHash, + txIndexInBlock: 0, + eventIndexInTx: 0, + }, + 'test', + ); + + await privateEventStore.storePrivateEventLog( + eventSelector, + randomness, + msgContent3, + Fr.random(), + { + contractAddress, + scope, + txHash: TxHash.random(), + l2BlockNumber, + l2BlockHash, + txIndexInBlock: 2, + eventIndexInTx: 0, + }, + 'test', + ); + await privateEventStore.commit('test'); + } const events = await privateEventStore.getPrivateEvents(eventSelector, { contractAddress, @@ -308,36 +437,60 @@ describe('PrivateEventStore', () 
=> { const txHash = TxHash.random(); const l2BlockNumber = BlockNumber(100); - // Store events in the same block and tx but different event indexes (store them out of order) - await privateEventStore.storePrivateEventLog(eventSelector, randomness, msgContent2, Fr.random(), { - contractAddress, - scope, - txHash, - l2BlockNumber, - l2BlockHash, - txIndexInBlock: 0, - eventIndexInTx: 1, - }); - - await privateEventStore.storePrivateEventLog(eventSelector, randomness, msgContent1, Fr.random(), { - contractAddress, - scope, - txHash, - l2BlockNumber, - l2BlockHash, - txIndexInBlock: 0, - eventIndexInTx: 0, - }); - - await privateEventStore.storePrivateEventLog(eventSelector, randomness, msgContent3, Fr.random(), { - contractAddress, - scope, - txHash, - l2BlockNumber, - l2BlockHash, - txIndexInBlock: 0, - eventIndexInTx: 2, - }); + { + // Store events in the same block and tx but different event indexes (store them out of order) + await privateEventStore.storePrivateEventLog( + eventSelector, + randomness, + msgContent2, + Fr.random(), + { + contractAddress, + scope, + txHash, + l2BlockNumber, + l2BlockHash, + txIndexInBlock: 0, + eventIndexInTx: 1, + }, + 'test', + ); + + await privateEventStore.storePrivateEventLog( + eventSelector, + randomness, + msgContent1, + Fr.random(), + { + contractAddress, + scope, + txHash, + l2BlockNumber, + l2BlockHash, + txIndexInBlock: 0, + eventIndexInTx: 0, + }, + 'test', + ); + + await privateEventStore.storePrivateEventLog( + eventSelector, + randomness, + msgContent3, + Fr.random(), + { + contractAddress, + scope, + txHash, + l2BlockNumber, + l2BlockHash, + txIndexInBlock: 0, + eventIndexInTx: 2, + }, + 'test', + ); + await privateEventStore.commit('test'); + } const events = await privateEventStore.getPrivateEvents(eventSelector, { contractAddress, @@ -365,45 +518,75 @@ describe('PrivateEventStore', () => { it('removes events after rollback block', async () => { // Store events in blocks 100, 200, 300 - await 
privateEventStore.storePrivateEventLog(eventSelector, randomness, msgContent1, Fr.random(), { - contractAddress, - scope, - txHash: TxHash.random(), - l2BlockNumber: BlockNumber(100), - l2BlockHash, - txIndexInBlock: 0, - eventIndexInTx: 0, - }); + await privateEventStore.storePrivateEventLog( + eventSelector, + randomness, + msgContent1, + Fr.random(), + { + contractAddress, + scope, + txHash: TxHash.random(), + l2BlockNumber: BlockNumber(100), + l2BlockHash, + txIndexInBlock: 0, + eventIndexInTx: 0, + }, + 'before-rollback', + ); // We add another event in the same block to verify that more events per block work. - await privateEventStore.storePrivateEventLog(eventSelector, randomness, msgContent2, Fr.random(), { - contractAddress, - scope, - txHash: TxHash.random(), - l2BlockNumber: BlockNumber(100), - l2BlockHash, - txIndexInBlock: 0, - eventIndexInTx: 1, - }); - - await privateEventStore.storePrivateEventLog(eventSelector, randomness, msgContent3, Fr.random(), { - contractAddress, - scope, - txHash: TxHash.random(), - l2BlockNumber: BlockNumber(200), - l2BlockHash, - txIndexInBlock: 0, - eventIndexInTx: 0, - }); - - await privateEventStore.storePrivateEventLog(eventSelector, randomness, msgContent4, Fr.random(), { - contractAddress, - scope, - txHash: TxHash.random(), - l2BlockNumber: BlockNumber(300), - l2BlockHash, - txIndexInBlock: 0, - eventIndexInTx: 0, - }); + await privateEventStore.storePrivateEventLog( + eventSelector, + randomness, + msgContent2, + Fr.random(), + { + contractAddress, + scope, + txHash: TxHash.random(), + l2BlockNumber: BlockNumber(100), + l2BlockHash, + txIndexInBlock: 0, + eventIndexInTx: 1, + }, + 'before-rollback', + ); + + await privateEventStore.storePrivateEventLog( + eventSelector, + randomness, + msgContent3, + Fr.random(), + { + contractAddress, + scope, + txHash: TxHash.random(), + l2BlockNumber: BlockNumber(200), + l2BlockHash, + txIndexInBlock: 0, + eventIndexInTx: 0, + }, + 'before-rollback', + ); + + await 
privateEventStore.storePrivateEventLog( + eventSelector, + randomness, + msgContent4, + Fr.random(), + { + contractAddress, + scope, + txHash: TxHash.random(), + l2BlockNumber: BlockNumber(300), + l2BlockHash, + txIndexInBlock: 0, + eventIndexInTx: 0, + }, + 'before-rollback', + ); + + await privateEventStore.commit('before-rollback'); // Rollback to block 150 (should remove events from blocks 200 and 300) await privateEventStore.rollback(150, 300); @@ -427,15 +610,23 @@ describe('PrivateEventStore', () => { const reorgEventCommitment = Fr.random(); // Store event at block 200 - await privateEventStore.storePrivateEventLog(eventSelector, randomness, msgContent1, reorgEventCommitment, { - contractAddress, - scope, - txHash: reorgTxHash, - l2BlockNumber: BlockNumber(200), - l2BlockHash, - txIndexInBlock: 0, - eventIndexInTx: 0, - }); + await privateEventStore.storePrivateEventLog( + eventSelector, + randomness, + msgContent1, + reorgEventCommitment, + { + contractAddress, + scope, + txHash: reorgTxHash, + l2BlockNumber: BlockNumber(200), + l2BlockHash, + txIndexInBlock: 0, + eventIndexInTx: 0, + }, + 'before-rollback', + ); + await privateEventStore.commit('before-rollback'); // Rollback to block 100 await privateEventStore.rollback(100, 200); @@ -450,15 +641,24 @@ describe('PrivateEventStore', () => { expect(events.length).toBe(0); // Re-add the same event (same siloedEventCommitment and txHash, as happens after a reorg) - await privateEventStore.storePrivateEventLog(eventSelector, randomness, msgContent1, reorgEventCommitment, { - contractAddress, - scope, - txHash: reorgTxHash, - l2BlockNumber: BlockNumber(200), - l2BlockHash, - txIndexInBlock: 0, - eventIndexInTx: 0, - }); + await privateEventStore.storePrivateEventLog( + eventSelector, + randomness, + msgContent1, + reorgEventCommitment, + { + contractAddress, + scope, + txHash: reorgTxHash, + l2BlockNumber: BlockNumber(200), + l2BlockHash, + txIndexInBlock: 0, + eventIndexInTx: 0, + }, + 're-add', + ); + + await
privateEventStore.commit('re-add'); // Verify event can be retrieved again events = await privateEventStore.getPrivateEvents(eventSelector, { @@ -471,15 +671,23 @@ describe('PrivateEventStore', () => { }); it('handles rollback with no events to remove', async () => { - await privateEventStore.storePrivateEventLog(eventSelector, randomness, msgContent1, Fr.random(), { - contractAddress, - scope, - txHash: TxHash.random(), - l2BlockNumber: BlockNumber(100), - l2BlockHash, - txIndexInBlock: 0, - eventIndexInTx: 0, - }); + await privateEventStore.storePrivateEventLog( + eventSelector, + randomness, + msgContent1, + Fr.random(), + { + contractAddress, + scope, + txHash: TxHash.random(), + l2BlockNumber: BlockNumber(100), + l2BlockHash, + txIndexInBlock: 0, + eventIndexInTx: 0, + }, + 'before-rollback', + ); + await privateEventStore.commit('before-rollback'); // Rollback after all existing events await privateEventStore.rollback(200, 300); @@ -495,4 +703,155 @@ describe('PrivateEventStore', () => { expect(events[0].packedEvent).toEqual(msgContent1); }); }); + + describe('staging', () => { + it('stages events without affecting committed storage', async () => { + const commitJobId: string = 'commit-job'; + const stagingJobId: string = 'staging-job'; + + const committedEventRandomness = Fr.random(); + const stagedEventRandomness = Fr.random(); + + // Store committed event + await privateEventStore.storePrivateEventLog( + eventSelector, + committedEventRandomness, + msgContent, + Fr.random(), + { + contractAddress, + scope, + txHash, + l2BlockNumber, + l2BlockHash, + txIndexInBlock: randomInt(100), + eventIndexInTx: randomInt(100), + }, + commitJobId, + ); + await privateEventStore.commit(commitJobId); + + // Store staged event (not committed) + const stagedMsgContent = getRandomMsgContent(); + await privateEventStore.storePrivateEventLog( + eventSelector, + stagedEventRandomness, + stagedMsgContent, + Fr.random(), + { + contractAddress, + scope, + txHash: TxHash.random(), 
+ l2BlockNumber, + l2BlockHash, + txIndexInBlock: randomInt(100), + eventIndexInTx: randomInt(100), + }, + stagingJobId, + ); + + // With a fresh jobId, should only see committed event + const events = await privateEventStore.getPrivateEvents(eventSelector, { + contractAddress, + fromBlock: l2BlockNumber, + toBlock: l2BlockNumber + 1, + scopes: [scope], + }); + expect(events).toHaveLength(1); + expect(events[0].packedEvent).toEqual(msgContent); + }); + + it('commit promotes staged events to main storage', async () => { + const stagingJobId: string = 'staging-job'; + const stagedEventRandomness = Fr.random(); + const stagedMsgContent = getRandomMsgContent(); + + await privateEventStore.storePrivateEventLog( + eventSelector, + stagedEventRandomness, + stagedMsgContent, + Fr.random(), + { + contractAddress, + scope, + txHash, + l2BlockNumber, + l2BlockHash, + txIndexInBlock: randomInt(100), + eventIndexInTx: randomInt(100), + }, + stagingJobId, + ); + + await privateEventStore.commit(stagingJobId); + + // Now should see the event with a fresh jobId + const events = await privateEventStore.getPrivateEvents(eventSelector, { + contractAddress, + fromBlock: l2BlockNumber, + toBlock: l2BlockNumber + 1, + scopes: [scope], + }); + expect(events).toHaveLength(1); + expect(events[0].packedEvent).toEqual(stagedMsgContent); + }); + + it('discardStaged removes staged events without affecting main', async () => { + const commitJobId: string = 'commit-job'; + const stagingJobId: string = 'staging-job'; + const committedEventRandomness = Fr.random(); + const stagedEventRandomness = Fr.random(); + + // Store committed event + await privateEventStore.storePrivateEventLog( + eventSelector, + committedEventRandomness, + msgContent, + Fr.random(), + { + contractAddress, + scope, + txHash, + l2BlockNumber, + l2BlockHash, + txIndexInBlock: randomInt(100), + eventIndexInTx: randomInt(100), + }, + commitJobId, + ); + await privateEventStore.commit(commitJobId); + + // Store staged event (not 
committed) + const stagedMsgContent = getRandomMsgContent(); + await privateEventStore.storePrivateEventLog( + eventSelector, + stagedEventRandomness, + stagedMsgContent, + Fr.random(), + { + contractAddress, + scope, + txHash: TxHash.random(), + l2BlockNumber, + l2BlockHash, + txIndexInBlock: randomInt(100), + eventIndexInTx: randomInt(100), + }, + stagingJobId, + ); + + // Discard staging + await privateEventStore.discardStaged(stagingJobId); + + // Should only see committed event + const events = await privateEventStore.getPrivateEvents(eventSelector, { + contractAddress, + fromBlock: l2BlockNumber, + toBlock: l2BlockNumber + 1, + scopes: [scope], + }); + expect(events).toHaveLength(1); + expect(events[0].packedEvent).toEqual(msgContent); + }); + }); }); diff --git a/yarn-project/pxe/src/storage/private_event_store/private_event_store.ts b/yarn-project/pxe/src/storage/private_event_store/private_event_store.ts index ee3c96b65673..70c2cacd772e 100644 --- a/yarn-project/pxe/src/storage/private_event_store/private_event_store.ts +++ b/yarn-project/pxe/src/storage/private_event_store/private_event_store.ts @@ -2,12 +2,13 @@ import { BlockNumber } from '@aztec/foundation/branded-types'; import { Fr } from '@aztec/foundation/curves/bn254'; import { createLogger } from '@aztec/foundation/log'; import { BufferReader, serializeToBuffer } from '@aztec/foundation/serialize'; -import type { AztecAsyncKVStore, AztecAsyncMap } from '@aztec/kv-store'; +import type { AztecAsyncKVStore, AztecAsyncMap, AztecAsyncMultiMap } from '@aztec/kv-store'; import type { EventSelector } from '@aztec/stdlib/abi'; import type { AztecAddress } from '@aztec/stdlib/aztec-address'; import { L2BlockHash } from '@aztec/stdlib/block'; import { type InTx, TxHash } from '@aztec/stdlib/tx'; +import type { StagedStore } from '../../job_coordinator/job_coordinator.js'; import type { PackedPrivateEvent } from '../../pxe.js'; export type PrivateEventStoreFilter = { @@ -44,31 +45,110 @@ type 
PrivateEventMetadata = InTx & { /** * Stores decrypted private event logs. */ -export class PrivateEventStore { +export class PrivateEventStore implements StagedStore { + readonly storeName: string = 'private_event'; + #store: AztecAsyncKVStore; /** Map storing the actual private event log entries, keyed by siloedEventCommitment */ #eventLogs: AztecAsyncMap; - /** Map from contractAddress_scope_eventSelector to siloedEventCommitment[] for efficient lookup */ - #eventsByContractScopeSelector: AztecAsyncMap; - /** Map from block number to siloedEventCommitment[] for rollback support */ - #eventsByBlockNumber: AztecAsyncMap; + /** Multi-map from contractAddress_scope_eventSelector to siloedEventCommitment for efficient lookup */ + #eventsByContractScopeSelector: AztecAsyncMultiMap; + /** Multi-map from block number to siloedEventCommitment for rollback support */ + #eventsByBlockNumber: AztecAsyncMultiMap; /** Map from siloedEventCommitment to boolean indicating if log has been seen. */ #seenLogs: AztecAsyncMap; + /** jobId => eventId (event siloed nullifier) => PrivateEventEntry */ + #eventLogsInJobStage: Map>; + logger = createLogger('private_event_store'); constructor(store: AztecAsyncKVStore) { this.#store = store; this.#eventLogs = this.#store.openMap('private_event_logs'); - this.#eventsByContractScopeSelector = this.#store.openMap('events_by_contract_scope_selector'); + this.#eventsByContractScopeSelector = this.#store.openMultiMap('events_by_contract_scope_selector'); this.#seenLogs = this.#store.openMap('seen_logs'); - this.#eventsByBlockNumber = this.#store.openMap('events_by_block_number'); + this.#eventsByBlockNumber = this.#store.openMultiMap('events_by_block_number'); + + this.#eventLogsInJobStage = new Map(); } #keyFor(contractAddress: AztecAddress, scope: AztecAddress, eventSelector: EventSelector): string { return `${contractAddress.toString()}_${scope.toString()}_${eventSelector.toString()}`; } + async commit(jobId: string): Promise { + await 
Promise.all( + [...this.#getEventLogsInJobStage(jobId).entries()].map(async ([eventId, eventEntry]) => { + this.logger.verbose('storing private event log (KV store)', eventEntry); + + await Promise.all([ + this.#eventLogs.set(eventId, eventEntry), + this.#eventsByContractScopeSelector.set(eventEntry.lookupKey, eventId), + this.#eventsByBlockNumber.set(eventEntry.l2BlockNumber, eventId), + this.#seenLogs.set(eventId, true), + ]); + }), + ); + + return this.discardStaged(jobId); + } + + discardStaged(jobId: string): Promise { + this.#eventLogsInJobStage.delete(jobId); + return Promise.resolve(); + } + + #getEventLogsInJobStage(jobId: string): Map { + let jobStage = this.#eventLogsInJobStage.get(jobId); + if (jobStage === undefined) { + jobStage = new Map(); + this.#eventLogsInJobStage.set(jobId, jobStage); + } + return jobStage; + } + + async #isSeenLog(jobId: string, eventId: string): Promise { + const eventLogsInJobStage = this.#getEventLogsInJobStage(jobId).get(eventId); + return !!eventLogsInJobStage || !!(await this.#seenLogs.getAsync(eventId)); + } + + #addEventLogToStage(jobId: string, eventId: string, eventEntry: PrivateEventEntry) { + this.#getEventLogsInJobStage(jobId).set(eventId, eventEntry); + } + + async #getEventSiloedNullifiers( + contractAddress: AztecAddress, + scope: AztecAddress, + eventSelector: EventSelector, + jobId?: string, + ): Promise { + const key = this.#keyFor(contractAddress, scope, eventSelector); + const eventSiloedNullifiersInStorage: string[] = []; + for await (const eventId of this.#eventsByContractScopeSelector.getValuesAsync(key)) { + eventSiloedNullifiersInStorage.push(eventId); + } + + if (!jobId) { + return eventSiloedNullifiersInStorage; + } + + const eventSiloedNullifiersInJobStage = new Set( + [...this.#getEventLogsInJobStage(jobId).entries()] + .filter(([_, entry]) => entry.lookupKey === key) + .map(([idx, _]) => idx), + ); + return [...new Set(eventSiloedNullifiersInStorage).union(eventSiloedNullifiersInJobStage)]; + } + 
+ async #getEventLogBySiloedNullifier(eventId: string, jobId?: string): Promise { + if (jobId) { + return this.#getEventLogsInJobStage(jobId).get(eventId) ?? (await this.#eventLogs.getAsync(eventId)); + } else { + return await this.#eventLogs.getAsync(eventId); + } + } + /** * Store a private event log. * @param eventSelector - The event selector of the event. @@ -87,6 +167,7 @@ export class PrivateEventStore { msgContent: Fr[], siloedEventCommitment: Fr, metadata: PrivateEventMetadata, + jobId: string, ): Promise { const { contractAddress, scope, txHash, l2BlockNumber, l2BlockHash, txIndexInBlock, eventIndexInTx } = metadata; @@ -97,15 +178,20 @@ export class PrivateEventStore { // reason we use it as id. const eventId = siloedEventCommitment.toString(); - const hasBeenSeen = await this.#seenLogs.getAsync(eventId); + const hasBeenSeen = await this.#isSeenLog(jobId, eventId); if (hasBeenSeen) { this.logger.verbose('Ignoring duplicate event log', { txHash: txHash.toString(), siloedEventCommitment }); return; } - this.logger.verbose('storing private event log', { contractAddress, scope, msgContent, l2BlockNumber }); + this.logger.verbose('storing private event log (job stage)', { + contractAddress, + scope, + msgContent, + l2BlockNumber, + }); - await this.#eventLogs.set(eventId, { + this.#addEventLogToStage(jobId, eventId, { randomness, msgContent: serializeToBuffer(msgContent), l2BlockNumber, @@ -115,15 +201,6 @@ export class PrivateEventStore { eventIndexInTx, lookupKey: key, }); - - const existingIds = (await this.#eventsByContractScopeSelector.getAsync(key)) || []; - await this.#eventsByContractScopeSelector.set(key, [...existingIds, eventId]); - - const existingBlockIds = (await this.#eventsByBlockNumber.getAsync(l2BlockNumber)) || []; - await this.#eventsByBlockNumber.set(l2BlockNumber, [...existingBlockIds, eventId]); - - // Mark this log as seen - await this.#seenLogs.set(eventId, true); }); } @@ -150,11 +227,11 @@ export class PrivateEventStore { }> = []; 
for (const scope of filter.scopes) { - const key = this.#keyFor(filter.contractAddress, scope, eventSelector); - const eventIds = (await this.#eventsByContractScopeSelector.getAsync(key)) || []; + const eventIds = await this.#getEventSiloedNullifiers(filter.contractAddress, scope, eventSelector); for (const eventId of eventIds) { - const entry = await this.#eventLogs.getAsync(eventId); + const entry = await this.#getEventLogBySiloedNullifier(eventId); + if (!entry || entry.l2BlockNumber < filter.fromBlock || entry.l2BlockNumber >= filter.toBlock) { continue; } @@ -202,14 +279,29 @@ export class PrivateEventStore { * Rolls back private events that were stored after a given `blockNumber` and up to `synchedBlockNumber` (the block * number up to which PXE managed to sync before the reorg happened). * + * We don't need staged writes for a rollback since it's handled in the context of a blockchain rewind. + * + * Rollbacks are handled by the BlockSynchronizer, which runs a DB transaction across stores when it detects a + * re-org, including setting the new anchor block after rolling back. + * + * So if anything fails in the process of rolling back any store, all DB changes occurring during rollbacks will be + * lost and the anchor block will not be updated; which means this code will eventually need to run again + * (i.e.: PXE will detect it's basing it work on an invalid block hash, then which re-triggers rewind). + * + * For further details, refer to `BlockSynchronizer#handleBlockStreamEvent`. + * * IMPORTANT: This method must be called within a transaction to ensure atomicity. 
*/ public async rollback(blockNumber: number, synchedBlockNumber: number): Promise { let removedCount = 0; for (let block = blockNumber + 1; block <= synchedBlockNumber; block++) { - const eventIds = await this.#eventsByBlockNumber.getAsync(block); - if (eventIds) { + const eventIds: string[] = []; + for await (const eventId of this.#eventsByBlockNumber.getValuesAsync(block)) { + eventIds.push(eventId); + } + + if (eventIds.length > 0) { await this.#eventsByBlockNumber.delete(block); for (const eventId of eventIds) { @@ -220,18 +312,7 @@ export class PrivateEventStore { await this.#eventLogs.delete(eventId); await this.#seenLogs.delete(eventId); - - // Update #eventsByContractScopeSelector using the stored lookupKey - const existingIds = await this.#eventsByContractScopeSelector.getAsync(entry.lookupKey); - if (!existingIds || existingIds.length === 0) { - throw new Error(`No ids found in #eventsByContractScopeSelector for key ${entry.lookupKey}`); - } - const filteredIds = existingIds.filter(id => id !== eventId); - if (filteredIds.length === 0) { - await this.#eventsByContractScopeSelector.delete(entry.lookupKey); - } else { - await this.#eventsByContractScopeSelector.set(entry.lookupKey, filteredIds); - } + await this.#eventsByContractScopeSelector.deleteValue(entry.lookupKey, eventId); removedCount++; } diff --git a/yarn-project/txe/src/txe_session.ts b/yarn-project/txe/src/txe_session.ts index 1e7e8ee4e0a9..02149f8c6f18 100644 --- a/yarn-project/txe/src/txe_session.ts +++ b/yarn-project/txe/src/txe_session.ts @@ -294,6 +294,7 @@ export class TXESession implements TXESessionStateHandler { this.noteStore, this.stateMachine.node, this.stateMachine.anchorBlockStore, + TXE_JOB_ID, ).syncNoteNullifiers(contractAddress); // Private execution has two associated block numbers: the anchor block (i.e. 
the historical block that is used to @@ -387,6 +388,7 @@ export class TXESession implements TXESessionStateHandler { this.noteStore, this.stateMachine.node, this.stateMachine.anchorBlockStore, + TXE_JOB_ID, ).syncNoteNullifiers(contractAddress); const anchorBlockHeader = await this.stateMachine.anchorBlockStore.getBlockHeader();