diff --git a/yarn-project/pxe/src/contract_function_simulator/oracle/private_execution_oracle.ts b/yarn-project/pxe/src/contract_function_simulator/oracle/private_execution_oracle.ts index 41a20efa141f..7a3d55f3ddbe 100644 --- a/yarn-project/pxe/src/contract_function_simulator/oracle/private_execution_oracle.ts +++ b/yarn-project/pxe/src/contract_function_simulator/oracle/private_execution_oracle.ts @@ -283,9 +283,9 @@ export class PrivateExecutionOracle extends UtilityExecutionOracle implements IP // This is a tagging secret we've not yet used in this tx, so first sync our store to make sure its indices // are up to date. We do this here because this store is not synced as part of the global sync because // that'd be wasteful as most tagging secrets are not used in each tx. - await syncSenderTaggingIndexes(secret, this.contractAddress, this.aztecNode, this.senderTaggingStore); + await syncSenderTaggingIndexes(secret, this.contractAddress, this.aztecNode, this.senderTaggingStore, this.jobId); - const lastUsedIndex = await this.senderTaggingStore.getLastUsedIndex(secret); + const lastUsedIndex = await this.senderTaggingStore.getLastUsedIndex(secret, this.jobId); // If lastUsedIndex is undefined, we've never used this secret, so start from 0 // Otherwise, the next index to use is one past the last used index return lastUsedIndex === undefined ? 0 : lastUsedIndex + 1; diff --git a/yarn-project/pxe/src/contract_function_simulator/oracle/utility_execution_oracle.ts b/yarn-project/pxe/src/contract_function_simulator/oracle/utility_execution_oracle.ts index 757ebef994be..1fcd05d35c6b 100644 --- a/yarn-project/pxe/src/contract_function_simulator/oracle/utility_execution_oracle.ts +++ b/yarn-project/pxe/src/contract_function_simulator/oracle/utility_execution_oracle.ts @@ -354,6 +354,7 @@ export class UtilityExecutionOracle implements IMiscOracle, IUtilityExecutionOra this.recipientTaggingStore, this.senderAddressBookStore, this.addressStore, + this.jobId, ); await logService.syncTaggedLogs(this.contractAddress, pendingTaggedLogArrayBaseSlot, this.scopes); @@ -385,11 +386,11 @@ export class UtilityExecutionOracle implements IMiscOracle, IUtilityExecutionOra // We read all note and event validation requests and process them all concurrently. This makes the process much // faster as we don't need to wait for the network round-trip. const noteValidationRequests = ( - await this.capsuleStore.readCapsuleArray(contractAddress, noteValidationRequestsArrayBaseSlot) + await this.capsuleStore.readCapsuleArray(contractAddress, noteValidationRequestsArrayBaseSlot, this.jobId) ).map(NoteValidationRequest.fromFields); const eventValidationRequests = ( - await this.capsuleStore.readCapsuleArray(contractAddress, eventValidationRequestsArrayBaseSlot) + await this.capsuleStore.readCapsuleArray(contractAddress, eventValidationRequestsArrayBaseSlot, this.jobId) ).map(EventValidationRequest.fromFields); const noteService = new NoteService(this.noteStore, this.aztecNode, this.anchorBlockStore); @@ -424,8 +425,8 @@ export class UtilityExecutionOracle implements IMiscOracle, IUtilityExecutionOra await Promise.all([...noteDeliveries, ...eventDeliveries]); // Requests are cleared once we're done. 
- await this.capsuleStore.setCapsuleArray(contractAddress, noteValidationRequestsArrayBaseSlot, []); - await this.capsuleStore.setCapsuleArray(contractAddress, eventValidationRequestsArrayBaseSlot, []); + await this.capsuleStore.setCapsuleArray(contractAddress, noteValidationRequestsArrayBaseSlot, [], this.jobId); + await this.capsuleStore.setCapsuleArray(contractAddress, eventValidationRequestsArrayBaseSlot, [], this.jobId); } public async utilityBulkRetrieveLogs( @@ -441,7 +442,7 @@ export class UtilityExecutionOracle implements IMiscOracle, IUtilityExecutionOra // We read all log retrieval requests and process them all concurrently. This makes the process much faster as we // don't need to wait for the network round-trip. const logRetrievalRequests = ( - await this.capsuleStore.readCapsuleArray(contractAddress, logRetrievalRequestsArrayBaseSlot) + await this.capsuleStore.readCapsuleArray(contractAddress, logRetrievalRequestsArrayBaseSlot, this.jobId) ).map(LogRetrievalRequest.fromFields); const logService = new LogService( @@ -452,18 +453,20 @@ export class UtilityExecutionOracle implements IMiscOracle, IUtilityExecutionOra this.recipientTaggingStore, this.senderAddressBookStore, this.addressStore, + this.jobId, ); const maybeLogRetrievalResponses = await logService.bulkRetrieveLogs(logRetrievalRequests); // Requests are cleared once we're done. - await this.capsuleStore.setCapsuleArray(contractAddress, logRetrievalRequestsArrayBaseSlot, []); + await this.capsuleStore.setCapsuleArray(contractAddress, logRetrievalRequestsArrayBaseSlot, [], this.jobId); // The responses are stored as Option in a second CapsuleArray. await this.capsuleStore.setCapsuleArray( contractAddress, logRetrievalResponsesArrayBaseSlot, maybeLogRetrievalResponses.map(LogRetrievalResponse.toSerializedOption), + this.jobId, ); } @@ -472,7 +475,8 @@ export class UtilityExecutionOracle implements IMiscOracle, IUtilityExecutionOra // TODO(#10727): instead of this check that this.contractAddress is allowed to access the external DB throw new Error(`Contract ${contractAddress} is not allowed to access ${this.contractAddress}'s PXE DB`); } - return this.capsuleStore.storeCapsule(this.contractAddress, slot, capsule); + this.capsuleStore.storeCapsule(this.contractAddress, slot, capsule, this.jobId); + return Promise.resolve(); }
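The jobId threaded through these oracle calls scopes every capsule read and write to one job's staged view. The stores registered with the JobCoordinator in pxe.ts further below all expose the same commit/discard surface; the following is a minimal sketch of that contract as it can be inferred from the CapsuleStore and RecipientTaggingStore implementations in this patch. The coordinator body shown here is an assumption, since JobCoordinator's internals are not part of the diff.

// Sketch of the StagedStore contract, inferred from the implementations in this patch.
interface StagedStore {
  readonly storeName: string;
  // Persist the job's staged writes; called inside a single KV transaction.
  commit(jobId: string): Promise<void>;
  // Drop the job's staged writes without persisting them.
  discardStaged(jobId: string): Promise<void>;
}

// Hypothetical coordinator flow (the real JobCoordinator is not shown in this diff).
class JobCoordinatorSketch {
  constructor(
    private store: { transactionAsync<T>(fn: () => Promise<T>): Promise<T> },
    private stores: StagedStore[] = [],
  ) {}

  registerStores(stores: StagedStore[]) {
    this.stores.push(...stores);
  }

  // On success, commit every registered store atomically in one transaction;
  // on failure, discard all staged data so no partial state leaks.
  async finishJob(jobId: string, succeeded: boolean): Promise<void> {
    if (succeeded) {
      await this.store.transactionAsync(async () => {
        for (const s of this.stores) {
          await s.commit(jobId);
        }
      });
    } else {
      await Promise.all(this.stores.map(s => s.discardStaged(jobId)));
    }
  }
}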
public async utilityLoadCapsule(contractAddress: AztecAddress, slot: Fr): Promise<Fr[] | null> { @@ -483,7 +487,7 @@ export class UtilityExecutionOracle implements IMiscOracle, IUtilityExecutionOra return ( // TODO(#12425): On the following line, the pertinent capsule gets overshadowed by the transient one. Tackle this. this.capsules.find(c => c.contractAddress.equals(contractAddress) && c.storageSlot.equals(slot))?.data ?? - (await this.capsuleStore.loadCapsule(this.contractAddress, slot)) + (await this.capsuleStore.loadCapsule(this.contractAddress, slot, this.jobId)) ); } @@ -492,7 +496,8 @@ export class UtilityExecutionOracle implements IMiscOracle, IUtilityExecutionOra // TODO(#10727): instead of this check that this.contractAddress is allowed to access the external DB throw new Error(`Contract ${contractAddress} is not allowed to access ${this.contractAddress}'s PXE DB`); } - return this.capsuleStore.deleteCapsule(this.contractAddress, slot); + this.capsuleStore.deleteCapsule(this.contractAddress, slot, this.jobId); + return Promise.resolve(); } public utilityCopyCapsule( @@ -505,7 +510,7 @@ export class UtilityExecutionOracle implements IMiscOracle, IUtilityExecutionOra // TODO(#10727): instead of this check that this.contractAddress is allowed to access the external DB throw new Error(`Contract ${contractAddress} is not allowed to access ${this.contractAddress}'s PXE DB`); } - return this.capsuleStore.copyCapsule(this.contractAddress, srcSlot, dstSlot, numEntries); + return this.capsuleStore.copyCapsule(this.contractAddress, srcSlot, dstSlot, numEntries, this.jobId); } // TODO(#11849): consider replacing this oracle with a pure Noir implementation of aes decryption. diff --git a/yarn-project/pxe/src/logs/log_service.test.ts b/yarn-project/pxe/src/logs/log_service.test.ts index 25e2c3494b8e..69fd3f297ae2 100644 --- a/yarn-project/pxe/src/logs/log_service.test.ts +++ b/yarn-project/pxe/src/logs/log_service.test.ts @@ -50,6 +50,7 @@ describe('LogService', () => { recipientTaggingStore, senderAddressBookStore, addressStore, + 'test', ); aztecNode.getPrivateLogsByTags.mockReset(); diff --git a/yarn-project/pxe/src/logs/log_service.ts b/yarn-project/pxe/src/logs/log_service.ts index 4959d4a089e4..c1382093ed0e 100644 --- a/yarn-project/pxe/src/logs/log_service.ts +++ b/yarn-project/pxe/src/logs/log_service.ts @@ -26,6 +26,7 @@ export class LogService { private readonly recipientTaggingStore: RecipientTaggingStore, private readonly senderAddressBookStore: SenderAddressBookStore, private readonly addressStore: AddressStore, + private readonly jobId: string, ) {} public async bulkRetrieveLogs(logRetrievalRequests: LogRetrievalRequest[]): Promise<(LogRetrievalResponse | null)[]> { @@ -122,6 +123,7 @@ export class LogService { this.aztecNode, this.recipientTaggingStore, anchorBlockNumber, + this.jobId, ), ), ); @@ -186,7 +188,7 @@ export class LogService { }); // TODO: This looks like it could belong more at the oracle interface level - return this.capsuleStore.appendToCapsuleArray(contractAddress, capsuleArrayBaseSlot, pendingTaggedLogs); + return this.capsuleStore.appendToCapsuleArray(contractAddress, capsuleArrayBaseSlot, pendingTaggedLogs, this.jobId); } async #getCompleteAddress(account: AztecAddress): Promise<CompleteAddress> { diff --git a/yarn-project/pxe/src/pxe.ts b/yarn-project/pxe/src/pxe.ts index 5b6c21f0300d..371fd3bf95be 100644 --- a/yarn-project/pxe/src/pxe.ts +++ b/yarn-project/pxe/src/pxe.ts @@ -157,6 +157,7 @@ export class PXE { ); const jobCoordinator = new JobCoordinator(store); + jobCoordinator.registerStores([capsuleStore, senderTaggingStore, recipientTaggingStore]); const debugUtils = new PXEDebugUtils(contractStore, noteStore); @@ -744,7 +745,7 @@ export class PXE { // TODO(benesjan): The following is an expensive operation. Figure out a way to avoid it.
const txHash = (await txProvingResult.toTx()).txHash; - await this.senderTaggingStore.storePendingIndexes(preTagsUsedInTheTx, txHash); + await this.senderTaggingStore.storePendingIndexes(preTagsUsedInTheTx, txHash, jobId); this.log.debug(`Stored used pre-tags as sender for the tx`, { preTagsUsedInTheTx, }); diff --git a/yarn-project/pxe/src/storage/capsule_store/capsule_store.test.ts b/yarn-project/pxe/src/storage/capsule_store/capsule_store.test.ts index 848c27d25e37..51217891caee 100644 --- a/yarn-project/pxe/src/storage/capsule_store/capsule_store.test.ts +++ b/yarn-project/pxe/src/storage/capsule_store/capsule_store.test.ts @@ -1,7 +1,7 @@ import { range } from '@aztec/foundation/array'; import { times } from '@aztec/foundation/collection'; import { Fr } from '@aztec/foundation/curves/bn254'; -import { openTmpStore } from '@aztec/kv-store/lmdb-v2'; +import { AztecLMDBStoreV2, openTmpStore } from '@aztec/kv-store/lmdb-v2'; import { AztecAddress } from '@aztec/stdlib/aztec-address'; import { CapsuleStore } from './capsule_store.js'; @@ -9,12 +9,13 @@ import { CapsuleStore } from './capsule_store.js'; describe('capsule data provider', () => { let contract: AztecAddress; let capsuleStore: CapsuleStore; + let store: AztecLMDBStoreV2; beforeEach(async () => { // Setup mock contract address contract = await AztecAddress.random(); - // Setup data provider - const store = await openTmpStore('capsule_store_test'); + // Setup store + store = await openTmpStore('capsule_store_test'); capsuleStore = new CapsuleStore(store); }); @@ -23,8 +24,8 @@ describe('capsule data provider', () => { const slot = new Fr(1); const values = [new Fr(42)]; - await capsuleStore.storeCapsule(contract, slot, values); - const result = await capsuleStore.loadCapsule(contract, slot); + capsuleStore.storeCapsule(contract, slot, values, 'test'); + const result = await capsuleStore.loadCapsule(contract, slot, 'test'); expect(result).toEqual(values); }); @@ -32,8 +33,8 @@ describe('capsule data provider', () => { const slot = new Fr(1); const values = [new Fr(42), new Fr(43), new Fr(44)]; - await capsuleStore.storeCapsule(contract, slot, values); - const result = await capsuleStore.loadCapsule(contract, slot); + capsuleStore.storeCapsule(contract, slot, values, 'test'); + const result = await capsuleStore.loadCapsule(contract, slot, 'test'); expect(result).toEqual(values); }); @@ -42,10 +43,10 @@ describe('capsule data provider', () => { const initialValues = [new Fr(42)]; const newValues = [new Fr(100)]; - await capsuleStore.storeCapsule(contract, slot, initialValues); - await capsuleStore.storeCapsule(contract, slot, newValues); + capsuleStore.storeCapsule(contract, slot, initialValues, 'test'); + capsuleStore.storeCapsule(contract, slot, newValues, 'test'); - const result = await capsuleStore.loadCapsule(contract, slot); + const result = await capsuleStore.loadCapsule(contract, slot, 'test'); expect(result).toEqual(newValues); }); @@ -55,11 +56,11 @@ describe('capsule data provider', () => { const values1 = [new Fr(42)]; const values2 = [new Fr(100)]; - await capsuleStore.storeCapsule(contract, slot, values1); - await capsuleStore.storeCapsule(anotherContract, slot, values2); + capsuleStore.storeCapsule(contract, slot, values1, 'test'); + capsuleStore.storeCapsule(anotherContract, slot, values2, 'test'); - const result1 = await capsuleStore.loadCapsule(contract, slot); - const result2 = await capsuleStore.loadCapsule(anotherContract, slot); + const result1 = await capsuleStore.loadCapsule(contract, slot, 'test'); + const 
result2 = await capsuleStore.loadCapsule(anotherContract, slot, 'test'); expect(result1).toEqual(values1); expect(result2).toEqual(values2); @@ -67,7 +68,7 @@ describe('capsule data provider', () => { it('returns null for non-existent slots', async () => { const slot = Fr.random(); - const result = await capsuleStore.loadCapsule(contract, slot); + const result = await capsuleStore.loadCapsule(contract, slot, 'test'); expect(result).toBeNull(); }); }); @@ -77,17 +78,17 @@ describe('capsule data provider', () => { const slot = new Fr(1); const values = [new Fr(42)]; - await capsuleStore.storeCapsule(contract, slot, values); - await capsuleStore.deleteCapsule(contract, slot); + capsuleStore.storeCapsule(contract, slot, values, 'test'); + capsuleStore.deleteCapsule(contract, slot, 'test'); - expect(await capsuleStore.loadCapsule(contract, slot)).toBeNull(); + expect(await capsuleStore.loadCapsule(contract, slot, 'test')).toBeNull(); }); it('deletes an empty slot', async () => { const slot = new Fr(1); - await capsuleStore.deleteCapsule(contract, slot); + capsuleStore.deleteCapsule(contract, slot, 'test'); - expect(await capsuleStore.loadCapsule(contract, slot)).toBeNull(); + expect(await capsuleStore.loadCapsule(contract, slot, 'test')).toBeNull(); }); }); @@ -96,82 +97,84 @@ describe('capsule data provider', () => { const slot = new Fr(1); const values = [new Fr(42)]; - await capsuleStore.storeCapsule(contract, slot, values); + capsuleStore.storeCapsule(contract, slot, values, 'test'); const dstSlot = new Fr(5); - await capsuleStore.copyCapsule(contract, slot, dstSlot, 1); + await capsuleStore.copyCapsule(contract, slot, dstSlot, 1, 'test'); - expect(await capsuleStore.loadCapsule(contract, dstSlot)).toEqual(values); + expect(await capsuleStore.loadCapsule(contract, dstSlot, 'test')).toEqual(values); }); it('copies multiple non-overlapping values', async () => { const src = new Fr(1); const valuesArray = [[new Fr(42)], [new Fr(1337)], [new Fr(13)]]; - await capsuleStore.storeCapsule(contract, src, valuesArray[0]); - await capsuleStore.storeCapsule(contract, src.add(new Fr(1)), valuesArray[1]); - await capsuleStore.storeCapsule(contract, src.add(new Fr(2)), valuesArray[2]); + capsuleStore.storeCapsule(contract, src, valuesArray[0], 'test'); + capsuleStore.storeCapsule(contract, src.add(new Fr(1)), valuesArray[1], 'test'); + capsuleStore.storeCapsule(contract, src.add(new Fr(2)), valuesArray[2], 'test'); const dst = new Fr(5); - await capsuleStore.copyCapsule(contract, src, dst, 3); + await capsuleStore.copyCapsule(contract, src, dst, 3, 'test'); - expect(await capsuleStore.loadCapsule(contract, dst)).toEqual(valuesArray[0]); - expect(await capsuleStore.loadCapsule(contract, dst.add(new Fr(1)))).toEqual(valuesArray[1]); - expect(await capsuleStore.loadCapsule(contract, dst.add(new Fr(2)))).toEqual(valuesArray[2]); + expect(await capsuleStore.loadCapsule(contract, dst, 'test')).toEqual(valuesArray[0]); + expect(await capsuleStore.loadCapsule(contract, dst.add(new Fr(1)), 'test')).toEqual(valuesArray[1]); + expect(await capsuleStore.loadCapsule(contract, dst.add(new Fr(2)), 'test')).toEqual(valuesArray[2]); }); it('copies overlapping values with src ahead', async () => { const src = new Fr(1); const valuesArray = [[new Fr(42)], [new Fr(1337)], [new Fr(13)]]; - await capsuleStore.storeCapsule(contract, src, valuesArray[0]); - await capsuleStore.storeCapsule(contract, src.add(new Fr(1)), valuesArray[1]); - await capsuleStore.storeCapsule(contract, src.add(new Fr(2)), valuesArray[2]); + 
capsuleStore.storeCapsule(contract, src, valuesArray[0], 'test'); + capsuleStore.storeCapsule(contract, src.add(new Fr(1)), valuesArray[1], 'test'); + capsuleStore.storeCapsule(contract, src.add(new Fr(2)), valuesArray[2], 'test'); const dst = new Fr(2); - await capsuleStore.copyCapsule(contract, src, dst, 3); + await capsuleStore.copyCapsule(contract, src, dst, 3, 'test'); - expect(await capsuleStore.loadCapsule(contract, dst)).toEqual(valuesArray[0]); - expect(await capsuleStore.loadCapsule(contract, dst.add(new Fr(1)))).toEqual(valuesArray[1]); - expect(await capsuleStore.loadCapsule(contract, dst.add(new Fr(2)))).toEqual(valuesArray[2]); + expect(await capsuleStore.loadCapsule(contract, dst, 'test')).toEqual(valuesArray[0]); + expect(await capsuleStore.loadCapsule(contract, dst.add(new Fr(1)), 'test')).toEqual(valuesArray[1]); + expect(await capsuleStore.loadCapsule(contract, dst.add(new Fr(2)), 'test')).toEqual(valuesArray[2]); // Slots 2 and 3 (src[1] and src[2]) should have been overwritten since they are also dst[0] and dst[1] - expect(await capsuleStore.loadCapsule(contract, src)).toEqual(valuesArray[0]); // src[0] (unchanged) - expect(await capsuleStore.loadCapsule(contract, src.add(new Fr(1)))).toEqual(valuesArray[0]); // dst[0] - expect(await capsuleStore.loadCapsule(contract, src.add(new Fr(2)))).toEqual(valuesArray[1]); // dst[1] + expect(await capsuleStore.loadCapsule(contract, src, 'test')).toEqual(valuesArray[0]); // src[0] (unchanged) + expect(await capsuleStore.loadCapsule(contract, src.add(new Fr(1)), 'test')).toEqual(valuesArray[0]); // dst[0] + expect(await capsuleStore.loadCapsule(contract, src.add(new Fr(2)), 'test')).toEqual(valuesArray[1]); // dst[1] }); it('copies overlapping values with dst ahead', async () => { const src = new Fr(5); const valuesArray = [[new Fr(42)], [new Fr(1337)], [new Fr(13)]]; - await capsuleStore.storeCapsule(contract, src, valuesArray[0]); - await capsuleStore.storeCapsule(contract, src.add(new Fr(1)), valuesArray[1]); - await capsuleStore.storeCapsule(contract, src.add(new Fr(2)), valuesArray[2]); + capsuleStore.storeCapsule(contract, src, valuesArray[0], 'test'); + capsuleStore.storeCapsule(contract, src.add(new Fr(1)), valuesArray[1], 'test'); + capsuleStore.storeCapsule(contract, src.add(new Fr(2)), valuesArray[2], 'test'); const dst = new Fr(4); - await capsuleStore.copyCapsule(contract, src, dst, 3); + await capsuleStore.copyCapsule(contract, src, dst, 3, 'test'); - expect(await capsuleStore.loadCapsule(contract, dst)).toEqual(valuesArray[0]); - expect(await capsuleStore.loadCapsule(contract, dst.add(new Fr(1)))).toEqual(valuesArray[1]); - expect(await capsuleStore.loadCapsule(contract, dst.add(new Fr(2)))).toEqual(valuesArray[2]); + expect(await capsuleStore.loadCapsule(contract, dst, 'test')).toEqual(valuesArray[0]); + expect(await capsuleStore.loadCapsule(contract, dst.add(new Fr(1)), 'test')).toEqual(valuesArray[1]); + expect(await capsuleStore.loadCapsule(contract, dst.add(new Fr(2)), 'test')).toEqual(valuesArray[2]); // Slots 5 and 6 (src[0] and src[1]) should have been overwritten since they are also dst[1] and dst[2] - expect(await capsuleStore.loadCapsule(contract, src)).toEqual(valuesArray[1]); // dst[1] - expect(await capsuleStore.loadCapsule(contract, src.add(new Fr(1)))).toEqual(valuesArray[2]); // dst[2] - expect(await capsuleStore.loadCapsule(contract, src.add(new Fr(2)))).toEqual(valuesArray[2]); // src[2] (unchanged) + expect(await capsuleStore.loadCapsule(contract, src, 'test')).toEqual(valuesArray[1]); // dst[1] 
+ expect(await capsuleStore.loadCapsule(contract, src.add(new Fr(1)), 'test')).toEqual(valuesArray[2]); // dst[2] + expect(await capsuleStore.loadCapsule(contract, src.add(new Fr(2)), 'test')).toEqual(valuesArray[2]); // src[2] (unchanged) }); it('copying fails if any value is empty', async () => { const src = new Fr(1); const valuesArray = [[new Fr(42)], [new Fr(1337)], [new Fr(13)]]; - await capsuleStore.storeCapsule(contract, src, valuesArray[0]); + capsuleStore.storeCapsule(contract, src, valuesArray[0], 'test'); // We skip src[1] - await capsuleStore.storeCapsule(contract, src.add(new Fr(2)), valuesArray[2]); + capsuleStore.storeCapsule(contract, src.add(new Fr(2)), valuesArray[2], 'test'); const dst = new Fr(5); - await expect(capsuleStore.copyCapsule(contract, src, dst, 3)).rejects.toThrow('Attempted to copy empty slot'); + await expect(capsuleStore.copyCapsule(contract, src, dst, 3, 'test')).rejects.toThrow( + 'Attempted to copy empty slot', + ); }); }); @@ -181,11 +184,11 @@ describe('capsule data provider', () => { const baseSlot = new Fr(3); const array = range(4).map(x => [new Fr(x)]); - await capsuleStore.appendToCapsuleArray(contract, baseSlot, array); + await capsuleStore.appendToCapsuleArray(contract, baseSlot, array, 'test'); - expect(await capsuleStore.loadCapsule(contract, baseSlot)).toEqual([new Fr(array.length)]); + expect(await capsuleStore.loadCapsule(contract, baseSlot, 'test')).toEqual([new Fr(array.length)]); for (const i of range(array.length)) { - expect(await capsuleStore.loadCapsule(contract, baseSlot.add(new Fr(1 + i)))).toEqual(array[i]); + expect(await capsuleStore.loadCapsule(contract, baseSlot.add(new Fr(1 + i)), 'test')).toEqual(array[i]); } }); @@ -193,16 +196,16 @@ describe('capsule data provider', () => { const baseSlot = new Fr(3); const originalArray = range(4).map(x => [new Fr(x)]); - await capsuleStore.appendToCapsuleArray(contract, baseSlot, originalArray); + await capsuleStore.appendToCapsuleArray(contract, baseSlot, originalArray, 'test'); const newElements = [[new Fr(13)], [new Fr(42)]]; - await capsuleStore.appendToCapsuleArray(contract, baseSlot, newElements); + await capsuleStore.appendToCapsuleArray(contract, baseSlot, newElements, 'test'); const expectedLength = originalArray.length + newElements.length; - expect(await capsuleStore.loadCapsule(contract, baseSlot)).toEqual([new Fr(expectedLength)]); + expect(await capsuleStore.loadCapsule(contract, baseSlot, 'test')).toEqual([new Fr(expectedLength)]); for (const i of range(expectedLength)) { - expect(await capsuleStore.loadCapsule(contract, baseSlot.add(new Fr(1 + i)))).toEqual( + expect(await capsuleStore.loadCapsule(contract, baseSlot.add(new Fr(1 + i)), 'test')).toEqual( [...originalArray, ...newElements][i], ); } @@ -212,7 +215,7 @@ describe('capsule data provider', () => { describe('readCapsuleArray', () => { it('reads an empty array', async () => { const baseSlot = new Fr(3); - const retrievedArray = await capsuleStore.readCapsuleArray(contract, baseSlot); + const retrievedArray = await capsuleStore.readCapsuleArray(contract, baseSlot, 'test'); expect(retrievedArray).toEqual([]); }); @@ -220,9 +223,9 @@ describe('capsule data provider', () => { const baseSlot = new Fr(3); const storedArray = range(4).map(x => [new Fr(x)]); - await capsuleStore.appendToCapsuleArray(contract, baseSlot, storedArray); + await capsuleStore.appendToCapsuleArray(contract, baseSlot, storedArray, 'test'); - const retrievedArray = await capsuleStore.readCapsuleArray(contract, baseSlot); + const retrievedArray 
= await capsuleStore.readCapsuleArray(contract, baseSlot, 'test'); expect(retrievedArray).toEqual(storedArray); }); @@ -230,10 +233,12 @@ describe('capsule data provider', () => { const baseSlot = new Fr(3); // Store in the base slot a non-zero value, indicating a non-zero array length - await capsuleStore.storeCapsule(contract, baseSlot, [new Fr(1)]); + capsuleStore.storeCapsule(contract, baseSlot, [new Fr(1)], 'test'); // Reading should now fail as some of the capsules in the array are empty - await expect(capsuleStore.readCapsuleArray(contract, baseSlot)).rejects.toThrow('Expected non-empty value'); + await expect(capsuleStore.readCapsuleArray(contract, baseSlot, 'test')).rejects.toThrow( + 'Expected non-empty value', + ); }); }); @@ -242,9 +247,9 @@ describe('capsule data provider', () => { const baseSlot = new Fr(3); const newArray = range(4).map(x => [new Fr(x)]); - await capsuleStore.setCapsuleArray(contract, baseSlot, newArray); + await capsuleStore.setCapsuleArray(contract, baseSlot, newArray, 'test'); - const retrievedArray = await capsuleStore.readCapsuleArray(contract, baseSlot); + const retrievedArray = await capsuleStore.readCapsuleArray(contract, baseSlot, 'test'); expect(retrievedArray).toEqual(newArray); }); @@ -252,12 +257,12 @@ describe('capsule data provider', () => { const baseSlot = new Fr(3); const originalArray = range(4, 0).map(x => [new Fr(x)]); - await capsuleStore.setCapsuleArray(contract, baseSlot, originalArray); + await capsuleStore.setCapsuleArray(contract, baseSlot, originalArray, 'test'); const newArray = range(10, 10).map(x => [new Fr(x)]); - await capsuleStore.setCapsuleArray(contract, baseSlot, newArray); + await capsuleStore.setCapsuleArray(contract, baseSlot, newArray, 'test'); - const retrievedArray = await capsuleStore.readCapsuleArray(contract, baseSlot); + const retrievedArray = await capsuleStore.readCapsuleArray(contract, baseSlot, 'test'); expect(retrievedArray).toEqual(newArray); }); @@ -265,17 +270,19 @@ describe('capsule data provider', () => { const baseSlot = new Fr(3); const originalArray = range(10, 0).map(x => [new Fr(x)]); - await capsuleStore.setCapsuleArray(contract, baseSlot, originalArray); + await capsuleStore.setCapsuleArray(contract, baseSlot, originalArray, 'test'); const newArray = range(4, 10).map(x => [new Fr(x)]); - await capsuleStore.setCapsuleArray(contract, baseSlot, newArray); + await capsuleStore.setCapsuleArray(contract, baseSlot, newArray, 'test'); - const retrievedArray = await capsuleStore.readCapsuleArray(contract, baseSlot); + const retrievedArray = await capsuleStore.readCapsuleArray(contract, baseSlot, 'test'); expect(retrievedArray).toEqual(newArray); // Not only do we read the expected array, but also all capsules past the new array length have been cleared for (const i of range(originalArray.length - newArray.length)) { - expect(await capsuleStore.loadCapsule(contract, baseSlot.add(new Fr(1 + newArray.length + i)))).toBeNull(); + expect( + await capsuleStore.loadCapsule(contract, baseSlot.add(new Fr(1 + newArray.length + i)), 'test'), + ).toBeNull(); } }); @@ -283,16 +290,16 @@ describe('capsule data provider', () => { const baseSlot = new Fr(3); const originalArray = range(10, 0).map(x => [new Fr(x)]); - await capsuleStore.setCapsuleArray(contract, baseSlot, originalArray); + await capsuleStore.setCapsuleArray(contract, baseSlot, originalArray, 'test'); - await capsuleStore.setCapsuleArray(contract, baseSlot, []); + await capsuleStore.setCapsuleArray(contract, baseSlot, [], 'test'); - const retrievedArray = 
await capsuleStore.readCapsuleArray(contract, baseSlot); + const retrievedArray = await capsuleStore.readCapsuleArray(contract, baseSlot, 'test'); expect(retrievedArray).toEqual([]); // All capsules from the original array have been cleared for (const i of range(originalArray.length)) { - expect(await capsuleStore.loadCapsule(contract, baseSlot.add(new Fr(1 + i)))).toBeNull(); + expect(await capsuleStore.loadCapsule(contract, baseSlot.add(new Fr(1 + i)), 'test')).toBeNull(); } }); }); @@ -319,7 +326,12 @@ describe('capsule data provider', () => { contract, new Fr(0), times(NUMBER_OF_ITEMS, () => range(ARRAY_LENGTH).map(x => new Fr(x))), + 'test', ); + + await store.transactionAsync(async () => { + await capsuleStore.commit('test'); + }); }, TEST_TIMEOUT_MS, ); @@ -331,7 +343,12 @@ describe('capsule data provider', () => { contract, new Fr(0), times(NUMBER_OF_ITEMS, () => range(ARRAY_LENGTH).map(x => new Fr(x))), + 'test', ); + + await store.transactionAsync(async () => { + await capsuleStore.commit('test'); + }); }, TEST_TIMEOUT_MS, ); @@ -343,10 +360,19 @@ describe('capsule data provider', () => { contract, new Fr(0), times(NUMBER_OF_ITEMS, () => range(ARRAY_LENGTH).map(x => new Fr(x))), + 'test', ); + await store.transactionAsync(async () => { + await capsuleStore.commit('test'); + }); + // Append a single element - await capsuleStore.appendToCapsuleArray(contract, new Fr(0), [range(ARRAY_LENGTH).map(x => new Fr(x))]); + await capsuleStore.appendToCapsuleArray(contract, new Fr(0), [range(ARRAY_LENGTH).map(x => new Fr(x))], 'test'); + + await store.transactionAsync(async () => { + await capsuleStore.commit('test'); + }); }, TEST_TIMEOUT_MS, ); @@ -358,10 +384,19 @@ describe('capsule data provider', () => { contract, new Fr(0), times(NUMBER_OF_ITEMS, () => range(ARRAY_LENGTH).map(x => new Fr(x))), + 'test', ); + await store.transactionAsync(async () => { + await capsuleStore.commit('test'); + }); + // We just move the entire thing one slot. - await capsuleStore.copyCapsule(contract, new Fr(0), new Fr(1), NUMBER_OF_ITEMS); + await capsuleStore.copyCapsule(contract, new Fr(0), new Fr(1), NUMBER_OF_ITEMS, 'test'); + + await store.transactionAsync(async () => { + await capsuleStore.commit('test'); + }); }, TEST_TIMEOUT_MS, ); @@ -373,9 +408,18 @@ describe('capsule data provider', () => { contract, new Fr(0), times(NUMBER_OF_ITEMS, () => range(ARRAY_LENGTH).map(x => new Fr(x))), + 'test', ); - await capsuleStore.readCapsuleArray(contract, new Fr(0)); + await store.transactionAsync(async () => { + await capsuleStore.commit('test'); + }); + + await capsuleStore.readCapsuleArray(contract, new Fr(0), 'test'); + + await store.transactionAsync(async () => { + await capsuleStore.commit('test'); + }); }, TEST_TIMEOUT_MS, ); @@ -387,11 +431,128 @@ describe('capsule data provider', () => { contract, new Fr(0), times(NUMBER_OF_ITEMS, () => range(ARRAY_LENGTH).map(x => new Fr(x))), + 'test', ); - await capsuleStore.setCapsuleArray(contract, new Fr(0), []); + await store.transactionAsync(async () => { + await capsuleStore.commit('test'); + }); + + await capsuleStore.setCapsuleArray(contract, new Fr(0), [], 'test'); + + await store.transactionAsync(async () => { + await capsuleStore.commit('test'); + }); }, TEST_TIMEOUT_MS, ); }); + + describe('staged writes', () => { + it('commit does not hold zombie data', async () => { + // This test tries to reproduce a scenario where + // we fail to clear a job's data after commit. 
+ // Such incorrect behavior would only be observable + // if we re-used a jobId we had previously committed, + // which should not happen given we generate random job ids, + // but it's good to keep things clean and consistent. + const slot = Fr.random(); + const committedValues1 = [Fr.random()]; + const committedValues2 = [Fr.random()]; + + capsuleStore.storeCapsule(contract, slot, committedValues1, 'job-1'); + + // After this commit, 'job-1' should logically be reset + // Any read of contract-slot after this should see committedValues1 + await capsuleStore.commit('job-1'); + + // Any read of contract-slot should now see committedValues2 + capsuleStore.storeCapsule(contract, slot, committedValues2, 'job-2'); + await capsuleStore.commit('job-2'); + + // If we failed to properly dispose of 'job-1's staged writes on commit, + // instead of reading committedValues2 (as we should), we would end + // up reading committedValues1 (which would be wrong) + expect(await capsuleStore.loadCapsule(contract, slot, 'job-1')).toEqual(committedValues2); + }); + + it('writes to job view are isolated from another job view', async () => { + const slot = Fr.random(); + const committedValues = [Fr.random()]; + const stagedValues = [Fr.random()]; + const commitJobId: string = 'commit-job'; + const stagedJob1: string = 'staged-job-1'; + const stagedJob2: string = 'staged-job-2'; + + // First set a committed capsule (using a different job that we commit) + capsuleStore.storeCapsule(contract, slot, committedValues, commitJobId); + await capsuleStore.commit(commitJobId); + + // Then set a staged capsule (not committed) + capsuleStore.storeCapsule(contract, slot, stagedValues, stagedJob1); + + // From stagedJob1, we should get the staged capsule + expect(await capsuleStore.loadCapsule(contract, slot, stagedJob1)).toEqual(stagedValues); + + // From stagedJob2, we should get the committed capsule + expect(await capsuleStore.loadCapsule(contract, slot, stagedJob2)).toEqual(committedValues); + }); + + it('staged deletions hide committed data', async () => { + const slot = Fr.random(); + const committedValues = [Fr.random()]; + const commitJobId: string = 'commit-job'; + const stagedJob1: string = 'staged-job-1'; + const stagedJob2: string = 'staged-job-2'; + + // First set a committed capsule + capsuleStore.storeCapsule(contract, slot, committedValues, commitJobId); + await capsuleStore.commit(commitJobId); + + // Delete in staging (not committed) + capsuleStore.deleteCapsule(contract, slot, stagedJob1); + + // From stagedJob2, we should still see the committed capsule + expect(await capsuleStore.loadCapsule(contract, slot, stagedJob2)).toEqual(committedValues); + + // From stagedJob1, we should see null (deleted in staging) + expect(await capsuleStore.loadCapsule(contract, slot, stagedJob1)).toBeNull(); + }); + + it('commit applies staged deletions', async () => { + const slot = Fr.random(); + const committedValues = [Fr.random()]; + const commitJobId: string = 'commit-job'; + const deleteJobId: string = 'delete-job'; + + capsuleStore.storeCapsule(contract, slot, committedValues, commitJobId); + await capsuleStore.commit(commitJobId); + capsuleStore.deleteCapsule(contract, slot, deleteJobId); + + await capsuleStore.commit(deleteJobId); + + // Now any job should see null (deleted) + expect(await capsuleStore.loadCapsule(contract, slot, 'any-job-sees-this')).toBeNull(); + }); + + it('discardStaged removes staged data without affecting main', async () => { + const slot = Fr.random(); + const committedValues = [Fr.random()]; + const
stagedValues = [Fr.random()]; + const commitJobId: string = 'commit-job'; + const stagingJobId: string = 'staging-job'; + + capsuleStore.storeCapsule(contract, slot, committedValues, commitJobId); + await capsuleStore.commit(commitJobId); + capsuleStore.storeCapsule(contract, slot, stagedValues, stagingJobId); + + await capsuleStore.discardStaged(stagingJobId); + + // Should still get the committed capsule + expect(await capsuleStore.loadCapsule(contract, slot, 'any-job')).toEqual(committedValues); + + // With stagingJobId, we should fall back to committed data since staging was discarded + expect(await capsuleStore.loadCapsule(contract, slot, stagingJobId)).toEqual(committedValues); + }); + }); }); diff --git a/yarn-project/pxe/src/storage/capsule_store/capsule_store.ts b/yarn-project/pxe/src/storage/capsule_store/capsule_store.ts index ac39dc88dafb..3424ff546844 100644 --- a/yarn-project/pxe/src/storage/capsule_store/capsule_store.ts +++ b/yarn-project/pxe/src/storage/capsule_store/capsule_store.ts @@ -3,12 +3,21 @@ import { type Logger, createLogger } from '@aztec/foundation/log'; import type { AztecAsyncKVStore, AztecAsyncMap } from '@aztec/kv-store'; import type { AztecAddress } from '@aztec/stdlib/aztec-address'; -export class CapsuleStore { +import type { StagedStore } from '../../job_coordinator/job_coordinator.js'; + +export class CapsuleStore implements StagedStore { + readonly storeName = 'capsule'; + #store: AztecAsyncKVStore; // Arbitrary data stored by contracts. Key is computed as `${contractAddress}:${key}` #capsules: AztecAsyncMap<string, Buffer>; + // jobId => `${contractAddress}:${key}` => capsule data + // when `#stagedCapsules.get('some-job-id').get('some-contract-address:some-key') === null`, + // it signals that the capsule was deleted during the job, so it needs to be deleted on commit + #stagedCapsules: Map<string, Map<string, Buffer | null>>; + logger: Logger; constructor(store: AztecAsyncKVStore) { @@ -16,21 +25,120 @@ export class CapsuleStore { this.#capsules = this.#store.openMap('capsules'); + this.#stagedCapsules = new Map(); + this.logger = createLogger('pxe:capsule-data-provider'); } + /** + * Given a job denoted by `jobId`, returns the + * capsules that said job has interacted with. + * + * Capsules that haven't been committed to the persistent KV store + * are kept in memory in `#stagedCapsules`; this method provides a convenient + * way to access that in-memory collection of data. + * + * @param jobId - The id of the job whose staged capsules to return + * @returns The in-memory map of staged capsules for that job + */ + #getJobStagedCapsules(jobId: string): Map<string, Buffer | null> { + let jobStagedCapsules = this.#stagedCapsules.get(jobId); + if (!jobStagedCapsules) { + jobStagedCapsules = new Map(); + this.#stagedCapsules.set(jobId, jobStagedCapsules); + } + return jobStagedCapsules; + } + + /** + * Reads a capsule's slot from the staged version of the data associated with the given jobId. + * + * If it is not there, it reads it from the KV store. + */ + async #getFromStage(jobId: string, dbSlotKey: string): Promise<Buffer | null> { + const jobStagedCapsules = this.#getJobStagedCapsules(jobId); + let staged: Buffer | null | undefined = jobStagedCapsules.get(dbSlotKey); + // Note that if staged === null, we marked it for deletion, so we don't want to + // re-read it from DB + if (staged === undefined) { + // If we don't have a staged version of this dbSlotKey, first we check if there's one in DB + staged = await this.#loadCapsuleFromDb(dbSlotKey); + } + return staged; + }
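#getFromStage above distinguishes three states per key: a staged Buffer (the job wrote it), an explicit null (the job deleted it), and undefined (the job never touched it, so we fall through to the committed KV store). A standalone sketch of that resolution order, with illustrative names that are not part of the patch:

// Standalone illustration of the staged-read fallback used by #getFromStage.
type Staged = Buffer | null | undefined;

function resolveRead(staged: Staged, committed: Buffer | null): Buffer | null {
  if (staged === null) {
    return null; // staged deletion shadows any committed value
  }
  if (staged !== undefined) {
    return staged; // staged write wins over the committed value
  }
  return committed; // untouched by this job: read through to committed data
}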
+ + /** + * Writes a capsule to the stage of a job. + */ + #setOnStage(jobId: string, dbSlotKey: string, capsuleData: Buffer) { + this.#getJobStagedCapsules(jobId).set(dbSlotKey, capsuleData); + } + + /** + * Deletes a capsule on the stage of a job. Note the capsule will still + * exist in storage until the job is committed. + */ + #deleteOnStage(jobId: string, dbSlotKey: string) { + this.#getJobStagedCapsules(jobId).set(dbSlotKey, null); + } + + async #loadCapsuleFromDb(dbSlotKey: string): Promise<Buffer | null> { + const dataBuffer = await this.#capsules.getAsync(dbSlotKey); + if (!dataBuffer) { + return null; + } + + return dataBuffer; + } + + /** + * Commits staged data to main storage. + * Called by JobCoordinator when a job completes successfully. + * Note: JobCoordinator wraps all commits in a single transaction, so we don't + * need our own transactionAsync here (and using one would deadlock on IndexedDB). + * @param jobId - The jobId identifying which staged data to commit + */ + async commit(jobId: string): Promise<void> { + const jobStagedCapsules = this.#getJobStagedCapsules(jobId); + + for (const [key, value] of jobStagedCapsules) { + // In the write stage, we represent deleted capsules with null + // (as opposed to undefined, which denotes there was never a capsule there to begin with). + // So we delete from the actual KV store here. + if (value === null) { + await this.#capsules.delete(key); + } else { + await this.#capsules.set(key, value); + } + } + + this.#stagedCapsules.delete(jobId); + } + + /** + * Discards staged data without committing. + */ + discardStaged(jobId: string): Promise<void> { + this.#stagedCapsules.delete(jobId); + return Promise.resolve(); + } + /** * Stores arbitrary information in a per-contract non-volatile database, which can later be retrieved with `loadCapsule`. * * If data was already stored at this slot, it is overwritten. * @param contractAddress - The contract address to scope the data under. * @param slot - The slot in the database in which to store the value. Slots need not be contiguous. * @param capsule - An array of field elements representing the capsule. + * @param jobId - The job context in which this write will be visible until PXE persists it to the underlying KV store * @remarks A capsule is a "blob" of data that is passed to the contract through an oracle. It works similarly * to public contract storage in that it's indexed by the contract address and storage slot but instead of the global * network state it's backed by local PXE db. */ - async storeCapsule(contractAddress: AztecAddress, slot: Fr, capsule: Fr[]): Promise<void> { - await this.#capsules.set(dbSlotToKey(contractAddress, slot), Buffer.concat(capsule.map(value => value.toBuffer()))); + storeCapsule(contractAddress: AztecAddress, slot: Fr, capsule: Fr[], jobId: string) { + const dbSlotKey = dbSlotToKey(contractAddress, slot); + + // A store overwrites any pre-existing data on the slot + this.#setOnStage(jobId, dbSlotKey, Buffer.concat(capsule.map(value => value.toBuffer()))); }
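Together, storeCapsule, commit and discardStaged give callers transaction-like semantics per job. A usage sketch mirroring the tests added earlier in this patch (capsuleStore, contract, slot and Fr as in those tests):

// Writes land in the job's staging area first...
capsuleStore.storeCapsule(contract, slot, [new Fr(42)], 'job-a');

// ...and are visible to that job, but not to others:
await capsuleStore.loadCapsule(contract, slot, 'job-a'); // [Fr(42)]
await capsuleStore.loadCapsule(contract, slot, 'job-b'); // null (nothing committed yet)

// Committing makes them visible to every job; discardStaged would drop them instead.
await capsuleStore.commit('job-a');
await capsuleStore.loadCapsule(contract, slot, 'job-b'); // [Fr(42)]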
/** @@ -39,8 +147,8 @@ export class CapsuleStore { * @param slot - The slot in the database to read. * @returns The stored data or `null` if no data is stored under the slot. */ - async loadCapsule(contractAddress: AztecAddress, slot: Fr): Promise<Fr[] | null> { - const dataBuffer = await this.#capsules.getAsync(dbSlotToKey(contractAddress, slot)); + async loadCapsule(contractAddress: AztecAddress, slot: Fr, jobId: string): Promise<Fr[] | null> { + const dataBuffer = await this.#getFromStage(jobId, dbSlotToKey(contractAddress, slot)); if (!dataBuffer) { this.logger.trace(`Data not found for contract ${contractAddress.toString()} and slot ${slot.toString()}`); return null; @@ -57,8 +165,9 @@ export class CapsuleStore { * @param contractAddress - The contract address under which the data is scoped. * @param slot - The slot in the database to delete. */ - async deleteCapsule(contractAddress: AztecAddress, slot: Fr): Promise<void> { - await this.#capsules.delete(dbSlotToKey(contractAddress, slot)); + deleteCapsule(contractAddress: AztecAddress, slot: Fr, jobId: string) { + // When we commit this, we will interpret null as a deletion, so we'll propagate the delete to the KV store + this.#deleteOnStage(jobId, dbSlotToKey(contractAddress, slot)); } /** @@ -72,13 +181,22 @@ export class CapsuleStore { * @param dstSlot - The first slot to copy to. * @param numEntries - The number of entries to copy. */ - copyCapsule(contractAddress: AztecAddress, srcSlot: Fr, dstSlot: Fr, numEntries: number): Promise<void> { + copyCapsule( + contractAddress: AztecAddress, + srcSlot: Fr, + dstSlot: Fr, + numEntries: number, + jobId: string, + ): Promise<void> { + // This transactional context gives us "copy atomicity": + // there shouldn't be concurrent writes to what's being copied here. + // Equally important: in practice this is expected to perform thousands of DB operations, + // and not using a transaction here would heavily impact performance. return this.#store.transactionAsync(async () => { // In order to support overlapping source and destination regions, we need to check the relative positions of source // and destination. If destination is ahead of source, then by the time we overwrite source elements using forward // indexes we'll have already read those. On the contrary, if source is ahead of destination we need to use backward // indexes to avoid reading elements that've been overwritten. - const indexes = Array.from(Array(numEntries).keys()); if (srcSlot.lt(dstSlot)) { indexes.reverse(); } @@ -88,12 +206,12 @@ export class CapsuleStore { const currentSrcSlot = dbSlotToKey(contractAddress, srcSlot.add(new Fr(i))); const currentDstSlot = dbSlotToKey(contractAddress, dstSlot.add(new Fr(i))); - const toCopy = await this.#capsules.getAsync(currentSrcSlot); + const toCopy = await this.#getFromStage(jobId, currentSrcSlot); if (!toCopy) { throw new Error(`Attempted to copy empty slot ${currentSrcSlot} for contract ${contractAddress.toString()}`); } - await this.#capsules.set(currentDstSlot, toCopy); + this.#setOnStage(jobId, currentDstSlot, toCopy); } }); }
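The direction check in copyCapsule is the classic memmove-style trick: when the destination range starts after the source, iterate backwards so no source slot is overwritten before it has been read. A self-contained sketch of the same logic over a plain array (illustrative only, not part of the patch):

// Overlap-safe copy over a plain array, mirroring copyCapsule's index-direction logic.
function overlapSafeCopy<T>(data: T[], src: number, dst: number, numEntries: number): void {
  const indexes = Array.from(Array(numEntries).keys());
  if (src < dst) {
    // dst is ahead: copy backwards so every source slot is read before it can be overwritten
    indexes.reverse();
  }
  for (const i of indexes) {
    data[dst + i] = data[src + i];
  }
}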
@@ -106,35 +224,45 @@ export class CapsuleStore { * @param baseSlot - The slot where the array length is stored * @param content - Array of capsule data to append */ - appendToCapsuleArray(contractAddress: AztecAddress, baseSlot: Fr, content: Fr[][]): Promise<void> { + appendToCapsuleArray(contractAddress: AztecAddress, baseSlot: Fr, content: Fr[][], jobId: string): Promise<void> { + // We wrap this in a transaction to serialize concurrent calls from Promise.all. + // Without this, concurrent appends to the same array could race: both read length=0, + // both write at the same slots, one overwrites the other. + // Equally important: in practice this is expected to perform thousands of DB operations, + // and not using a transaction here would heavily impact performance. return this.#store.transactionAsync(async () => { // Load current length, defaulting to 0 if not found - const lengthData = await this.loadCapsule(contractAddress, baseSlot); + const lengthData = await this.loadCapsule(contractAddress, baseSlot, jobId); const currentLength = lengthData ? lengthData[0].toNumber() : 0; // Store each capsule at consecutive slots after baseSlot + 1 + currentLength for (let i = 0; i < content.length; i++) { const nextSlot = arraySlot(baseSlot, currentLength + i); - await this.storeCapsule(contractAddress, nextSlot, content[i]); + this.storeCapsule(contractAddress, nextSlot, content[i], jobId); } // Update length to include all new capsules const newLength = currentLength + content.length; - await this.storeCapsule(contractAddress, baseSlot, [new Fr(newLength)]); + this.storeCapsule(contractAddress, baseSlot, [new Fr(newLength)], jobId); }); } - readCapsuleArray(contractAddress: AztecAddress, baseSlot: Fr): Promise<Fr[][]> { + readCapsuleArray(contractAddress: AztecAddress, baseSlot: Fr, jobId: string): Promise<Fr[][]> { + // We keep this transactional context because we assume it gives us + // "read array atomicity": there shouldn't be concurrent writes to what's being read + // here. + // This is one point we should revisit in the future if we want to relax the concurrency + // of jobs: different calls running concurrently on the same contract may cause trouble. return this.#store.transactionAsync(async () => { // Load length, defaulting to 0 if not found - const maybeLength = await this.loadCapsule(contractAddress, baseSlot); + const maybeLength = await this.loadCapsule(contractAddress, baseSlot, jobId); const length = maybeLength ? maybeLength[0].toBigInt() : 0n; const values: Fr[][] = []; // Read each capsule at consecutive slots after baseSlot for (let i = 0; i < length; i++) { - const currentValue = await this.loadCapsule(contractAddress, arraySlot(baseSlot, i)); + const currentValue = await this.loadCapsule(contractAddress, arraySlot(baseSlot, i), jobId); if (currentValue == undefined) { throw new Error( `Expected non-empty value at capsule array in base slot ${baseSlot} at index ${i} for contract ${contractAddress}`, @@ -148,23 +276,31 @@ export class CapsuleStore { ); } - setCapsuleArray(contractAddress: AztecAddress, baseSlot: Fr, content: Fr[][]) { + setCapsuleArray(contractAddress: AztecAddress, baseSlot: Fr, content: Fr[][], jobId: string) { + // This transactional context in theory isn't so critical now because we aren't + // writing to DB, so if there are exceptions midway and it blows up, no visible impact + // to persistent storage will happen. + // We keep this transactional context though because we assume it gives us + // "write array atomicity": there shouldn't be concurrent writes to what's being written + // here. + // This is one point we should revisit in the future if we want to relax the concurrency + // of jobs: different calls running concurrently on the same contract may cause trouble. return this.#store.transactionAsync(async () => { // Load current length, defaulting to 0 if not found - const maybeLength = await this.loadCapsule(contractAddress, baseSlot); + const maybeLength = await this.loadCapsule(contractAddress, baseSlot, jobId); const originalLength = maybeLength ?
maybeLength[0].toNumber() : 0; // Set the new length - await this.storeCapsule(contractAddress, baseSlot, [new Fr(content.length)]); + this.storeCapsule(contractAddress, baseSlot, [new Fr(content.length)], jobId); // Store the new content, possibly overwriting existing values for (let i = 0; i < content.length; i++) { - await this.storeCapsule(contractAddress, arraySlot(baseSlot, i), content[i]); + this.storeCapsule(contractAddress, arraySlot(baseSlot, i), content[i], jobId); } // Clear any stragglers for (let i = content.length; i < originalLength; i++) { - await this.deleteCapsule(contractAddress, arraySlot(baseSlot, i)); + this.deleteCapsule(contractAddress, arraySlot(baseSlot, i), jobId); } }); } diff --git a/yarn-project/pxe/src/storage/tagging_store/recipient_tagging_store.test.ts b/yarn-project/pxe/src/storage/tagging_store/recipient_tagging_store.test.ts new file mode 100644 index 000000000000..4a578ef3347c --- /dev/null +++ b/yarn-project/pxe/src/storage/tagging_store/recipient_tagging_store.test.ts @@ -0,0 +1,83 @@ +import { Fr } from '@aztec/foundation/curves/bn254'; +import { openTmpStore } from '@aztec/kv-store/lmdb-v2'; +import { DirectionalAppTaggingSecret } from '@aztec/stdlib/logs'; + +import { RecipientTaggingStore } from './recipient_tagging_store.js'; + +describe('RecipientTaggingStore', () => { + let taggingStore: RecipientTaggingStore; + let secret1: DirectionalAppTaggingSecret; + let secret2: DirectionalAppTaggingSecret; + + beforeEach(async () => { + taggingStore = new RecipientTaggingStore(await openTmpStore('test')); + secret1 = DirectionalAppTaggingSecret.fromString(Fr.random().toString()); + secret2 = DirectionalAppTaggingSecret.fromString(Fr.random().toString()); + }); + + describe('staged writes', () => { + it('persists staged highest aged index to the store', async () => { + await taggingStore.updateHighestAgedIndex(secret1, 5, 'job1'); + + expect(await taggingStore.getHighestAgedIndex(secret1, 'job2')).toBeUndefined(); + + await taggingStore.commit('job1'); + + expect(await taggingStore.getHighestAgedIndex(secret1, 'job2')).toBe(5); + }); + + it('persists staged highest finalized index to the store', async () => { + await taggingStore.updateHighestFinalizedIndex(secret1, 10, 'job1'); + + expect(await taggingStore.getHighestFinalizedIndex(secret1, 'job2')).toBeUndefined(); + + await taggingStore.commit('job1'); + + expect(await taggingStore.getHighestFinalizedIndex(secret1, 'job2')).toBe(10); + }); + + it('persists multiple secrets for the same job', async () => { + await taggingStore.updateHighestAgedIndex(secret1, 5, 'job1'); + await taggingStore.updateHighestAgedIndex(secret2, 8, 'job1'); + await taggingStore.updateHighestFinalizedIndex(secret1, 3, 'job1'); + await taggingStore.updateHighestFinalizedIndex(secret2, 6, 'job1'); + + await taggingStore.commit('job1'); + + expect(await taggingStore.getHighestAgedIndex(secret1, 'job2')).toBe(5); + expect(await taggingStore.getHighestAgedIndex(secret2, 'job2')).toBe(8); + expect(await taggingStore.getHighestFinalizedIndex(secret1, 'job2')).toBe(3); + expect(await taggingStore.getHighestFinalizedIndex(secret2, 'job2')).toBe(6); + }); + + it('clears staged data after commit', async () => { + await taggingStore.updateHighestAgedIndex(secret1, 5, 'job1'); + await taggingStore.commit('job1'); + + // Updating again with a higher value in a new job should work + // (if staged data wasn't cleared, it would still have the old value cached) + await taggingStore.updateHighestAgedIndex(secret1, 10, 'job2'); + expect(await
taggingStore.getHighestAgedIndex(secret1, 'job2')).toBe(10); + await taggingStore.commit('job2'); + + // Reading with the already-committed 'job1' should now see the newly persisted value + expect(await taggingStore.getHighestAgedIndex(secret1, 'job1')).toBe(10); + }); + + it('does not affect other jobs when committing', async () => { + await taggingStore.updateHighestAgedIndex(secret1, 5, 'job1'); + await taggingStore.updateHighestAgedIndex(secret1, 10, 'job2'); + + await taggingStore.commit('job1'); + + // job2's staged value should still be intact + expect(await taggingStore.getHighestAgedIndex(secret1, 'job2')).toBe(10); + }); + + it('discards staged highest aged index without persisting', async () => { + await taggingStore.updateHighestAgedIndex(secret1, 5, 'job1'); + await taggingStore.discardStaged('job1'); + expect(await taggingStore.getHighestAgedIndex(secret1, 'job1')).toBeUndefined(); + }); + }); +}); diff --git a/yarn-project/pxe/src/storage/tagging_store/recipient_tagging_store.ts b/yarn-project/pxe/src/storage/tagging_store/recipient_tagging_store.ts index 3c9c4d77a883..a31a3bb540d4 100644 --- a/yarn-project/pxe/src/storage/tagging_store/recipient_tagging_store.ts +++ b/yarn-project/pxe/src/storage/tagging_store/recipient_tagging_store.ts @@ -1,6 +1,8 @@ import type { AztecAsyncKVStore, AztecAsyncMap } from '@aztec/kv-store'; import type { DirectionalAppTaggingSecret } from '@aztec/stdlib/logs'; +import type { StagedStore } from '../../job_coordinator/job_coordinator.js'; + /** * Data provider of tagging data used when syncing the logs as a recipient. The sender counterpart of this class * is called SenderTaggingStore. We have the providers separate for the sender and recipient because * @@ -11,43 +13,121 @@ import type { DirectionalAppTaggingSecret } from '@aztec/stdlib/logs'; * * TODO(benesjan): Relocate to yarn-project/pxe/src/storage/tagging_store */ -export class RecipientTaggingStore { +export class RecipientTaggingStore implements StagedStore { + storeName: string = 'recipient_tagging'; + #store: AztecAsyncKVStore; #highestAgedIndex: AztecAsyncMap<string, number>; #highestFinalizedIndex: AztecAsyncMap<string, number>; + // jobId => secret => number + #stagedHighestAgedIndex: Map<string, Map<string, number>>; + + // jobId => secret => number + #stagedHighestFinalizedIndex: Map<string, Map<string, number>>; + constructor(store: AztecAsyncKVStore) { this.#store = store; this.#highestAgedIndex = this.#store.openMap('highest_aged_index'); this.#highestFinalizedIndex = this.#store.openMap('highest_finalized_index'); + + this.#stagedHighestAgedIndex = new Map(); + this.#stagedHighestFinalizedIndex = new Map(); + } + + #getJobStagedHighestAgedIndex(jobId: string): Map<string, number> { + let jobStagedHighestAgedIndex = this.#stagedHighestAgedIndex.get(jobId); + if (!jobStagedHighestAgedIndex) { + jobStagedHighestAgedIndex = new Map(); + this.#stagedHighestAgedIndex.set(jobId, jobStagedHighestAgedIndex); + } + return jobStagedHighestAgedIndex; + } + + async #getHighestFinalizedIndexFromStage(jobId: string, secret: string): Promise<number | undefined> { + let staged = this.#getJobStagedHighestFinalizedIndex(jobId).get(secret); + if (staged === undefined) { + staged = await this.#highestFinalizedIndex.getAsync(secret); + } + return staged; + } + + async #getHighestAgedIndexFromStage(jobId: string, secret: string): Promise<number | undefined> { + let staged = this.#getJobStagedHighestAgedIndex(jobId).get(secret); + if (staged === undefined) { + staged = await this.#highestAgedIndex.getAsync(secret); + } + return staged; + }
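RecipientTaggingStore applies the same stage-then-commit pattern to plain numeric indexes: per job, a staged value shadows the persisted one until commit. A usage sketch mirroring the new test file above (taggingStore and secret1 as in those tests):

// Staged update: visible only to 'job-1' until committed.
await taggingStore.updateHighestAgedIndex(secret1, 5, 'job-1');
await taggingStore.getHighestAgedIndex(secret1, 'job-1'); // 5 (staged)
await taggingStore.getHighestAgedIndex(secret1, 'job-2'); // undefined (nothing committed)

await taggingStore.commit('job-1');
await taggingStore.getHighestAgedIndex(secret1, 'job-2'); // 5 (now persisted)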
+ #setHighestAgedIndexOnStage(jobId: string, secret: string, index: number) { + this.#getJobStagedHighestAgedIndex(jobId).set(secret, index); + } + + #getJobStagedHighestFinalizedIndex(jobId: string): Map<string, number> { + let jobStagedHighestFinalizedIndex = this.#stagedHighestFinalizedIndex.get(jobId); + if (!jobStagedHighestFinalizedIndex) { + jobStagedHighestFinalizedIndex = new Map(); + this.#stagedHighestFinalizedIndex.set(jobId, jobStagedHighestFinalizedIndex); + } + return jobStagedHighestFinalizedIndex; + } + + #setHighestFinalizedIndexOnStage(jobId: string, secret: string, index: number) { + this.#getJobStagedHighestFinalizedIndex(jobId).set(secret, index); + } + + async commit(jobId: string): Promise<void> { + const stagedHighestAgedIndex = this.#stagedHighestAgedIndex.get(jobId); + if (stagedHighestAgedIndex) { + for (const [secret, index] of stagedHighestAgedIndex.entries()) { + await this.#highestAgedIndex.set(secret, index); + } + } + + const stagedHighestFinalizedIndex = this.#stagedHighestFinalizedIndex.get(jobId); + if (stagedHighestFinalizedIndex) { + for (const [secret, index] of stagedHighestFinalizedIndex.entries()) { + await this.#highestFinalizedIndex.set(secret, index); + } + } + + this.#stagedHighestAgedIndex.delete(jobId); + this.#stagedHighestFinalizedIndex.delete(jobId); + } + + discardStaged(jobId: string): Promise<void> { + this.#stagedHighestAgedIndex.delete(jobId); + this.#stagedHighestFinalizedIndex.delete(jobId); + return Promise.resolve(); } - getHighestAgedIndex(secret: DirectionalAppTaggingSecret): Promise<number | undefined> { - return this.#highestAgedIndex.getAsync(secret.toString()); + getHighestAgedIndex(secret: DirectionalAppTaggingSecret, jobId: string): Promise<number | undefined> { + return this.#getHighestAgedIndexFromStage(jobId, secret.toString()); } - async updateHighestAgedIndex(secret: DirectionalAppTaggingSecret, index: number): Promise<void> { - const currentIndex = await this.#highestAgedIndex.getAsync(secret.toString()); + async updateHighestAgedIndex(secret: DirectionalAppTaggingSecret, index: number, jobId: string): Promise<void> { + const currentIndex = await this.#getHighestAgedIndexFromStage(jobId, secret.toString()); if (currentIndex !== undefined && index <= currentIndex) { // Log sync should never set a lower highest aged index. throw new Error(`New highest aged index (${index}) must be higher than the current one (${currentIndex})`); } - await this.#highestAgedIndex.set(secret.toString(), index); + this.#setHighestAgedIndexOnStage(jobId, secret.toString(), index); } - getHighestFinalizedIndex(secret: DirectionalAppTaggingSecret): Promise<number | undefined> { - return this.#highestFinalizedIndex.getAsync(secret.toString()); + getHighestFinalizedIndex(secret: DirectionalAppTaggingSecret, jobId: string): Promise<number | undefined> { + return this.#getHighestFinalizedIndexFromStage(jobId, secret.toString()); } - async updateHighestFinalizedIndex(secret: DirectionalAppTaggingSecret, index: number): Promise<void> { - const currentIndex = await this.#highestFinalizedIndex.getAsync(secret.toString()); + async updateHighestFinalizedIndex(secret: DirectionalAppTaggingSecret, index: number, jobId: string): Promise<void> { + const currentIndex = await this.#getHighestFinalizedIndexFromStage(jobId, secret.toString()); if (currentIndex !== undefined && index < currentIndex) { // Log sync should never set a lower highest finalized index but it can happen that it would try to set the same // one because we are loading logs from highest aged index + 1 and not from the highest finalized index.
throw new Error(`New highest finalized index (${index}) must be higher than the current one (${currentIndex})`); } - await this.#highestFinalizedIndex.set(secret.toString(), index); + this.#setHighestFinalizedIndexOnStage(jobId, secret.toString(), index); } } diff --git a/yarn-project/pxe/src/storage/tagging_store/sender_tagging_store.test.ts b/yarn-project/pxe/src/storage/tagging_store/sender_tagging_store.test.ts index 19d3073b223c..e87ad857e169 100644 --- a/yarn-project/pxe/src/storage/tagging_store/sender_tagging_store.test.ts +++ b/yarn-project/pxe/src/storage/tagging_store/sender_tagging_store.test.ts @@ -22,9 +22,9 @@ describe('SenderTaggingStore', () => { const txHash = TxHash.random(); const preTag: PreTag = { secret: secret1, index: 5 }; - await taggingStore.storePendingIndexes([preTag], txHash); + await taggingStore.storePendingIndexes([preTag], txHash, 'test'); - const txHashes = await taggingStore.getTxHashesOfPendingIndexes(secret1, 0, 10); + const txHashes = await taggingStore.getTxHashesOfPendingIndexes(secret1, 0, 10, 'test'); expect(txHashes).toHaveLength(1); expect(txHashes[0]).toEqual(txHash); }); @@ -36,13 +36,13 @@ describe('SenderTaggingStore', () => { { secret: secret2, index: 7 }, ]; - await taggingStore.storePendingIndexes(preTags, txHash); + await taggingStore.storePendingIndexes(preTags, txHash, 'test'); - const txHashes1 = await taggingStore.getTxHashesOfPendingIndexes(secret1, 0, 10); + const txHashes1 = await taggingStore.getTxHashesOfPendingIndexes(secret1, 0, 10, 'test'); expect(txHashes1).toHaveLength(1); expect(txHashes1[0]).toEqual(txHash); - const txHashes2 = await taggingStore.getTxHashesOfPendingIndexes(secret2, 0, 10); + const txHashes2 = await taggingStore.getTxHashesOfPendingIndexes(secret2, 0, 10, 'test'); expect(txHashes2).toHaveLength(1); expect(txHashes2[0]).toEqual(txHash); }); @@ -51,10 +51,10 @@ describe('SenderTaggingStore', () => { const txHash1 = TxHash.random(); const txHash2 = TxHash.random(); - await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash1); - await taggingStore.storePendingIndexes([{ secret: secret1, index: 7 }], txHash2); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash1, 'test'); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 7 }], txHash2, 'test'); - const txHashes = await taggingStore.getTxHashesOfPendingIndexes(secret1, 0, 10); + const txHashes = await taggingStore.getTxHashesOfPendingIndexes(secret1, 0, 10, 'test'); expect(txHashes).toHaveLength(2); expect(txHashes).toContainEqual(txHash1); expect(txHashes).toContainEqual(txHash2); @@ -64,10 +64,10 @@ describe('SenderTaggingStore', () => { const txHash = TxHash.random(); const preTag: PreTag = { secret: secret1, index: 5 }; - await taggingStore.storePendingIndexes([preTag], txHash); - await taggingStore.storePendingIndexes([preTag], txHash); + await taggingStore.storePendingIndexes([preTag], txHash, 'test'); + await taggingStore.storePendingIndexes([preTag], txHash, 'test'); - const txHashes = await taggingStore.getTxHashesOfPendingIndexes(secret1, 0, 10); + const txHashes = await taggingStore.getTxHashesOfPendingIndexes(secret1, 0, 10, 'test'); expect(txHashes).toHaveLength(1); expect(txHashes[0]).toEqual(txHash); }); @@ -79,7 +79,7 @@ describe('SenderTaggingStore', () => { { secret: secret1, index: 7 }, ]; - await expect(taggingStore.storePendingIndexes(preTags, txHash)).rejects.toThrow( + await expect(taggingStore.storePendingIndexes(preTags, txHash, 'test')).rejects.toThrow( 'Duplicate 
secrets found when storing pending indexes', ); }); @@ -88,10 +88,10 @@ describe('SenderTaggingStore', () => { const txHash = TxHash.random(); // First store an index - await taggingStore.storePendingIndexes([{ secret: secret1, index: 5 }], txHash); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 5 }], txHash, 'test'); // Try to store a different index for the same secret + txHash pair - await expect(taggingStore.storePendingIndexes([{ secret: secret1, index: 7 }], txHash)).rejects.toThrow( + await expect(taggingStore.storePendingIndexes([{ secret: secret1, index: 7 }], txHash, 'test')).rejects.toThrow( /Cannot store index 7.*a different index 5 already exists/, ); }); @@ -101,11 +101,11 @@ describe('SenderTaggingStore', () => { const txHash2 = TxHash.random(); // First store and finalize an index - await taggingStore.storePendingIndexes([{ secret: secret1, index: 10 }], txHash1); - await taggingStore.finalizePendingIndexes([txHash1]); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 10 }], txHash1, 'test'); + await taggingStore.finalizePendingIndexes([txHash1], 'test'); // Try to store a pending index lower than the finalized index - await expect(taggingStore.storePendingIndexes([{ secret: secret1, index: 5 }], txHash2)).rejects.toThrow( + await expect(taggingStore.storePendingIndexes([{ secret: secret1, index: 5 }], txHash2, 'test')).rejects.toThrow( /Cannot store pending index 5.*lower than or equal to the last finalized index 10/, ); }); @@ -115,11 +115,11 @@ describe('SenderTaggingStore', () => { const txHash2 = TxHash.random(); // First store and finalize an index - await taggingStore.storePendingIndexes([{ secret: secret1, index: 10 }], txHash1); - await taggingStore.finalizePendingIndexes([txHash1]); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 10 }], txHash1, 'test'); + await taggingStore.finalizePendingIndexes([txHash1], 'test'); // Try to store a pending index equal to the finalized index - await expect(taggingStore.storePendingIndexes([{ secret: secret1, index: 10 }], txHash2)).rejects.toThrow( + await expect(taggingStore.storePendingIndexes([{ secret: secret1, index: 10 }], txHash2, 'test')).rejects.toThrow( /Cannot store pending index 10.*lower than or equal to the last finalized index 10/, ); }); @@ -129,13 +129,15 @@ describe('SenderTaggingStore', () => { const txHash2 = TxHash.random(); // First store and finalize an index - await taggingStore.storePendingIndexes([{ secret: secret1, index: 10 }], txHash1); - await taggingStore.finalizePendingIndexes([txHash1]); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 10 }], txHash1, 'test'); + await taggingStore.finalizePendingIndexes([txHash1], 'test'); // Store a pending index higher than the finalized index - should succeed - await expect(taggingStore.storePendingIndexes([{ secret: secret1, index: 15 }], txHash2)).resolves.not.toThrow(); + await expect( + taggingStore.storePendingIndexes([{ secret: secret1, index: 15 }], txHash2, 'test'), + ).resolves.not.toThrow(); - const txHashes = await taggingStore.getTxHashesOfPendingIndexes(secret1, 0, 20); + const txHashes = await taggingStore.getTxHashesOfPendingIndexes(secret1, 0, 20, 'test'); expect(txHashes).toHaveLength(1); expect(txHashes[0]).toEqual(txHash2); }); @@ -148,12 +150,12 @@ describe('SenderTaggingStore', () => { const indexBeyondWindow = finalizedIndex + UNFINALIZED_TAGGING_INDEXES_WINDOW_LEN + 1; // First store and finalize an index - await taggingStore.storePendingIndexes([{ secret: 
secret1, index: finalizedIndex }], txHash1); - await taggingStore.finalizePendingIndexes([txHash1]); + await taggingStore.storePendingIndexes([{ secret: secret1, index: finalizedIndex }], txHash1, 'test'); + await taggingStore.finalizePendingIndexes([txHash1], 'test'); // Try to store an index beyond the window await expect( - taggingStore.storePendingIndexes([{ secret: secret1, index: indexBeyondWindow }], txHash2), + taggingStore.storePendingIndexes([{ secret: secret1, index: indexBeyondWindow }], txHash2, 'test'), ).rejects.toThrow( `Highest used index ${indexBeyondWindow} is further than window length from the highest finalized index ${finalizedIndex}`, ); @@ -166,15 +168,15 @@ describe('SenderTaggingStore', () => { const indexAtBoundary = finalizedIndex + UNFINALIZED_TAGGING_INDEXES_WINDOW_LEN; // First store and finalize an index - await taggingStore.storePendingIndexes([{ secret: secret1, index: finalizedIndex }], txHash1); - await taggingStore.finalizePendingIndexes([txHash1]); + await taggingStore.storePendingIndexes([{ secret: secret1, index: finalizedIndex }], txHash1, 'test'); + await taggingStore.finalizePendingIndexes([txHash1], 'test'); // Store an index at the boundary, but check is >, so it should succeed await expect( - taggingStore.storePendingIndexes([{ secret: secret1, index: indexAtBoundary }], txHash2), + taggingStore.storePendingIndexes([{ secret: secret1, index: indexAtBoundary }], txHash2, 'test'), ).resolves.not.toThrow(); - const txHashes = await taggingStore.getTxHashesOfPendingIndexes(secret1, 0, indexAtBoundary + 5); + const txHashes = await taggingStore.getTxHashesOfPendingIndexes(secret1, 0, indexAtBoundary + 5, 'test'); expect(txHashes).toHaveLength(1); expect(txHashes[0]).toEqual(txHash2); }); @@ -183,7 +185,7 @@ describe('SenderTaggingStore', () => { describe('getTxHashesOfPendingIndexes', () => { it('returns empty array when no pending indexes exist', async () => { - const txHashes = await taggingStore.getTxHashesOfPendingIndexes(secret1, 0, 10); + const txHashes = await taggingStore.getTxHashesOfPendingIndexes(secret1, 0, 10, 'test'); expect(txHashes).toEqual([]); }); @@ -192,11 +194,11 @@ describe('SenderTaggingStore', () => { const txHash2 = TxHash.random(); const txHash3 = TxHash.random(); - await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash1); - await taggingStore.storePendingIndexes([{ secret: secret1, index: 5 }], txHash2); - await taggingStore.storePendingIndexes([{ secret: secret1, index: 8 }], txHash3); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash1, 'test'); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 5 }], txHash2, 'test'); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 8 }], txHash3, 'test'); - const txHashes = await taggingStore.getTxHashesOfPendingIndexes(secret1, 4, 9); + const txHashes = await taggingStore.getTxHashesOfPendingIndexes(secret1, 4, 9, 'test'); expect(txHashes).toHaveLength(2); expect(txHashes).toContainEqual(txHash2); expect(txHashes).toContainEqual(txHash3); @@ -207,10 +209,10 @@ describe('SenderTaggingStore', () => { const txHash1 = TxHash.random(); const txHash2 = TxHash.random(); - await taggingStore.storePendingIndexes([{ secret: secret1, index: 5 }], txHash1); - await taggingStore.storePendingIndexes([{ secret: secret1, index: 10 }], txHash2); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 5 }], txHash1, 'test'); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 10 }], 
txHash2, 'test'); - const txHashes = await taggingStore.getTxHashesOfPendingIndexes(secret1, 5, 10); + const txHashes = await taggingStore.getTxHashesOfPendingIndexes(secret1, 5, 10, 'test'); expect(txHashes).toHaveLength(1); expect(txHashes[0]).toEqual(txHash1); }); @@ -221,15 +223,15 @@ describe('SenderTaggingStore', () => { const txHash3 = TxHash.random(); const txHash4 = TxHash.random(); - await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash1); - await taggingStore.storePendingIndexes([{ secret: secret1, index: 5 }], txHash2); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash1, 'test'); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 5 }], txHash2, 'test'); // We store different secret with txHash1 to check we correctly don't return it in the result - await taggingStore.storePendingIndexes([{ secret: secret2, index: 7 }], txHash1); + await taggingStore.storePendingIndexes([{ secret: secret2, index: 7 }], txHash1, 'test'); // Store "parallel" index for secret1 with a different tx (can happen when sending logs from multiple PXEs) - await taggingStore.storePendingIndexes([{ secret: secret1, index: 7 }], txHash3); - await taggingStore.storePendingIndexes([{ secret: secret1, index: 7 }], txHash4); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 7 }], txHash3, 'test'); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 7 }], txHash4, 'test'); - const txHashes = await taggingStore.getTxHashesOfPendingIndexes(secret1, 0, 10); + const txHashes = await taggingStore.getTxHashesOfPendingIndexes(secret1, 0, 10, 'test'); // Should have 3 unique tx hashes for secret1 expect(txHashes).toEqual(expect.arrayContaining([txHash1, txHash2, txHash3, txHash4])); }); @@ -237,32 +239,32 @@ describe('SenderTaggingStore', () => { describe('getLastFinalizedIndex', () => { it('returns undefined when no finalized index exists', async () => { - const lastFinalized = await taggingStore.getLastFinalizedIndex(secret1); + const lastFinalized = await taggingStore.getLastFinalizedIndex(secret1, 'test'); expect(lastFinalized).toBeUndefined(); }); it('returns the last finalized index after finalizePendingIndexes', async () => { const txHash = TxHash.random(); - await taggingStore.storePendingIndexes([{ secret: secret1, index: 5 }], txHash); - await taggingStore.finalizePendingIndexes([txHash]); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 5 }], txHash, 'test'); + await taggingStore.finalizePendingIndexes([txHash], 'test'); - const lastFinalized = await taggingStore.getLastFinalizedIndex(secret1); + const lastFinalized = await taggingStore.getLastFinalizedIndex(secret1, 'test'); expect(lastFinalized).toBe(5); }); }); describe('getLastUsedIndex', () => { it('returns undefined when no indexes exist', async () => { - const lastUsed = await taggingStore.getLastUsedIndex(secret1); + const lastUsed = await taggingStore.getLastUsedIndex(secret1, 'test'); expect(lastUsed).toBeUndefined(); }); it('returns the last finalized index when no pending indexes exist', async () => { const txHash = TxHash.random(); - await taggingStore.storePendingIndexes([{ secret: secret1, index: 5 }], txHash); - await taggingStore.finalizePendingIndexes([txHash]); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 5 }], txHash, 'test'); + await taggingStore.finalizePendingIndexes([txHash], 'test'); - const lastUsed = await taggingStore.getLastUsedIndex(secret1); + const lastUsed = await 
taggingStore.getLastUsedIndex(secret1, 'test'); expect(lastUsed).toBe(5); }); @@ -271,13 +273,13 @@ describe('SenderTaggingStore', () => { const txHash2 = TxHash.random(); // First, finalize an index - await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash1); - await taggingStore.finalizePendingIndexes([txHash1]); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash1, 'test'); + await taggingStore.finalizePendingIndexes([txHash1], 'test'); // Then add a higher pending index - await taggingStore.storePendingIndexes([{ secret: secret1, index: 7 }], txHash2); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 7 }], txHash2, 'test'); - const lastUsed = await taggingStore.getLastUsedIndex(secret1); + const lastUsed = await taggingStore.getLastUsedIndex(secret1, 'test'); expect(lastUsed).toBe(7); }); @@ -286,11 +288,11 @@ describe('SenderTaggingStore', () => { const txHash2 = TxHash.random(); const txHash3 = TxHash.random(); - await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash1); - await taggingStore.storePendingIndexes([{ secret: secret1, index: 7 }], txHash2); - await taggingStore.storePendingIndexes([{ secret: secret1, index: 5 }], txHash3); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash1, 'test'); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 7 }], txHash2, 'test'); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 5 }], txHash3, 'test'); - const lastUsed = await taggingStore.getLastUsedIndex(secret1); + const lastUsed = await taggingStore.getLastUsedIndex(secret1, 'test'); expect(lastUsed).toBe(7); }); }); @@ -300,19 +302,19 @@ describe('SenderTaggingStore', () => { const txHash1 = TxHash.random(); const txHash2 = TxHash.random(); - await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash1); - await taggingStore.storePendingIndexes([{ secret: secret2, index: 5 }], txHash1); - await taggingStore.storePendingIndexes([{ secret: secret1, index: 7 }], txHash2); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash1, 'test'); + await taggingStore.storePendingIndexes([{ secret: secret2, index: 5 }], txHash1, 'test'); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 7 }], txHash2, 'test'); - await taggingStore.dropPendingIndexes([txHash1]); + await taggingStore.dropPendingIndexes([txHash1], 'test'); // txHash1 should be removed - const txHashes1 = await taggingStore.getTxHashesOfPendingIndexes(secret1, 0, 10); + const txHashes1 = await taggingStore.getTxHashesOfPendingIndexes(secret1, 0, 10, 'test'); expect(txHashes1).toHaveLength(1); expect(txHashes1[0]).toEqual(txHash2); // txHash1 should also be removed from secret2 - const txHashes2 = await taggingStore.getTxHashesOfPendingIndexes(secret2, 0, 10); + const txHashes2 = await taggingStore.getTxHashesOfPendingIndexes(secret2, 0, 10, 'test'); expect(txHashes2).toEqual([]); }); }); @@ -320,15 +322,15 @@ describe('SenderTaggingStore', () => { describe('finalizePendingIndexes', () => { it('moves pending index to finalized for a given tx hash', async () => { const txHash = TxHash.random(); - await taggingStore.storePendingIndexes([{ secret: secret1, index: 5 }], txHash); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 5 }], txHash, 'test'); - await taggingStore.finalizePendingIndexes([txHash]); + await taggingStore.finalizePendingIndexes([txHash], 'test'); - const lastFinalized = await 
taggingStore.getLastFinalizedIndex(secret1); + const lastFinalized = await taggingStore.getLastFinalizedIndex(secret1, 'test'); expect(lastFinalized).toBe(5); // Pending index should be removed - const txHashes = await taggingStore.getTxHashesOfPendingIndexes(secret1, 0, 10); + const txHashes = await taggingStore.getTxHashesOfPendingIndexes(secret1, 0, 10, 'test'); expect(txHashes).toEqual([]); }); @@ -336,13 +338,13 @@ describe('SenderTaggingStore', () => { const txHash1 = TxHash.random(); const txHash2 = TxHash.random(); - await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash1); - await taggingStore.finalizePendingIndexes([txHash1]); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash1, 'test'); + await taggingStore.finalizePendingIndexes([txHash1], 'test'); - await taggingStore.storePendingIndexes([{ secret: secret1, index: 7 }], txHash2); - await taggingStore.finalizePendingIndexes([txHash2]); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 7 }], txHash2, 'test'); + await taggingStore.finalizePendingIndexes([txHash2], 'test'); - const lastFinalized = await taggingStore.getLastFinalizedIndex(secret1); + const lastFinalized = await taggingStore.getLastFinalizedIndex(secret1, 'test'); expect(lastFinalized).toBe(7); }); @@ -351,16 +353,16 @@ describe('SenderTaggingStore', () => { const txHash2 = TxHash.random(); // Store both pending indexes first - await taggingStore.storePendingIndexes([{ secret: secret1, index: 7 }], txHash1); - await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash2); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 7 }], txHash1, 'test'); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash2, 'test'); // Finalize the higher index first - await taggingStore.finalizePendingIndexes([txHash1]); + await taggingStore.finalizePendingIndexes([txHash1], 'test'); // Then try to finalize the lower index - await taggingStore.finalizePendingIndexes([txHash2]); + await taggingStore.finalizePendingIndexes([txHash2], 'test'); - const lastFinalized = await taggingStore.getLastFinalizedIndex(secret1); + const lastFinalized = await taggingStore.getLastFinalizedIndex(secret1, 'test'); expect(lastFinalized).toBe(7); // Should remain at 7 }); @@ -369,16 +371,16 @@ describe('SenderTaggingStore', () => { const txHash2 = TxHash.random(); const txHash3 = TxHash.random(); - await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash1); - await taggingStore.storePendingIndexes([{ secret: secret1, index: 5 }], txHash2); - await taggingStore.storePendingIndexes([{ secret: secret1, index: 7 }], txHash3); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash1, 'test'); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 5 }], txHash2, 'test'); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 7 }], txHash3, 'test'); // Finalize txHash2 (index 5) - await taggingStore.finalizePendingIndexes([txHash2]); + await taggingStore.finalizePendingIndexes([txHash2], 'test'); // txHash1 (index 3) should be pruned as it's lower than finalized // txHash3 (index 7) should remain - const txHashes = await taggingStore.getTxHashesOfPendingIndexes(secret1, 0, 10); + const txHashes = await taggingStore.getTxHashesOfPendingIndexes(secret1, 0, 10, 'test'); expect(txHashes).toHaveLength(1); expect(txHashes[0]).toEqual(txHash3); }); @@ -391,12 +393,13 @@ describe('SenderTaggingStore', () => { { secret: 
secret2, index: 7 }, ], txHash, + 'test', ); - await taggingStore.finalizePendingIndexes([txHash]); + await taggingStore.finalizePendingIndexes([txHash], 'test'); - const lastFinalized1 = await taggingStore.getLastFinalizedIndex(secret1); - const lastFinalized2 = await taggingStore.getLastFinalizedIndex(secret2); + const lastFinalized1 = await taggingStore.getLastFinalizedIndex(secret1, 'test'); + const lastFinalized2 = await taggingStore.getLastFinalizedIndex(secret2, 'test'); expect(lastFinalized1).toBe(3); expect(lastFinalized2).toBe(7); @@ -404,16 +407,16 @@ describe('SenderTaggingStore', () => { it('does nothing when tx hash does not exist', async () => { const txHash = TxHash.random(); - await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash, 'test'); - await taggingStore.finalizePendingIndexes([TxHash.random()]); + await taggingStore.finalizePendingIndexes([TxHash.random()], 'test'); // Original pending index should still be there - const txHashes = await taggingStore.getTxHashesOfPendingIndexes(secret1, 0, 10); + const txHashes = await taggingStore.getTxHashesOfPendingIndexes(secret1, 0, 10, 'test'); expect(txHashes).toHaveLength(1); // Finalized index should not be set - const lastFinalized = await taggingStore.getLastFinalizedIndex(secret1); + const lastFinalized = await taggingStore.getLastFinalizedIndex(secret1, 'test'); expect(lastFinalized).toBeUndefined(); }); }); @@ -424,39 +427,39 @@ describe('SenderTaggingStore', () => { const txHash2 = TxHash.random(); // Step 1: Add pending index - await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash1); - expect(await taggingStore.getLastUsedIndex(secret1)).toBe(3); - expect(await taggingStore.getLastFinalizedIndex(secret1)).toBeUndefined(); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash1, 'test'); + expect(await taggingStore.getLastUsedIndex(secret1, 'test')).toBe(3); + expect(await taggingStore.getLastFinalizedIndex(secret1, 'test')).toBeUndefined(); // Step 2: Finalize the index - await taggingStore.finalizePendingIndexes([txHash1]); - expect(await taggingStore.getLastUsedIndex(secret1)).toBe(3); - expect(await taggingStore.getLastFinalizedIndex(secret1)).toBe(3); + await taggingStore.finalizePendingIndexes([txHash1], 'test'); + expect(await taggingStore.getLastUsedIndex(secret1, 'test')).toBe(3); + expect(await taggingStore.getLastFinalizedIndex(secret1, 'test')).toBe(3); // Step 3: Add a new higher pending index - await taggingStore.storePendingIndexes([{ secret: secret1, index: 7 }], txHash2); - expect(await taggingStore.getLastUsedIndex(secret1)).toBe(7); - expect(await taggingStore.getLastFinalizedIndex(secret1)).toBe(3); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 7 }], txHash2, 'test'); + expect(await taggingStore.getLastUsedIndex(secret1, 'test')).toBe(7); + expect(await taggingStore.getLastFinalizedIndex(secret1, 'test')).toBe(3); // Step 4: Finalize the new index - await taggingStore.finalizePendingIndexes([txHash2]); - expect(await taggingStore.getLastUsedIndex(secret1)).toBe(7); - expect(await taggingStore.getLastFinalizedIndex(secret1)).toBe(7); + await taggingStore.finalizePendingIndexes([txHash2], 'test'); + expect(await taggingStore.getLastUsedIndex(secret1, 'test')).toBe(7); + expect(await taggingStore.getLastFinalizedIndex(secret1, 'test')).toBe(7); }); it('handles dropped transactions', async () => { const txHash1 = 
TxHash.random(); const txHash2 = TxHash.random(); - await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash1); - await taggingStore.storePendingIndexes([{ secret: secret1, index: 5 }], txHash2); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash1, 'test'); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 5 }], txHash2, 'test'); - expect(await taggingStore.getLastUsedIndex(secret1)).toBe(5); + expect(await taggingStore.getLastUsedIndex(secret1, 'test')).toBe(5); // Drop txHash2 - await taggingStore.dropPendingIndexes([txHash2]); + await taggingStore.dropPendingIndexes([txHash2], 'test'); - expect(await taggingStore.getLastUsedIndex(secret1)).toBe(3); + expect(await taggingStore.getLastUsedIndex(secret1, 'test')).toBe(3); }); it('handles multiple secrets with different lifecycles', async () => { @@ -465,19 +468,160 @@ describe('SenderTaggingStore', () => { const txHash3 = TxHash.random(); // Secret1: pending -> finalized - await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash1); - await taggingStore.finalizePendingIndexes([txHash1]); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash1, 'test'); + await taggingStore.finalizePendingIndexes([txHash1], 'test'); // Secret2: pending (not finalized) - await taggingStore.storePendingIndexes([{ secret: secret2, index: 5 }], txHash2); + await taggingStore.storePendingIndexes([{ secret: secret2, index: 5 }], txHash2, 'test'); // Secret1: new pending - await taggingStore.storePendingIndexes([{ secret: secret1, index: 7 }], txHash3); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 7 }], txHash3, 'test'); - expect(await taggingStore.getLastFinalizedIndex(secret1)).toBe(3); - expect(await taggingStore.getLastUsedIndex(secret1)).toBe(7); - expect(await taggingStore.getLastFinalizedIndex(secret2)).toBeUndefined(); - expect(await taggingStore.getLastUsedIndex(secret2)).toBe(5); + expect(await taggingStore.getLastFinalizedIndex(secret1, 'test')).toBe(3); + expect(await taggingStore.getLastUsedIndex(secret1, 'test')).toBe(7); + expect(await taggingStore.getLastFinalizedIndex(secret2, 'test')).toBeUndefined(); + expect(await taggingStore.getLastUsedIndex(secret2, 'test')).toBe(5); + }); + }); + + describe('staging', () => { + it('writes to staging when jobId provided', async () => { + const committedTxHash = TxHash.random(); + const stagedTxHash = TxHash.random(); + const commitJobId: string = 'commit-job'; + const stagingJobId: string = 'staging-job'; + const anotherStagingJobId: string = 'another-staging-job-id'; + + await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], committedTxHash, commitJobId); + await taggingStore.commit(commitJobId); + + await taggingStore.storePendingIndexes([{ secret: secret1, index: 5 }], stagedTxHash, stagingJobId); + + // Without jobId, should only get committed data + const txHashesWithoutJobId = await taggingStore.getTxHashesOfPendingIndexes(secret1, 0, 10, anotherStagingJobId); + expect(txHashesWithoutJobId).toHaveLength(1); + expect(txHashesWithoutJobId[0]).toEqual(committedTxHash); + + // With stagingJobId, should get both committed and staged data + const txHashesWithJobId = await taggingStore.getTxHashesOfPendingIndexes(secret1, 0, 10, stagingJobId); + expect(txHashesWithJobId).toHaveLength(2); + expect(txHashesWithJobId).toContainEqual(committedTxHash); + expect(txHashesWithJobId).toContainEqual(stagedTxHash); + }); + + it('stages finalized indexes separately', 
async () => { + const txHash1 = TxHash.random(); + const txHash2 = TxHash.random(); + const commitJobId: string = 'commit-job'; + const stagingJobId: string = 'staging-job'; + const anotherStagingJobId: string = 'another-staging-job-id'; + + await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash1, commitJobId); + await taggingStore.finalizePendingIndexes([txHash1], commitJobId); + await taggingStore.commit(commitJobId); + + // Stage a higher finalized index (not committed) + await taggingStore.storePendingIndexes([{ secret: secret1, index: 7 }], txHash2, stagingJobId); + await taggingStore.finalizePendingIndexes([txHash2], stagingJobId); + + // Without a different jobId, should get the committed finalized index + expect(await taggingStore.getLastFinalizedIndex(secret1, anotherStagingJobId)).toBe(3); + + // With stagingJobId, should get the staged finalized index + expect(await taggingStore.getLastFinalizedIndex(secret1, stagingJobId)).toBe(7); + }); + + it('commit promotes staged data to main', async () => { + const txHash1 = TxHash.random(); + const txHash2 = TxHash.random(); + const commitJobId: string = 'commit-job'; + const stagingJobId: string = 'staging-job'; + const anotherStagingJobId: string = 'another-staging-job-id'; + + await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash1, commitJobId); + await taggingStore.finalizePendingIndexes([txHash1], commitJobId); + await taggingStore.commit(commitJobId); + + await taggingStore.storePendingIndexes([{ secret: secret1, index: 7 }], txHash2, stagingJobId); + await taggingStore.finalizePendingIndexes([txHash2], stagingJobId); + await taggingStore.commit(stagingJobId); + + // Since we committed, we should get the previously staged data from a different job + expect(await taggingStore.getLastFinalizedIndex(secret1, anotherStagingJobId)).toBe(7); + }); + + it('discardStaged removes staged data without affecting main', async () => { + const txHash1 = TxHash.random(); + const txHash2 = TxHash.random(); + const commitJobId: string = 'commit-job'; + const stagingJobId: string = 'staging-job'; + const anotherStagingJobId: string = 'another-staging-job-id'; + + await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash1, commitJobId); + await taggingStore.finalizePendingIndexes([txHash1], commitJobId); + await taggingStore.commit(commitJobId); + + await taggingStore.storePendingIndexes([{ secret: secret1, index: 7 }], txHash2, stagingJobId); + await taggingStore.finalizePendingIndexes([txHash2], stagingJobId); + + await taggingStore.discardStaged(stagingJobId); + + // Should still get the committed finalized index + expect(await taggingStore.getLastFinalizedIndex(secret1, anotherStagingJobId)).toBe(3); + + // With stagingJobId should fall back to committed since staging was discarded + expect(await taggingStore.getLastFinalizedIndex(secret1, stagingJobId)).toBe(3); + }); + + it('stages pending and finalized index operations independently', async () => { + const txHash1 = TxHash.random(); + const txHash2 = TxHash.random(); + const txHash3 = TxHash.random(); + const commitJobId: string = 'commit-job'; + const stagingJobId: string = 'staging-job'; + const anotherStagingJobId: string = 'another-staging-job-id'; + + // Committed: index 3 pending + await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash1, commitJobId); + await taggingStore.commit(commitJobId); + + // Staged: index 5 pending, then finalize it + await taggingStore.storePendingIndexes([{ secret: 
secret1, index: 5 }], txHash2, stagingJobId);
+      await taggingStore.finalizePendingIndexes([txHash2], stagingJobId);
+
+      // Staged: add another pending index
+      await taggingStore.storePendingIndexes([{ secret: secret1, index: 7 }], txHash3, stagingJobId);
+
+      // With a different jobId:
+      // - Should see pending: txHash1 (index 3)
+      // - No finalized index
+      expect(await taggingStore.getLastFinalizedIndex(secret1, anotherStagingJobId)).toBeUndefined();
+      expect(await taggingStore.getLastUsedIndex(secret1, anotherStagingJobId)).toBe(3);
+
+      // With stagingJobId:
+      // - Should see finalized: 5
+      // - Should see pending: txHash1 (index 3), txHash3 (index 7)
+      // - Last used should be max(finalized=5, pending={3,7}) = 7
+      expect(await taggingStore.getLastFinalizedIndex(secret1, stagingJobId)).toBe(5);
+      expect(await taggingStore.getLastUsedIndex(secret1, stagingJobId)).toBe(7);
+    });
+
+    it('drops pending indexes in staging correctly', async () => {
+      const txHash1 = TxHash.random();
+      const txHash2 = TxHash.random();
+      const stagingJobId: string = 'staging-job';
+
+      // Store both pending indexes with staging
+      await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash1, stagingJobId);
+      await taggingStore.storePendingIndexes([{ secret: secret1, index: 5 }], txHash2, stagingJobId);
+
+      // Drop one in staging
+      await taggingStore.dropPendingIndexes([txHash1], stagingJobId);
+
+      // With stagingJobId, should only see txHash2
+      const txHashes = await taggingStore.getTxHashesOfPendingIndexes(secret1, 0, 10, stagingJobId);
+      expect(txHashes).toHaveLength(1);
+      expect(txHashes[0]).toEqual(txHash2);
     });
   });
 });
diff --git a/yarn-project/pxe/src/storage/tagging_store/sender_tagging_store.ts b/yarn-project/pxe/src/storage/tagging_store/sender_tagging_store.ts
index d95f1491adc1..54e968a671bf 100644
--- a/yarn-project/pxe/src/storage/tagging_store/sender_tagging_store.ts
+++ b/yarn-project/pxe/src/storage/tagging_store/sender_tagging_store.ts
@@ -3,6 +3,7 @@ import type { AztecAsyncKVStore, AztecAsyncMap } from '@aztec/kv-store';
 import type { DirectionalAppTaggingSecret, PreTag } from '@aztec/stdlib/logs';
 import { TxHash } from '@aztec/stdlib/tx';
 
+import type { StagedStore } from '../../job_coordinator/job_coordinator.js';
 import { UNFINALIZED_TAGGING_INDEXES_WINDOW_LEN } from '../../tagging/constants.js';
 
 /**
@@ -10,7 +11,9 @@ import { UNFINALIZED_TAGGING_INDEXES_WINDOW_LEN } from '../../tagging/constants.
  * is called RecipientTaggingStore. We have the providers separate for the sender and recipient because
  * the algorithms are completely disjoint and there is no data reuse between the two.
  */
-export class SenderTaggingStore {
+export class SenderTaggingStore implements StagedStore {
+  readonly storeName = 'sender_tagging';
+
   #store: AztecAsyncKVStore;
 
   // Stores the pending indexes for each directional app tagging secret. Pending here means that the tx that contained
@@ -21,18 +24,119 @@ export class SenderTaggingStore {
   // the smaller ones are irrelevant due to tx atomicity.
   //
   // TODO(#17615): This assumes no logs are used in the non-revertible phase.
+  //
+  // directional app tagging secret => { pending index, txHash }[]
   #pendingIndexes: AztecAsyncMap<string, { index: number; txHash: string }[]>;
 
+  // jobId => directional app tagging secret => { pending index, txHash }[]
+  #stagedPendingIndexes: Map<string, Map<string, { index: number; txHash: string }[]>>;
+
   // Stores the last (highest) finalized index for each directional app tagging secret. We care only about the last
   // index because unlike the pending indexes, it will never happen that a finalized index would be removed and hence
   // we don't need to store the history.
+  //
+  // directional app tagging secret => highest finalized index
   #lastFinalizedIndexes: AztecAsyncMap<string, number>;
 
+  // jobId => directional app tagging secret => highest finalized index
+  // note: null means "checked kv store, there was nothing"
+  #stagedLastFinalizedIndexes: Map<string, Map<string, number>>;
+
   constructor(store: AztecAsyncKVStore) {
     this.#store = store;
     this.#pendingIndexes = this.#store.openMap('pending_indexes');
     this.#lastFinalizedIndexes = this.#store.openMap('last_finalized_indexes');
+
+    this.#stagedPendingIndexes = new Map();
+    this.#stagedLastFinalizedIndexes = new Map();
+  }
+
+  #getJobStagedPendingIndexes(jobId: string): Map<string, { index: number; txHash: string }[]> {
+    let jobStagedPendingIndexes = this.#stagedPendingIndexes.get(jobId);
+    if (!jobStagedPendingIndexes) {
+      jobStagedPendingIndexes = new Map();
+      this.#stagedPendingIndexes.set(jobId, jobStagedPendingIndexes);
+    }
+    return jobStagedPendingIndexes;
+  }
+
+  #getJobStagedLastFinalizedIndexes(jobId: string): Map<string, number> {
+    let jobStagedLastFinalizedIndexes = this.#stagedLastFinalizedIndexes.get(jobId);
+    if (!jobStagedLastFinalizedIndexes) {
+      jobStagedLastFinalizedIndexes = new Map();
+      this.#stagedLastFinalizedIndexes.set(jobId, jobStagedLastFinalizedIndexes);
+    }
+    return jobStagedLastFinalizedIndexes;
+  }
+
+  async #getPendingIndexes(jobId: string, secret: string): Promise<{ index: number; txHash: string }[]> {
+    const jobStagedPendingIndexes = this.#getJobStagedPendingIndexes(jobId);
+    let staged: { index: number; txHash: string }[] | undefined = jobStagedPendingIndexes.get(secret);
+    if (staged === undefined) {
+      // If we don't have a staged version of this, first we check if there's one in DB
+      // If it's not in DB, we'll get an undefined here, which we coerce to []
+      staged = (await this.#pendingIndexes.getAsync(secret)) ?? [];
+    }
+    return staged;
+  }
+
+  #setPendingIndexes(jobId: string, secret: string, pendingIndexes: { index: number; txHash: string }[]) {
+    this.#getJobStagedPendingIndexes(jobId).set(secret, pendingIndexes);
+  }
+
+  /**
+   * Returns a job view of all the secrets that have a corresponding list of pending indexes
+   * either in persistent storage or the current job
+   * @param jobId the job view to use.
+   */
+  async #allSecretsWithPendingIndexes(jobId: string): Promise<string[]> {
+    const allSecretsInKV = new Set(await toArray(this.#pendingIndexes.keysAsync()));
+    const allSecretsInJobStage = this.#getJobStagedPendingIndexes(jobId).keys();
+    return [...allSecretsInKV.union(new Set(allSecretsInJobStage))];
+  }
+
+  async #getLastFinalizedIndex(jobId: string, secret: string): Promise<number | undefined> {
+    const jobStagedLastFinalizedIndexes = this.#getJobStagedLastFinalizedIndexes(jobId);
+    let staged: number | undefined = jobStagedLastFinalizedIndexes.get(secret);
+    if (staged === undefined) {
+      staged = await this.#lastFinalizedIndexes.getAsync(secret);
+    }
+    return staged;
+  }
+
+  #setLastFinalizedIndex(jobId: string, secret: string, lastFinalizedIndex: number) {
+    const jobStagedLastFinalizedIndexes = this.#getJobStagedLastFinalizedIndexes(jobId);
+    jobStagedLastFinalizedIndexes.set(secret, lastFinalizedIndex);
+  }
+
+  async commit(jobId: string): Promise<void> {
+    const stagedPendingIndexes = this.#stagedPendingIndexes.get(jobId);
+    if (stagedPendingIndexes) {
+      for (const [secret, pendingIndexes] of stagedPendingIndexes.entries()) {
+        if (pendingIndexes.length === 0) {
+          await this.#pendingIndexes.delete(secret);
+        } else {
+          await this.#pendingIndexes.set(secret, pendingIndexes);
+        }
+      }
+    }
+
+    const stagedLastFinalizedIndexes = this.#stagedLastFinalizedIndexes.get(jobId);
+    if (stagedLastFinalizedIndexes) {
+      for (const [secret, lastFinalizedIndex] of stagedLastFinalizedIndexes.entries()) {
+        await this.#lastFinalizedIndexes.set(secret, lastFinalizedIndex);
+      }
+    }
+
+    this.#stagedPendingIndexes.delete(jobId);
+    this.#stagedLastFinalizedIndexes.delete(jobId);
+  }
+
+  discardStaged(jobId: string): Promise<void> {
+    this.#stagedPendingIndexes.delete(jobId);
+    this.#stagedLastFinalizedIndexes.delete(jobId);
+    return Promise.resolve();
   }
 
   /**
@@ -43,6 +147,7 @@ export class SenderTaggingStore {
    * @param preTags - The pre-tags containing the directional app tagging secrets and the indexes that are to be
    * stored in the db.
    * @param txHash - The tx in which the pretags were used in private logs.
+   * @param jobId - job context for staged writes to this store. See `JobCoordinator` for more details.
    * @throws If any two pre-tags contain the same directional app tagging secret. This is enforced because we care
    * only about the highest index for a given secret that was used in the tx. Hence this check is a good way to catch
    * bugs.
@@ -56,7 +161,7 @@ export class SenderTaggingStore {
    * This is enforced because this should never happen if the syncing is done correctly as we look for logs from higher
    * indexes than finalized ones.
    */
-  async storePendingIndexes(preTags: PreTag[], txHash: TxHash) {
+  async storePendingIndexes(preTags: PreTag[], txHash: TxHash, jobId: string) {
     // The secrets in pre-tags should be unique because we always store just the highest index per given secret-txHash
     // pair. Below we check that this is the case.
     const secretsSet = new Set(preTags.map(preTag => preTag.secret.toString()));
@@ -67,7 +172,7 @@ export class SenderTaggingStore {
     for (const { secret, index } of preTags) {
       // First we check that for any secret the highest used index in tx is not further than window length from
       // the highest finalized index.
-      const finalizedIndex = (await this.getLastFinalizedIndex(secret)) ?? 0;
+      const finalizedIndex = (await this.getLastFinalizedIndex(secret, jobId)) ?? 0;
       if (index > finalizedIndex + UNFINALIZED_TAGGING_INDEXES_WINDOW_LEN) {
         throw new Error(
           `Highest used index ${index} is further than window length from the highest finalized index ${finalizedIndex}.
@@ -78,7 +183,7 @@ export class SenderTaggingStore {
 
       // Throw if the new pending index is lower than or equal to the last finalized index
       const secretStr = secret.toString();
-      const lastFinalizedIndex = await this.#lastFinalizedIndexes.getAsync(secretStr);
+      const lastFinalizedIndex = await this.#getLastFinalizedIndex(jobId, secretStr);
       if (lastFinalizedIndex !== undefined && index <= lastFinalizedIndex) {
         throw new Error(
           `Cannot store pending index ${index} for secret ${secretStr}: ` +
@@ -88,7 +193,7 @@ export class SenderTaggingStore {
 
       // Check if this secret + txHash combination already exists
       const txHashStr = txHash.toString();
-      const existingForSecret = (await this.#pendingIndexes.getAsync(secretStr)) ?? [];
+      const existingForSecret = await this.#getPendingIndexes(jobId, secretStr);
       const existingForSecretAndTx = existingForSecret.find(entry => entry.txHash === txHashStr);
 
       if (existingForSecretAndTx) {
@@ -102,7 +207,7 @@ export class SenderTaggingStore {
         // If it exists with the same index, ignore the update (no-op)
       } else {
         // If it doesn't exist, add it
-        await this.#pendingIndexes.set(secretStr, [...existingForSecret, { index, txHash: txHashStr }]);
+        this.#setPendingIndexes(jobId, secretStr, [...existingForSecret, { index, txHash: txHashStr }]);
       }
     }
   }
@@ -120,8 +225,9 @@ export class SenderTaggingStore {
     secret: DirectionalAppTaggingSecret,
     startIndex: number,
     endIndex: number,
+    jobId: string,
   ): Promise<TxHash[]> {
-    const existing = (await this.#pendingIndexes.getAsync(secret.toString())) ?? [];
+    const existing = await this.#getPendingIndexes(jobId, secret.toString());
     const txHashes = existing
       .filter(entry => entry.index >= startIndex && entry.index < endIndex)
       .map(entry => entry.txHash);
@@ -133,8 +239,8 @@ export class SenderTaggingStore {
    * @param secret - The secret to get the last finalized index for.
    * @returns The last (highest) finalized index for the given secret.
    */
-  getLastFinalizedIndex(secret: DirectionalAppTaggingSecret): Promise<number | undefined> {
-    return this.#lastFinalizedIndexes.getAsync(secret.toString());
+  async getLastFinalizedIndex(secret: DirectionalAppTaggingSecret, jobId: string): Promise<number | undefined> {
+    return (await this.#getLastFinalizedIndex(jobId, secret.toString())) ?? undefined;
   }
 
   /**
@@ -143,13 +249,13 @@ export class SenderTaggingStore {
    * @param secret - The directional app tagging secret to query the last used index for.
    * @returns The last used index.
    */
-  async getLastUsedIndex(secret: DirectionalAppTaggingSecret): Promise<number | undefined> {
+  async getLastUsedIndex(secret: DirectionalAppTaggingSecret, jobId: string): Promise<number | undefined> {
     const secretStr = secret.toString();
-    const pendingTxScopedIndexes = (await this.#pendingIndexes.getAsync(secretStr)) ?? [];
+    const pendingTxScopedIndexes = await this.#getPendingIndexes(jobId, secretStr);
     const pendingIndexes = pendingTxScopedIndexes.map(entry => entry.index);
 
     if (pendingTxScopedIndexes.length === 0) {
-      return this.#lastFinalizedIndexes.getAsync(secretStr);
+      return (await this.#getLastFinalizedIndex(jobId, secretStr)) ?? undefined;
     }
 
     // As the last used index we return the highest one from the pending indexes. Note that this value will be always
@@ -160,23 +266,23 @@ export class SenderTaggingStore {
   /**
    * Drops all pending indexes corresponding to the given transaction hashes.
    */
-  async dropPendingIndexes(txHashes: TxHash[]) {
+  async dropPendingIndexes(txHashes: TxHash[], jobId: string) {
     if (txHashes.length === 0) {
       return;
     }
 
-    const txHashStrs = new Set(txHashes.map(txHash => txHash.toString()));
-    const allSecrets = await toArray(this.#pendingIndexes.keysAsync());
+    const txHashStrings = new Set(txHashes.map(txHash => txHash.toString()));
+    const allSecrets = await this.#allSecretsWithPendingIndexes(jobId);
 
     for (const secret of allSecrets) {
-      const pendingData = await this.#pendingIndexes.getAsync(secret);
+      const pendingData = await this.#getPendingIndexes(jobId, secret);
       if (pendingData) {
-        const filtered = pendingData.filter(item => !txHashStrs.has(item.txHash));
+        const filtered = pendingData.filter(item => !txHashStrings.has(item.txHash));
         if (filtered.length === 0) {
-          await this.#pendingIndexes.delete(secret);
+          this.#setPendingIndexes(jobId, secret, []);
         } else if (filtered.length !== pendingData.length) {
           // Some items were filtered out, so update the pending data
-          await this.#pendingIndexes.set(secret, filtered);
+          this.#setPendingIndexes(jobId, secret, filtered);
        }
         // else: No items were filtered out (txHashes not found for this secret) --> no-op
       }
@@ -187,7 +293,7 @@ export class SenderTaggingStore {
    * Updates pending indexes corresponding to the given transaction hashes to be finalized and prunes any lower pending
    * indexes.
    */
-  async finalizePendingIndexes(txHashes: TxHash[]) {
+  async finalizePendingIndexes(txHashes: TxHash[], jobId: string) {
     if (txHashes.length === 0) {
       return;
     }
@@ -195,10 +301,10 @@ export class SenderTaggingStore {
     for (const txHash of txHashes) {
       const txHashStr = txHash.toString();
 
-      const allSecrets = await toArray(this.#pendingIndexes.keysAsync());
+      const allSecrets = await this.#allSecretsWithPendingIndexes(jobId);
 
       for (const secret of allSecrets) {
-        const pendingData = await this.#pendingIndexes.getAsync(secret);
+        const pendingData = await this.#getPendingIndexes(jobId, secret);
         if (!pendingData) {
           continue;
         }
@@ -214,7 +320,7 @@ export class SenderTaggingStore {
           throw new Error(`Multiple pending indexes found for tx hash ${txHashStr} and secret ${secret}`);
         }
 
-        let lastFinalized = await this.#lastFinalizedIndexes.getAsync(secret);
+        let lastFinalized = await this.#getLastFinalizedIndex(jobId, secret);
         const newFinalized = matchingIndexes[0];
 
         if (newFinalized < (lastFinalized ?? 0)) {
@@ -225,7 +331,7 @@ export class SenderTaggingStore {
           );
         }
 
-        await this.#lastFinalizedIndexes.set(secret, newFinalized);
+        this.#setLastFinalizedIndex(jobId, secret, newFinalized);
         lastFinalized = newFinalized;
 
         // When we add pending indexes, we ensure they are higher than the last finalized index. However, because we
@@ -234,9 +340,9 @@ export class SenderTaggingStore {
         // outdated pending indexes.
         const remainingItemsOfHigherIndex = pendingData.filter(item => item.index > (lastFinalized ?? 0));
         if (remainingItemsOfHigherIndex.length === 0) {
-          await this.#pendingIndexes.delete(secret);
+          this.#setPendingIndexes(jobId, secret, []);
         } else {
-          await this.#pendingIndexes.set(secret, remainingItemsOfHigherIndex);
+          this.#setPendingIndexes(jobId, secret, remainingItemsOfHigherIndex);
         }
       }
     }
diff --git a/yarn-project/pxe/src/tagging/recipient_sync/load_private_logs_for_sender_recipient_pair.test.ts b/yarn-project/pxe/src/tagging/recipient_sync/load_private_logs_for_sender_recipient_pair.test.ts
index a5fe72cb9ab5..b5585a78553a 100644
--- a/yarn-project/pxe/src/tagging/recipient_sync/load_private_logs_for_sender_recipient_pair.test.ts
+++ b/yarn-project/pxe/src/tagging/recipient_sync/load_private_logs_for_sender_recipient_pair.test.ts
@@ -66,11 +66,12 @@ describe('loadPrivateLogsForSenderRecipientPair', () => {
       aztecNode,
       taggingStore,
       NON_INTERFERING_ANCHOR_BLOCK_NUMBER,
+      'test',
     );
 
     expect(logs).toHaveLength(0);
-    expect(await taggingStore.getHighestAgedIndex(secret)).toBeUndefined();
-    expect(await taggingStore.getHighestFinalizedIndex(secret)).toBeUndefined();
+    expect(await taggingStore.getHighestAgedIndex(secret, 'test')).toBeUndefined();
+    expect(await taggingStore.getHighestFinalizedIndex(secret, 'test')).toBeUndefined();
   });
 
   it('loads log and updates highest finalized index but not highest aged index', async () => {
@@ -101,11 +102,12 @@ describe('loadPrivateLogsForSenderRecipientPair', () => {
       aztecNode,
       taggingStore,
       NON_INTERFERING_ANCHOR_BLOCK_NUMBER,
+      'test',
     );
 
     expect(logs).toHaveLength(1);
-    expect(await taggingStore.getHighestFinalizedIndex(secret)).toBe(logIndex);
-    expect(await taggingStore.getHighestAgedIndex(secret)).toBeUndefined();
+    expect(await taggingStore.getHighestFinalizedIndex(secret, 'test')).toBe(logIndex);
+    expect(await taggingStore.getHighestAgedIndex(secret, 'test')).toBeUndefined();
   });
 
   it('loads log and updates both highest aged and highest finalized indexes', async () => {
@@ -136,11 +138,12 @@ describe('loadPrivateLogsForSenderRecipientPair', () => {
       aztecNode,
       taggingStore,
       NON_INTERFERING_ANCHOR_BLOCK_NUMBER,
+      'test',
     );
 
     expect(logs).toHaveLength(1);
-    expect(await taggingStore.getHighestAgedIndex(secret)).toBe(logIndex);
-    expect(await taggingStore.getHighestFinalizedIndex(secret)).toBe(logIndex);
+    expect(await taggingStore.getHighestAgedIndex(secret, 'test')).toBe(logIndex);
+    expect(await taggingStore.getHighestFinalizedIndex(secret, 'test')).toBe(logIndex);
   });
 
   it('logs at boundaries are properly loaded, window and highest indexes advance as expected', async () => {
@@ -156,8 +159,8 @@ describe('loadPrivateLogsForSenderRecipientPair', () => {
     const log2Tag = await computeSiloedTagForIndex(log2Index);
 
     // Set existing highest aged index and highest finalized index
-    await taggingStore.updateHighestAgedIndex(secret, highestAgedIndex);
-    await taggingStore.updateHighestFinalizedIndex(secret, highestFinalizedIndex);
+    await taggingStore.updateHighestAgedIndex(secret, highestAgedIndex, 'test');
+    await taggingStore.updateHighestFinalizedIndex(secret, highestFinalizedIndex, 'test');
 
     aztecNode.getL2Tips.mockResolvedValue({
       finalized: { number: BlockNumber(finalizedBlockNumber) },
@@ -188,12 +191,13 @@ describe('loadPrivateLogsForSenderRecipientPair', () => {
       aztecNode,
       taggingStore,
       NON_INTERFERING_ANCHOR_BLOCK_NUMBER,
+      'test',
     );
 
     // Verify that both logs at the boundaries of the range were found and processed
     expect(logs).toHaveLength(2);
-    expect(await taggingStore.getHighestFinalizedIndex(secret)).toBe(log2Index);
-    expect(await
taggingStore.getHighestAgedIndex(secret)).toBe(log1Index); + expect(await taggingStore.getHighestFinalizedIndex(secret, 'test')).toBe(log2Index); + expect(await taggingStore.getHighestAgedIndex(secret, 'test')).toBe(log1Index); // Verify that the window was moved forward correctly // Total range queried: from (highestAgedIndex + 1) to (log2Index + WINDOW_LEN + 1) exclusive diff --git a/yarn-project/pxe/src/tagging/recipient_sync/load_private_logs_for_sender_recipient_pair.ts b/yarn-project/pxe/src/tagging/recipient_sync/load_private_logs_for_sender_recipient_pair.ts index 9a18c0331406..31e1bc38a972 100644 --- a/yarn-project/pxe/src/tagging/recipient_sync/load_private_logs_for_sender_recipient_pair.ts +++ b/yarn-project/pxe/src/tagging/recipient_sync/load_private_logs_for_sender_recipient_pair.ts @@ -21,6 +21,7 @@ export async function loadPrivateLogsForSenderRecipientPair( aztecNode: AztecNode, taggingStore: RecipientTaggingStore, anchorBlockNumber: BlockNumber, + jobId: string, ): Promise { // # Explanation of how the algorithm works // When we perform the sync we will look at logs that correspond to the tagging index range @@ -73,8 +74,8 @@ export async function loadPrivateLogsForSenderRecipientPair( let start: number, end: number; { - const currentHighestAgedIndex = await taggingStore.getHighestAgedIndex(secret); - const currentHighestFinalizedIndex = await taggingStore.getHighestFinalizedIndex(secret); + const currentHighestAgedIndex = await taggingStore.getHighestAgedIndex(secret, jobId); + const currentHighestFinalizedIndex = await taggingStore.getHighestFinalizedIndex(secret, jobId); // We don't want to include the highest aged index so we start from `currentHighestAgedIndex + 1` (or 0 if not set) start = currentHighestAgedIndex === undefined ? 0 : currentHighestAgedIndex + 1; @@ -104,7 +105,7 @@ export async function loadPrivateLogsForSenderRecipientPair( // Store updates in data provider and update local variables if (highestAgedIndex !== undefined) { - await taggingStore.updateHighestAgedIndex(secret, highestAgedIndex); + await taggingStore.updateHighestAgedIndex(secret, highestAgedIndex, jobId); } if (highestFinalizedIndex === undefined) { @@ -117,7 +118,7 @@ export async function loadPrivateLogsForSenderRecipientPair( throw new Error('Highest aged index lower than highest finalized index invariant violated'); } - await taggingStore.updateHighestFinalizedIndex(secret, highestFinalizedIndex); + await taggingStore.updateHighestFinalizedIndex(secret, highestFinalizedIndex, jobId); // For the next iteration we want to look only at indexes for which we have not attempted to load logs yet while // ensuring that we do not look further than WINDOW_LEN ahead of the highest finalized index. 
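Aside on the jobId parameter threaded through every call above: it keys a per-job staging area inside each store, and nothing reaches the persistent KV maps until the job is committed. A minimal sketch of how a coordinator might drive the StagedStore contract this patch introduces (the runJob helper and its signature are illustrative assumptions; only commit and discardStaged come from the diff):

    // Hypothetical driver, not part of this patch: run some work under one jobId,
    // then either promote all staged writes or throw them away as a unit.
    async function runJob(jobId: string, stores: StagedStore[], work: () => Promise<void>): Promise<void> {
      try {
        await work(); // store writes made by `work` land in per-job staging maps
        for (const store of stores) {
          await store.commit(jobId); // flush this job's staged entries into the KV store
        }
      } catch (err) {
        for (const store of stores) {
          await store.discardStaged(jobId); // drop staged entries; committed state is untouched
        }
        throw err;
      }
    }

Until commit runs, only reads made with the same jobId observe the staged values; every other job keeps seeing the committed state, which is exactly what the staging tests above assert.
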
diff --git a/yarn-project/pxe/src/tagging/sender_sync/sync_sender_tagging_indexes.test.ts b/yarn-project/pxe/src/tagging/sender_sync/sync_sender_tagging_indexes.test.ts index 43af9587edf5..d845b446fff8 100644 --- a/yarn-project/pxe/src/tagging/sender_sync/sync_sender_tagging_indexes.test.ts +++ b/yarn-project/pxe/src/tagging/sender_sync/sync_sender_tagging_indexes.test.ts @@ -44,11 +44,11 @@ describe('syncSenderTaggingIndexes', () => { return Promise.resolve(tags.map((_tag: SiloedTag) => [])); }); - await syncSenderTaggingIndexes(secret, contractAddress, aztecNode, taggingStore); + await syncSenderTaggingIndexes(secret, contractAddress, aztecNode, taggingStore, 'test'); // Highest used and finalized indexes should stay undefined - expect(await taggingStore.getLastUsedIndex(secret)).toBeUndefined(); - expect(await taggingStore.getLastFinalizedIndex(secret)).toBeUndefined(); + expect(await taggingStore.getLastUsedIndex(secret, 'test')).toBeUndefined(); + expect(await taggingStore.getLastFinalizedIndex(secret, 'test')).toBeUndefined(); }); // These tests need to be run together in sequence. @@ -86,13 +86,13 @@ describe('syncSenderTaggingIndexes', () => { finalized: { number: finalizedBlockNumberStep1 }, } as any); - await syncSenderTaggingIndexes(secret, contractAddress, aztecNode, taggingStore); + await syncSenderTaggingIndexes(secret, contractAddress, aztecNode, taggingStore, 'test'); // Verify the highest finalized index is updated to 3 - expect(await taggingStore.getLastFinalizedIndex(secret)).toBe(finalizedIndexStep1); + expect(await taggingStore.getLastFinalizedIndex(secret, 'test')).toBe(finalizedIndexStep1); // Verify the highest used index also returns 3 (when there is no higher pending index the highest used index is // the highest finalized index). 
- expect(await taggingStore.getLastUsedIndex(secret)).toBe(finalizedIndexStep1); + expect(await taggingStore.getLastUsedIndex(secret, 'test')).toBe(finalizedIndexStep1); }); it('step 2: pending log is synced', async () => { @@ -115,12 +115,12 @@ describe('syncSenderTaggingIndexes', () => { finalized: { number: finalizedBlockNumberStep1 }, } as any); - await syncSenderTaggingIndexes(secret, contractAddress, aztecNode, taggingStore); + await syncSenderTaggingIndexes(secret, contractAddress, aztecNode, taggingStore, 'test'); // Verify the highest finalized index was not updated - expect(await taggingStore.getLastFinalizedIndex(secret)).toBe(finalizedIndexStep1); + expect(await taggingStore.getLastFinalizedIndex(secret, 'test')).toBe(finalizedIndexStep1); // Verify the highest used index was updated to the pending index - expect(await taggingStore.getLastUsedIndex(secret)).toBe(pendingIndexStep2); + expect(await taggingStore.getLastUsedIndex(secret, 'test')).toBe(pendingIndexStep2); }); it('step 3: syncs logs across 2 windows', async () => { @@ -184,10 +184,10 @@ describe('syncSenderTaggingIndexes', () => { finalized: { number: newFinalizedBlockNumber }, } as any); - await syncSenderTaggingIndexes(secret, contractAddress, aztecNode, taggingStore); + await syncSenderTaggingIndexes(secret, contractAddress, aztecNode, taggingStore, 'test'); - expect(await taggingStore.getLastFinalizedIndex(secret)).toBe(newHighestFinalizedIndex); - expect(await taggingStore.getLastUsedIndex(secret)).toBe(newHighestUsedIndex); + expect(await taggingStore.getLastFinalizedIndex(secret, 'test')).toBe(newHighestFinalizedIndex); + expect(await taggingStore.getLastUsedIndex(secret, 'test')).toBe(newHighestUsedIndex); }); }); @@ -238,10 +238,10 @@ describe('syncSenderTaggingIndexes', () => { } as any); // Sync tagged logs - await syncSenderTaggingIndexes(secret, contractAddress, aztecNode, taggingStore); + await syncSenderTaggingIndexes(secret, contractAddress, aztecNode, taggingStore, 'test'); // Verify that both highest finalized and highest used were set to the pending and finalized index - expect(await taggingStore.getLastFinalizedIndex(secret)).toBe(pendingAndFinalizedIndex); - expect(await taggingStore.getLastUsedIndex(secret)).toBe(pendingAndFinalizedIndex); + expect(await taggingStore.getLastFinalizedIndex(secret, 'test')).toBe(pendingAndFinalizedIndex); + expect(await taggingStore.getLastUsedIndex(secret, 'test')).toBe(pendingAndFinalizedIndex); }); }); diff --git a/yarn-project/pxe/src/tagging/sender_sync/sync_sender_tagging_indexes.ts b/yarn-project/pxe/src/tagging/sender_sync/sync_sender_tagging_indexes.ts index ed90b0bdc1cd..7fdfb5e0b590 100644 --- a/yarn-project/pxe/src/tagging/sender_sync/sync_sender_tagging_indexes.ts +++ b/yarn-project/pxe/src/tagging/sender_sync/sync_sender_tagging_indexes.ts @@ -26,6 +26,7 @@ export async function syncSenderTaggingIndexes( app: AztecAddress, aztecNode: AztecNode, taggingStore: SenderTaggingStore, + jobId: string, ): Promise { // # Explanation of how syncing works // @@ -45,7 +46,7 @@ export async function syncSenderTaggingIndexes( // Each window advance requires two queries (logs + tx status). For example, syncing indexes 0–500 with a window of // 100 takes at least 10 round trips (5 windows × 2 queries). - const finalizedIndex = await taggingStore.getLastFinalizedIndex(secret); + const finalizedIndex = await taggingStore.getLastFinalizedIndex(secret, jobId); let start = finalizedIndex === undefined ? 
diff --git a/yarn-project/pxe/src/tagging/sender_sync/sync_sender_tagging_indexes.ts b/yarn-project/pxe/src/tagging/sender_sync/sync_sender_tagging_indexes.ts
index ed90b0bdc1cd..7fdfb5e0b590 100644
--- a/yarn-project/pxe/src/tagging/sender_sync/sync_sender_tagging_indexes.ts
+++ b/yarn-project/pxe/src/tagging/sender_sync/sync_sender_tagging_indexes.ts
@@ -26,6 +26,7 @@ export async function syncSenderTaggingIndexes(
   app: AztecAddress,
   aztecNode: AztecNode,
   taggingStore: SenderTaggingStore,
+  jobId: string,
 ): Promise<void> {
   // # Explanation of how syncing works
   //
@@ -45,7 +46,7 @@ export async function syncSenderTaggingIndexes(
   // Each window advance requires two queries (logs + tx status). For example, syncing indexes 0–500 with a window of
   // 100 takes at least 10 round trips (5 windows × 2 queries).

-  const finalizedIndex = await taggingStore.getLastFinalizedIndex(secret);
+  const finalizedIndex = await taggingStore.getLastFinalizedIndex(secret, jobId);
   let start = finalizedIndex === undefined ? 0 : finalizedIndex + 1;
   let end = start + UNFINALIZED_TAGGING_INDEXES_WINDOW_LEN;
@@ -56,21 +57,21 @@ export async function syncSenderTaggingIndexes(
   while (true) {
     // Load and store indexes for the current window. These indexes may already exist in the database if txs using
     // them were previously sent from this PXE. Any duplicates are handled by the tagging data provider.
-    await loadAndStoreNewTaggingIndexes(secret, app, start, end, aztecNode, taggingStore);
+    await loadAndStoreNewTaggingIndexes(secret, app, start, end, aztecNode, taggingStore, jobId);

     // Retrieve all indexes within the current window from storage and update their status accordingly.
-    const pendingTxHashes = await taggingStore.getTxHashesOfPendingIndexes(secret, start, end);
+    const pendingTxHashes = await taggingStore.getTxHashesOfPendingIndexes(secret, start, end, jobId);
     if (pendingTxHashes.length === 0) {
       break;
     }

     const { txHashesToFinalize, txHashesToDrop } = await getStatusChangeOfPending(pendingTxHashes, aztecNode);
-    await taggingStore.dropPendingIndexes(txHashesToDrop);
-    await taggingStore.finalizePendingIndexes(txHashesToFinalize);
+    await taggingStore.dropPendingIndexes(txHashesToDrop, jobId);
+    await taggingStore.finalizePendingIndexes(txHashesToFinalize, jobId);

     // We check if the finalized index has been updated.
-    newFinalizedIndex = await taggingStore.getLastFinalizedIndex(secret);
+    newFinalizedIndex = await taggingStore.getLastFinalizedIndex(secret, jobId);
     if (previousFinalizedIndex !== newFinalizedIndex) {
       // A new finalized index was found, so we'll run the loop again. For example:
       // - Previous finalized index: 10
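The hunk above ends mid-comment, so the code that reacts to a moved finalized index (and the loop's window advance) falls outside the diff's context. As a reading aid, here is the loop's overall shape in condensed form. Everything down to the finalized-index check mirrors the context lines above; the re-run logic at the bottom is an assumption filled in for illustration, not the file's actual code.

// Condensed shape of the sync loop in sync_sender_tagging_indexes.ts. The
// window-advance tail is an assumption: the hunk's context ends before it.
let previousFinalizedIndex = await taggingStore.getLastFinalizedIndex(secret, jobId);
let start = previousFinalizedIndex === undefined ? 0 : previousFinalizedIndex + 1;
let end = start + UNFINALIZED_TAGGING_INDEXES_WINDOW_LEN;

while (true) {
  // One network query for logs in the window...
  await loadAndStoreNewTaggingIndexes(secret, app, start, end, aztecNode, taggingStore, jobId);

  const pendingTxHashes = await taggingStore.getTxHashesOfPendingIndexes(secret, start, end, jobId);
  if (pendingTxHashes.length === 0) {
    break; // No pending indexes left in the window: nothing more to reconcile.
  }

  // ...and one for the status of the txs behind the pending indexes.
  const { txHashesToFinalize, txHashesToDrop } = await getStatusChangeOfPending(pendingTxHashes, aztecNode);
  await taggingStore.dropPendingIndexes(txHashesToDrop, jobId);
  await taggingStore.finalizePendingIndexes(txHashesToFinalize, jobId);

  const newFinalizedIndex = await taggingStore.getLastFinalizedIndex(secret, jobId);
  if (newFinalizedIndex === previousFinalizedIndex) {
    break; // Assumed: no finalization progress means the window is settled.
  }
  // Assumed: finalization advanced (it never regresses), so slide the window
  // past the new finalized index and loop again, e.g. 10 -> 14 means the
  // next window starts at 15.
  previousFinalizedIndex = newFinalizedIndex;
  start = newFinalizedIndex! + 1;
  end = start + UNFINALIZED_TAGGING_INDEXES_WINDOW_LEN;
}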
diff --git a/yarn-project/pxe/src/tagging/sender_sync/utils/load_and_store_new_tagging_indexes.test.ts b/yarn-project/pxe/src/tagging/sender_sync/utils/load_and_store_new_tagging_indexes.test.ts
index 8139e8ce721f..6565e0720e53 100644
--- a/yarn-project/pxe/src/tagging/sender_sync/utils/load_and_store_new_tagging_indexes.test.ts
+++ b/yarn-project/pxe/src/tagging/sender_sync/utils/load_and_store_new_tagging_indexes.test.ts
@@ -46,14 +46,14 @@ describe('loadAndStoreNewTaggingIndexes', () => {
       return Promise.resolve(tags.map((_tag: SiloedTag) => []));
     });

-    await loadAndStoreNewTaggingIndexes(secret, app, 0, 10, aztecNode, taggingStore);
+    await loadAndStoreNewTaggingIndexes(secret, app, 0, 10, aztecNode, taggingStore, 'test');

     // Verify that no pending indexes were stored
-    expect(await taggingStore.getLastUsedIndex(secret)).toBeUndefined();
-    expect(await taggingStore.getLastFinalizedIndex(secret)).toBeUndefined();
+    expect(await taggingStore.getLastUsedIndex(secret, 'test')).toBeUndefined();
+    expect(await taggingStore.getLastFinalizedIndex(secret, 'test')).toBeUndefined();

     // Verify the entire window has no pending tx hashes
-    const txHashesInWindow = await taggingStore.getTxHashesOfPendingIndexes(secret, 0, 10);
+    const txHashesInWindow = await taggingStore.getTxHashesOfPendingIndexes(secret, 0, 10, 'test');
     expect(txHashesInWindow).toHaveLength(0);
   });
@@ -66,15 +66,15 @@ describe('loadAndStoreNewTaggingIndexes', () => {
       return Promise.resolve(tags.map((t: SiloedTag) => (t.equals(tag) ? [makeLog(txHash, tag.value)] : [])));
     });

-    await loadAndStoreNewTaggingIndexes(secret, app, 0, 10, aztecNode, taggingStore);
+    await loadAndStoreNewTaggingIndexes(secret, app, 0, 10, aztecNode, taggingStore, 'test');

     // Verify that the pending index was stored for this txHash
-    const txHashesInRange = await taggingStore.getTxHashesOfPendingIndexes(secret, index, index + 1);
+    const txHashesInRange = await taggingStore.getTxHashesOfPendingIndexes(secret, index, index + 1, 'test');
     expect(txHashesInRange).toHaveLength(1);
     expect(txHashesInRange[0].equals(txHash)).toBe(true);

     // Verify the last used index is correct
-    expect(await taggingStore.getLastUsedIndex(secret)).toBe(index);
+    expect(await taggingStore.getLastUsedIndex(secret, 'test')).toBe(index);
   });

   it('for multiple logs with same txHash stores the highest index', async () => {
@@ -97,19 +97,19 @@ describe('loadAndStoreNewTaggingIndexes', () => {
       );
     });

-    await loadAndStoreNewTaggingIndexes(secret, app, 0, 10, aztecNode, taggingStore);
+    await loadAndStoreNewTaggingIndexes(secret, app, 0, 10, aztecNode, taggingStore, 'test');

     // Verify that only the highest index (7) was stored for this txHash and secret
-    const txHashesAtIndex2 = await taggingStore.getTxHashesOfPendingIndexes(secret, index2, index2 + 1);
+    const txHashesAtIndex2 = await taggingStore.getTxHashesOfPendingIndexes(secret, index2, index2 + 1, 'test');
     expect(txHashesAtIndex2).toHaveLength(1);
     expect(txHashesAtIndex2[0].equals(txHash)).toBe(true);

     // Verify the lower index is not stored separately
-    const txHashesAtIndex1 = await taggingStore.getTxHashesOfPendingIndexes(secret, index1, index1 + 1);
+    const txHashesAtIndex1 = await taggingStore.getTxHashesOfPendingIndexes(secret, index1, index1 + 1, 'test');
     expect(txHashesAtIndex1).toHaveLength(0);

     // Verify the last used index is the highest
-    expect(await taggingStore.getLastUsedIndex(secret)).toBe(index2);
+    expect(await taggingStore.getLastUsedIndex(secret, 'test')).toBe(index2);
   });

   it('multiple logs with different txHashes', async () => {
@@ -133,19 +133,19 @@ describe('loadAndStoreNewTaggingIndexes', () => {
       );
     });

-    await loadAndStoreNewTaggingIndexes(secret, app, 0, 10, aztecNode, taggingStore);
+    await loadAndStoreNewTaggingIndexes(secret, app, 0, 10, aztecNode, taggingStore, 'test');

     // Verify that both txHashes have their respective indexes stored
-    const txHashesAtIndex1 = await taggingStore.getTxHashesOfPendingIndexes(secret, index1, index1 + 1);
+    const txHashesAtIndex1 = await taggingStore.getTxHashesOfPendingIndexes(secret, index1, index1 + 1, 'test');
     expect(txHashesAtIndex1).toHaveLength(1);
     expect(txHashesAtIndex1[0].equals(txHash1)).toBe(true);

-    const txHashesAtIndex2 = await taggingStore.getTxHashesOfPendingIndexes(secret, index2, index2 + 1);
+    const txHashesAtIndex2 = await taggingStore.getTxHashesOfPendingIndexes(secret, index2, index2 + 1, 'test');
     expect(txHashesAtIndex2).toHaveLength(1);
     expect(txHashesAtIndex2[0].equals(txHash2)).toBe(true);

     // Verify the last used index is the highest
-    expect(await taggingStore.getLastUsedIndex(secret)).toBe(index2);
+    expect(await taggingStore.getLastUsedIndex(secret, 'test')).toBe(index2);
   });

   // Expected to happen if sending logs from multiple PXEs at a similar time.
@@ -161,17 +161,17 @@ describe('loadAndStoreNewTaggingIndexes', () => {
       );
     });

-    await loadAndStoreNewTaggingIndexes(secret, app, 0, 10, aztecNode, taggingStore);
+    await loadAndStoreNewTaggingIndexes(secret, app, 0, 10, aztecNode, taggingStore, 'test');

     // Verify that both txHashes have the same index stored
-    const txHashesAtIndex = await taggingStore.getTxHashesOfPendingIndexes(secret, index, index + 1);
+    const txHashesAtIndex = await taggingStore.getTxHashesOfPendingIndexes(secret, index, index + 1, 'test');
     expect(txHashesAtIndex).toHaveLength(2);
     const txHashStrings = txHashesAtIndex.map(h => h.toString());
     expect(txHashStrings).toContain(txHash1.toString());
     expect(txHashStrings).toContain(txHash2.toString());

     // Verify the last used index is correct
-    expect(await taggingStore.getLastUsedIndex(secret)).toBe(index);
+    expect(await taggingStore.getLastUsedIndex(secret, 'test')).toBe(index);
   });

   it('complex scenario: multiple txHashes with multiple indexes', async () => {
@@ -207,29 +207,29 @@ describe('loadAndStoreNewTaggingIndexes', () => {
       );
     });

-    await loadAndStoreNewTaggingIndexes(secret, app, 0, 10, aztecNode, taggingStore);
+    await loadAndStoreNewTaggingIndexes(secret, app, 0, 10, aztecNode, taggingStore, 'test');

     // Verify txHash1 has highest index 8 (should not be at index 1)
-    const txHashesAtIndex1 = await taggingStore.getTxHashesOfPendingIndexes(secret, 1, 2);
+    const txHashesAtIndex1 = await taggingStore.getTxHashesOfPendingIndexes(secret, 1, 2, 'test');
     expect(txHashesAtIndex1).toHaveLength(0);

-    const txHashesAtIndex8 = await taggingStore.getTxHashesOfPendingIndexes(secret, 8, 9);
+    const txHashesAtIndex8 = await taggingStore.getTxHashesOfPendingIndexes(secret, 8, 9, 'test');
     expect(txHashesAtIndex8).toHaveLength(1);
     expect(txHashesAtIndex8[0].equals(txHash1)).toBe(true);

     // Verify txHash2 has highest index 5 (should not be at index 3)
-    const txHashesAtIndex3 = await taggingStore.getTxHashesOfPendingIndexes(secret, 3, 4);
+    const txHashesAtIndex3 = await taggingStore.getTxHashesOfPendingIndexes(secret, 3, 4, 'test');
     expect(txHashesAtIndex3).toHaveLength(0);

-    const txHashesAtIndex5 = await taggingStore.getTxHashesOfPendingIndexes(secret, 5, 6);
+    const txHashesAtIndex5 = await taggingStore.getTxHashesOfPendingIndexes(secret, 5, 6, 'test');
     expect(txHashesAtIndex5).toHaveLength(1);
     expect(txHashesAtIndex5[0].equals(txHash2)).toBe(true);

     // Verify txHash3 has index 9
-    const txHashesAtIndex9 = await taggingStore.getTxHashesOfPendingIndexes(secret, 9, 10);
+    const txHashesAtIndex9 = await taggingStore.getTxHashesOfPendingIndexes(secret, 9, 10, 'test');
     expect(txHashesAtIndex9).toHaveLength(1);
     expect(txHashesAtIndex9[0].equals(txHash3)).toBe(true);

     // Verify the last used index is the highest
-    expect(await taggingStore.getLastUsedIndex(secret)).toBe(9);
+    expect(await taggingStore.getLastUsedIndex(secret, 'test')).toBe(9);
   });

   it('start is inclusive and end is exclusive', async () => {
@@ -255,18 +255,18 @@ describe('loadAndStoreNewTaggingIndexes', () => {
       );
     });

-    await loadAndStoreNewTaggingIndexes(secret, app, start, end, aztecNode, taggingStore);
+    await loadAndStoreNewTaggingIndexes(secret, app, start, end, aztecNode, taggingStore, 'test');

     // Verify that the log at start (inclusive) was processed
-    const txHashesAtStart = await taggingStore.getTxHashesOfPendingIndexes(secret, start, start + 1);
+    const txHashesAtStart = await taggingStore.getTxHashesOfPendingIndexes(secret, start, start + 1, 'test');
     expect(txHashesAtStart).toHaveLength(1);
     expect(txHashesAtStart[0].equals(txHashAtStart)).toBe(true);

     // Verify that the log at end (exclusive) was NOT processed
-    const txHashesAtEnd = await taggingStore.getTxHashesOfPendingIndexes(secret, end, end + 1);
+    const txHashesAtEnd = await taggingStore.getTxHashesOfPendingIndexes(secret, end, end + 1, 'test');
     expect(txHashesAtEnd).toHaveLength(0);

     // Verify the last used index is the start index (since end was not processed)
-    expect(await taggingStore.getLastUsedIndex(secret)).toBe(start);
+    expect(await taggingStore.getLastUsedIndex(secret, 'test')).toBe(start);
   });
 });
diff --git a/yarn-project/pxe/src/tagging/sender_sync/utils/load_and_store_new_tagging_indexes.ts b/yarn-project/pxe/src/tagging/sender_sync/utils/load_and_store_new_tagging_indexes.ts
index 8b3a5eb7e6c8..7ce9091d7093 100644
--- a/yarn-project/pxe/src/tagging/sender_sync/utils/load_and_store_new_tagging_indexes.ts
+++ b/yarn-project/pxe/src/tagging/sender_sync/utils/load_and_store_new_tagging_indexes.ts
@@ -25,6 +25,7 @@ export async function loadAndStoreNewTaggingIndexes(
   end: number,
   aztecNode: AztecNode,
   taggingStore: SenderTaggingStore,
+  jobId: string,
 ) {
   // We compute the tags for the current window of indexes
   const preTagsForWindow: PreTag[] = Array(end - start)
@@ -40,7 +41,7 @@ export async function loadAndStoreNewTaggingIndexes(
   // Now we iterate over the map, reconstruct the preTags and tx hash and store them in the db.
   for (const [txHashStr, highestIndex] of highestIndexMap.entries()) {
     const txHash = TxHash.fromString(txHashStr);
-    await taggingStore.storePendingIndexes([{ secret, index: highestIndex }], txHash);
+    await taggingStore.storePendingIndexes([{ secret, index: highestIndex }], txHash, jobId);
   }
 }
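A final note on load_and_store_new_tagging_indexes.ts: the highestIndexMap that the second hunk iterates over is built between the two hunks, so its construction is not shown here. The tests above pin down its behaviour, though: one entry per tx hash, keeping only the highest index at which that tx's log appeared. Below is a plausible construction consistent with those tests; it is purely illustrative since the real code falls outside the diff, and the logsByTag name is a hypothetical stand-in for the node's per-tag log response.

// Illustrative only: one way to build highestIndexMap so that each tx hash
// maps to the highest tagging index seen for it. `logsByTag[i]` is assumed
// to hold the logs the node returned for the tag at index `start + i`.
const highestIndexMap = new Map<string, number>();
logsByTag.forEach((logs, offset) => {
  const index = start + offset;
  for (const log of logs) {
    const key = log.txHash.toString();
    const current = highestIndexMap.get(key);
    // Keep only the highest index per tx hash, matching the test
    // "for multiple logs with same txHash stores the highest index".
    if (current === undefined || index > current) {
      highestIndexMap.set(key, index);
    }
  }
});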