diff --git a/yarn-project/pxe/src/contract_function_simulator/oracle/utility_execution_oracle.ts b/yarn-project/pxe/src/contract_function_simulator/oracle/utility_execution_oracle.ts index 757ebef994be..1fcd05d35c6b 100644 --- a/yarn-project/pxe/src/contract_function_simulator/oracle/utility_execution_oracle.ts +++ b/yarn-project/pxe/src/contract_function_simulator/oracle/utility_execution_oracle.ts @@ -354,6 +354,7 @@ export class UtilityExecutionOracle implements IMiscOracle, IUtilityExecutionOra this.recipientTaggingStore, this.senderAddressBookStore, this.addressStore, + this.jobId, ); await logService.syncTaggedLogs(this.contractAddress, pendingTaggedLogArrayBaseSlot, this.scopes); @@ -385,11 +386,11 @@ export class UtilityExecutionOracle implements IMiscOracle, IUtilityExecutionOra // We read all note and event validation requests and process them all concurrently. This makes the process much // faster as we don't need to wait for the network round-trip. const noteValidationRequests = ( - await this.capsuleStore.readCapsuleArray(contractAddress, noteValidationRequestsArrayBaseSlot) + await this.capsuleStore.readCapsuleArray(contractAddress, noteValidationRequestsArrayBaseSlot, this.jobId) ).map(NoteValidationRequest.fromFields); const eventValidationRequests = ( - await this.capsuleStore.readCapsuleArray(contractAddress, eventValidationRequestsArrayBaseSlot) + await this.capsuleStore.readCapsuleArray(contractAddress, eventValidationRequestsArrayBaseSlot, this.jobId) ).map(EventValidationRequest.fromFields); const noteService = new NoteService(this.noteStore, this.aztecNode, this.anchorBlockStore); @@ -424,8 +425,8 @@ export class UtilityExecutionOracle implements IMiscOracle, IUtilityExecutionOra await Promise.all([...noteDeliveries, ...eventDeliveries]); // Requests are cleared once we're done. - await this.capsuleStore.setCapsuleArray(contractAddress, noteValidationRequestsArrayBaseSlot, []); - await this.capsuleStore.setCapsuleArray(contractAddress, eventValidationRequestsArrayBaseSlot, []); + await this.capsuleStore.setCapsuleArray(contractAddress, noteValidationRequestsArrayBaseSlot, [], this.jobId); + await this.capsuleStore.setCapsuleArray(contractAddress, eventValidationRequestsArrayBaseSlot, [], this.jobId); } public async utilityBulkRetrieveLogs( @@ -441,7 +442,7 @@ export class UtilityExecutionOracle implements IMiscOracle, IUtilityExecutionOra // We read all log retrieval requests and process them all concurrently. This makes the process much faster as we // don't need to wait for the network round-trip. const logRetrievalRequests = ( - await this.capsuleStore.readCapsuleArray(contractAddress, logRetrievalRequestsArrayBaseSlot) + await this.capsuleStore.readCapsuleArray(contractAddress, logRetrievalRequestsArrayBaseSlot, this.jobId) ).map(LogRetrievalRequest.fromFields); const logService = new LogService( @@ -452,18 +453,20 @@ export class UtilityExecutionOracle implements IMiscOracle, IUtilityExecutionOra this.recipientTaggingStore, this.senderAddressBookStore, this.addressStore, + this.jobId, ); const maybeLogRetrievalResponses = await logService.bulkRetrieveLogs(logRetrievalRequests); // Requests are cleared once we're done. - await this.capsuleStore.setCapsuleArray(contractAddress, logRetrievalRequestsArrayBaseSlot, []); + await this.capsuleStore.setCapsuleArray(contractAddress, logRetrievalRequestsArrayBaseSlot, [], this.jobId); // The responses are stored as Option in a second CapsuleArray. 
await this.capsuleStore.setCapsuleArray( contractAddress, logRetrievalResponsesArrayBaseSlot, maybeLogRetrievalResponses.map(LogRetrievalResponse.toSerializedOption), + this.jobId, ); } @@ -472,7 +475,8 @@ export class UtilityExecutionOracle implements IMiscOracle, IUtilityExecutionOra // TODO(#10727): instead of this check that this.contractAddress is allowed to access the external DB throw new Error(`Contract ${contractAddress} is not allowed to access ${this.contractAddress}'s PXE DB`); } - return this.capsuleStore.storeCapsule(this.contractAddress, slot, capsule); + this.capsuleStore.storeCapsule(this.contractAddress, slot, capsule, this.jobId); + return Promise.resolve(); } public async utilityLoadCapsule(contractAddress: AztecAddress, slot: Fr): Promise { @@ -483,7 +487,7 @@ export class UtilityExecutionOracle implements IMiscOracle, IUtilityExecutionOra return ( // TODO(#12425): On the following line, the pertinent capsule gets overshadowed by the transient one. Tackle this. this.capsules.find(c => c.contractAddress.equals(contractAddress) && c.storageSlot.equals(slot))?.data ?? - (await this.capsuleStore.loadCapsule(this.contractAddress, slot)) + (await this.capsuleStore.loadCapsule(this.contractAddress, slot, this.jobId)) ); } @@ -492,7 +496,8 @@ export class UtilityExecutionOracle implements IMiscOracle, IUtilityExecutionOra // TODO(#10727): instead of this check that this.contractAddress is allowed to access the external DB throw new Error(`Contract ${contractAddress} is not allowed to access ${this.contractAddress}'s PXE DB`); } - return this.capsuleStore.deleteCapsule(this.contractAddress, slot); + this.capsuleStore.deleteCapsule(this.contractAddress, slot, this.jobId); + return Promise.resolve(); } public utilityCopyCapsule( @@ -505,7 +510,7 @@ export class UtilityExecutionOracle implements IMiscOracle, IUtilityExecutionOra // TODO(#10727): instead of this check that this.contractAddress is allowed to access the external DB throw new Error(`Contract ${contractAddress} is not allowed to access ${this.contractAddress}'s PXE DB`); } - return this.capsuleStore.copyCapsule(this.contractAddress, srcSlot, dstSlot, numEntries); + return this.capsuleStore.copyCapsule(this.contractAddress, srcSlot, dstSlot, numEntries, this.jobId); } // TODO(#11849): consider replacing this oracle with a pure Noir implementation of aes decryption. 
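The oracle changes above are mostly mechanical: every capsule read and write now carries this.jobId, so data is staged per job instead of being written straight to the KV store. The sketch below illustrates that per-job staging pattern in isolation; the class and method names are illustrative stand-ins, not the PXE implementation (the real CapsuleStore changes appear further down in this patch).

// Minimal sketch of per-job staged writes over a committed key-value map.
// StagedKV and its methods are illustrative names, not the PXE API.
class StagedKV<V> {
  #committed = new Map<string, V>();
  // jobId => key => value (null marks a staged deletion)
  #staged = new Map<string, Map<string, V | null>>();

  #jobView(jobId: string): Map<string, V | null> {
    let view = this.#staged.get(jobId);
    if (!view) {
      view = new Map();
      this.#staged.set(jobId, view);
    }
    return view;
  }

  set(jobId: string, key: string, value: V) {
    this.#jobView(jobId).set(key, value);
  }

  delete(jobId: string, key: string) {
    this.#jobView(jobId).set(key, null); // staged deletion
  }

  get(jobId: string, key: string): V | null {
    const staged = this.#jobView(jobId).get(key);
    if (staged !== undefined) {
      return staged; // may be null if deleted within this job
    }
    return this.#committed.get(key) ?? null;
  }

  commit(jobId: string) {
    for (const [key, value] of this.#jobView(jobId)) {
      if (value === null) {
        this.#committed.delete(key);
      } else {
        this.#committed.set(key, value);
      }
    }
    this.#staged.delete(jobId);
  }

  discardStaged(jobId: string) {
    this.#staged.delete(jobId);
  }
}

Reads consult the job's staged view first (where null marks a staged deletion) and only fall back to committed data when the job has not touched the key, which mirrors the lookup order of the new #getFromStage helper below.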
diff --git a/yarn-project/pxe/src/logs/log_service.test.ts b/yarn-project/pxe/src/logs/log_service.test.ts index 25e2c3494b8e..69fd3f297ae2 100644 --- a/yarn-project/pxe/src/logs/log_service.test.ts +++ b/yarn-project/pxe/src/logs/log_service.test.ts @@ -50,6 +50,7 @@ describe('LogService', () => { recipientTaggingStore, senderAddressBookStore, addressStore, + 'test', ); aztecNode.getPrivateLogsByTags.mockReset(); diff --git a/yarn-project/pxe/src/logs/log_service.ts b/yarn-project/pxe/src/logs/log_service.ts index 4959d4a089e4..7a719572d80c 100644 --- a/yarn-project/pxe/src/logs/log_service.ts +++ b/yarn-project/pxe/src/logs/log_service.ts @@ -26,6 +26,7 @@ export class LogService { private readonly recipientTaggingStore: RecipientTaggingStore, private readonly senderAddressBookStore: SenderAddressBookStore, private readonly addressStore: AddressStore, + private readonly jobId: string, ) {} public async bulkRetrieveLogs(logRetrievalRequests: LogRetrievalRequest[]): Promise<(LogRetrievalResponse | null)[]> { @@ -186,7 +187,7 @@ export class LogService { }); // TODO: This looks like it could belong more at the oracle interface level - return this.capsuleStore.appendToCapsuleArray(contractAddress, capsuleArrayBaseSlot, pendingTaggedLogs); + return this.capsuleStore.appendToCapsuleArray(contractAddress, capsuleArrayBaseSlot, pendingTaggedLogs, this.jobId); } async #getCompleteAddress(account: AztecAddress): Promise { diff --git a/yarn-project/pxe/src/pxe.ts b/yarn-project/pxe/src/pxe.ts index 5b6c21f0300d..4d6618ba2edc 100644 --- a/yarn-project/pxe/src/pxe.ts +++ b/yarn-project/pxe/src/pxe.ts @@ -157,6 +157,7 @@ export class PXE { ); const jobCoordinator = new JobCoordinator(store); + jobCoordinator.registerStores([capsuleStore]); const debugUtils = new PXEDebugUtils(contractStore, noteStore); diff --git a/yarn-project/pxe/src/storage/capsule_store/capsule_store.test.ts b/yarn-project/pxe/src/storage/capsule_store/capsule_store.test.ts index 848c27d25e37..51217891caee 100644 --- a/yarn-project/pxe/src/storage/capsule_store/capsule_store.test.ts +++ b/yarn-project/pxe/src/storage/capsule_store/capsule_store.test.ts @@ -1,7 +1,7 @@ import { range } from '@aztec/foundation/array'; import { times } from '@aztec/foundation/collection'; import { Fr } from '@aztec/foundation/curves/bn254'; -import { openTmpStore } from '@aztec/kv-store/lmdb-v2'; +import { AztecLMDBStoreV2, openTmpStore } from '@aztec/kv-store/lmdb-v2'; import { AztecAddress } from '@aztec/stdlib/aztec-address'; import { CapsuleStore } from './capsule_store.js'; @@ -9,12 +9,13 @@ import { CapsuleStore } from './capsule_store.js'; describe('capsule data provider', () => { let contract: AztecAddress; let capsuleStore: CapsuleStore; + let store: AztecLMDBStoreV2; beforeEach(async () => { // Setup mock contract address contract = await AztecAddress.random(); - // Setup data provider - const store = await openTmpStore('capsule_store_test'); + // Setup store + store = await openTmpStore('capsule_store_test'); capsuleStore = new CapsuleStore(store); }); @@ -23,8 +24,8 @@ describe('capsule data provider', () => { const slot = new Fr(1); const values = [new Fr(42)]; - await capsuleStore.storeCapsule(contract, slot, values); - const result = await capsuleStore.loadCapsule(contract, slot); + capsuleStore.storeCapsule(contract, slot, values, 'test'); + const result = await capsuleStore.loadCapsule(contract, slot, 'test'); expect(result).toEqual(values); }); @@ -32,8 +33,8 @@ describe('capsule data provider', () => { const slot = new Fr(1); const 
values = [new Fr(42), new Fr(43), new Fr(44)]; - await capsuleStore.storeCapsule(contract, slot, values); - const result = await capsuleStore.loadCapsule(contract, slot); + capsuleStore.storeCapsule(contract, slot, values, 'test'); + const result = await capsuleStore.loadCapsule(contract, slot, 'test'); expect(result).toEqual(values); }); @@ -42,10 +43,10 @@ describe('capsule data provider', () => { const initialValues = [new Fr(42)]; const newValues = [new Fr(100)]; - await capsuleStore.storeCapsule(contract, slot, initialValues); - await capsuleStore.storeCapsule(contract, slot, newValues); + capsuleStore.storeCapsule(contract, slot, initialValues, 'test'); + capsuleStore.storeCapsule(contract, slot, newValues, 'test'); - const result = await capsuleStore.loadCapsule(contract, slot); + const result = await capsuleStore.loadCapsule(contract, slot, 'test'); expect(result).toEqual(newValues); }); @@ -55,11 +56,11 @@ describe('capsule data provider', () => { const values1 = [new Fr(42)]; const values2 = [new Fr(100)]; - await capsuleStore.storeCapsule(contract, slot, values1); - await capsuleStore.storeCapsule(anotherContract, slot, values2); + capsuleStore.storeCapsule(contract, slot, values1, 'test'); + capsuleStore.storeCapsule(anotherContract, slot, values2, 'test'); - const result1 = await capsuleStore.loadCapsule(contract, slot); - const result2 = await capsuleStore.loadCapsule(anotherContract, slot); + const result1 = await capsuleStore.loadCapsule(contract, slot, 'test'); + const result2 = await capsuleStore.loadCapsule(anotherContract, slot, 'test'); expect(result1).toEqual(values1); expect(result2).toEqual(values2); @@ -67,7 +68,7 @@ describe('capsule data provider', () => { it('returns null for non-existent slots', async () => { const slot = Fr.random(); - const result = await capsuleStore.loadCapsule(contract, slot); + const result = await capsuleStore.loadCapsule(contract, slot, 'test'); expect(result).toBeNull(); }); }); @@ -77,17 +78,17 @@ describe('capsule data provider', () => { const slot = new Fr(1); const values = [new Fr(42)]; - await capsuleStore.storeCapsule(contract, slot, values); - await capsuleStore.deleteCapsule(contract, slot); + capsuleStore.storeCapsule(contract, slot, values, 'test'); + capsuleStore.deleteCapsule(contract, slot, 'test'); - expect(await capsuleStore.loadCapsule(contract, slot)).toBeNull(); + expect(await capsuleStore.loadCapsule(contract, slot, 'test')).toBeNull(); }); it('deletes an empty slot', async () => { const slot = new Fr(1); - await capsuleStore.deleteCapsule(contract, slot); + capsuleStore.deleteCapsule(contract, slot, 'test'); - expect(await capsuleStore.loadCapsule(contract, slot)).toBeNull(); + expect(await capsuleStore.loadCapsule(contract, slot, 'test')).toBeNull(); }); }); @@ -96,82 +97,84 @@ describe('capsule data provider', () => { const slot = new Fr(1); const values = [new Fr(42)]; - await capsuleStore.storeCapsule(contract, slot, values); + capsuleStore.storeCapsule(contract, slot, values, 'test'); const dstSlot = new Fr(5); - await capsuleStore.copyCapsule(contract, slot, dstSlot, 1); + await capsuleStore.copyCapsule(contract, slot, dstSlot, 1, 'test'); - expect(await capsuleStore.loadCapsule(contract, dstSlot)).toEqual(values); + expect(await capsuleStore.loadCapsule(contract, dstSlot, 'test')).toEqual(values); }); it('copies multiple non-overlapping values', async () => { const src = new Fr(1); const valuesArray = [[new Fr(42)], [new Fr(1337)], [new Fr(13)]]; - await capsuleStore.storeCapsule(contract, src, 
valuesArray[0]); - await capsuleStore.storeCapsule(contract, src.add(new Fr(1)), valuesArray[1]); - await capsuleStore.storeCapsule(contract, src.add(new Fr(2)), valuesArray[2]); + capsuleStore.storeCapsule(contract, src, valuesArray[0], 'test'); + capsuleStore.storeCapsule(contract, src.add(new Fr(1)), valuesArray[1], 'test'); + capsuleStore.storeCapsule(contract, src.add(new Fr(2)), valuesArray[2], 'test'); const dst = new Fr(5); - await capsuleStore.copyCapsule(contract, src, dst, 3); + await capsuleStore.copyCapsule(contract, src, dst, 3, 'test'); - expect(await capsuleStore.loadCapsule(contract, dst)).toEqual(valuesArray[0]); - expect(await capsuleStore.loadCapsule(contract, dst.add(new Fr(1)))).toEqual(valuesArray[1]); - expect(await capsuleStore.loadCapsule(contract, dst.add(new Fr(2)))).toEqual(valuesArray[2]); + expect(await capsuleStore.loadCapsule(contract, dst, 'test')).toEqual(valuesArray[0]); + expect(await capsuleStore.loadCapsule(contract, dst.add(new Fr(1)), 'test')).toEqual(valuesArray[1]); + expect(await capsuleStore.loadCapsule(contract, dst.add(new Fr(2)), 'test')).toEqual(valuesArray[2]); }); it('copies overlapping values with src ahead', async () => { const src = new Fr(1); const valuesArray = [[new Fr(42)], [new Fr(1337)], [new Fr(13)]]; - await capsuleStore.storeCapsule(contract, src, valuesArray[0]); - await capsuleStore.storeCapsule(contract, src.add(new Fr(1)), valuesArray[1]); - await capsuleStore.storeCapsule(contract, src.add(new Fr(2)), valuesArray[2]); + capsuleStore.storeCapsule(contract, src, valuesArray[0], 'test'); + capsuleStore.storeCapsule(contract, src.add(new Fr(1)), valuesArray[1], 'test'); + capsuleStore.storeCapsule(contract, src.add(new Fr(2)), valuesArray[2], 'test'); const dst = new Fr(2); - await capsuleStore.copyCapsule(contract, src, dst, 3); + await capsuleStore.copyCapsule(contract, src, dst, 3, 'test'); - expect(await capsuleStore.loadCapsule(contract, dst)).toEqual(valuesArray[0]); - expect(await capsuleStore.loadCapsule(contract, dst.add(new Fr(1)))).toEqual(valuesArray[1]); - expect(await capsuleStore.loadCapsule(contract, dst.add(new Fr(2)))).toEqual(valuesArray[2]); + expect(await capsuleStore.loadCapsule(contract, dst, 'test')).toEqual(valuesArray[0]); + expect(await capsuleStore.loadCapsule(contract, dst.add(new Fr(1)), 'test')).toEqual(valuesArray[1]); + expect(await capsuleStore.loadCapsule(contract, dst.add(new Fr(2)), 'test')).toEqual(valuesArray[2]); // Slots 2 and 3 (src[1] and src[2]) should have been overwritten since they are also dst[0] and dst[1] - expect(await capsuleStore.loadCapsule(contract, src)).toEqual(valuesArray[0]); // src[0] (unchanged) - expect(await capsuleStore.loadCapsule(contract, src.add(new Fr(1)))).toEqual(valuesArray[0]); // dst[0] - expect(await capsuleStore.loadCapsule(contract, src.add(new Fr(2)))).toEqual(valuesArray[1]); // dst[1] + expect(await capsuleStore.loadCapsule(contract, src, 'test')).toEqual(valuesArray[0]); // src[0] (unchanged) + expect(await capsuleStore.loadCapsule(contract, src.add(new Fr(1)), 'test')).toEqual(valuesArray[0]); // dst[0] + expect(await capsuleStore.loadCapsule(contract, src.add(new Fr(2)), 'test')).toEqual(valuesArray[1]); // dst[1] }); it('copies overlapping values with dst ahead', async () => { const src = new Fr(5); const valuesArray = [[new Fr(42)], [new Fr(1337)], [new Fr(13)]]; - await capsuleStore.storeCapsule(contract, src, valuesArray[0]); - await capsuleStore.storeCapsule(contract, src.add(new Fr(1)), valuesArray[1]); - await 
capsuleStore.storeCapsule(contract, src.add(new Fr(2)), valuesArray[2]); + capsuleStore.storeCapsule(contract, src, valuesArray[0], 'test'); + capsuleStore.storeCapsule(contract, src.add(new Fr(1)), valuesArray[1], 'test'); + capsuleStore.storeCapsule(contract, src.add(new Fr(2)), valuesArray[2], 'test'); const dst = new Fr(4); - await capsuleStore.copyCapsule(contract, src, dst, 3); + await capsuleStore.copyCapsule(contract, src, dst, 3, 'test'); - expect(await capsuleStore.loadCapsule(contract, dst)).toEqual(valuesArray[0]); - expect(await capsuleStore.loadCapsule(contract, dst.add(new Fr(1)))).toEqual(valuesArray[1]); - expect(await capsuleStore.loadCapsule(contract, dst.add(new Fr(2)))).toEqual(valuesArray[2]); + expect(await capsuleStore.loadCapsule(contract, dst, 'test')).toEqual(valuesArray[0]); + expect(await capsuleStore.loadCapsule(contract, dst.add(new Fr(1)), 'test')).toEqual(valuesArray[1]); + expect(await capsuleStore.loadCapsule(contract, dst.add(new Fr(2)), 'test')).toEqual(valuesArray[2]); // Slots 5 and 6 (src[0] and src[1]) should have been overwritten since they are also dst[1] and dst[2] - expect(await capsuleStore.loadCapsule(contract, src)).toEqual(valuesArray[1]); // dst[1] - expect(await capsuleStore.loadCapsule(contract, src.add(new Fr(1)))).toEqual(valuesArray[2]); // dst[2] - expect(await capsuleStore.loadCapsule(contract, src.add(new Fr(2)))).toEqual(valuesArray[2]); // src[2] (unchanged) + expect(await capsuleStore.loadCapsule(contract, src, 'test')).toEqual(valuesArray[1]); // dst[1] + expect(await capsuleStore.loadCapsule(contract, src.add(new Fr(1)), 'test')).toEqual(valuesArray[2]); // dst[2] + expect(await capsuleStore.loadCapsule(contract, src.add(new Fr(2)), 'test')).toEqual(valuesArray[2]); // src[2] (unchanged) }); it('copying fails if any value is empty', async () => { const src = new Fr(1); const valuesArray = [[new Fr(42)], [new Fr(1337)], [new Fr(13)]]; - await capsuleStore.storeCapsule(contract, src, valuesArray[0]); + capsuleStore.storeCapsule(contract, src, valuesArray[0], 'test'); // We skip src[1] - await capsuleStore.storeCapsule(contract, src.add(new Fr(2)), valuesArray[2]); + capsuleStore.storeCapsule(contract, src.add(new Fr(2)), valuesArray[2], 'test'); const dst = new Fr(5); - await expect(capsuleStore.copyCapsule(contract, src, dst, 3)).rejects.toThrow('Attempted to copy empty slot'); + await expect(capsuleStore.copyCapsule(contract, src, dst, 3, 'test')).rejects.toThrow( + 'Attempted to copy empty slot', + ); }); }); @@ -181,11 +184,11 @@ describe('capsule data provider', () => { const baseSlot = new Fr(3); const array = range(4).map(x => [new Fr(x)]); - await capsuleStore.appendToCapsuleArray(contract, baseSlot, array); + await capsuleStore.appendToCapsuleArray(contract, baseSlot, array, 'test'); - expect(await capsuleStore.loadCapsule(contract, baseSlot)).toEqual([new Fr(array.length)]); + expect(await capsuleStore.loadCapsule(contract, baseSlot, 'test')).toEqual([new Fr(array.length)]); for (const i of range(array.length)) { - expect(await capsuleStore.loadCapsule(contract, baseSlot.add(new Fr(1 + i)))).toEqual(array[i]); + expect(await capsuleStore.loadCapsule(contract, baseSlot.add(new Fr(1 + i)), 'test')).toEqual(array[i]); } }); @@ -193,16 +196,16 @@ describe('capsule data provider', () => { const baseSlot = new Fr(3); const originalArray = range(4).map(x => [new Fr(x)]); - await capsuleStore.appendToCapsuleArray(contract, baseSlot, originalArray); + await capsuleStore.appendToCapsuleArray(contract, baseSlot, originalArray, 
'test'); const newElements = [[new Fr(13)], [new Fr(42)]]; - await capsuleStore.appendToCapsuleArray(contract, baseSlot, newElements); + await capsuleStore.appendToCapsuleArray(contract, baseSlot, newElements, 'test'); const expectedLength = originalArray.length + newElements.length; - expect(await capsuleStore.loadCapsule(contract, baseSlot)).toEqual([new Fr(expectedLength)]); + expect(await capsuleStore.loadCapsule(contract, baseSlot, 'test')).toEqual([new Fr(expectedLength)]); for (const i of range(expectedLength)) { - expect(await capsuleStore.loadCapsule(contract, baseSlot.add(new Fr(1 + i)))).toEqual( + expect(await capsuleStore.loadCapsule(contract, baseSlot.add(new Fr(1 + i)), 'test')).toEqual( [...originalArray, ...newElements][i], ); } @@ -212,7 +215,7 @@ describe('capsule data provider', () => { describe('readCapsuleArray', () => { it('reads an empty array', async () => { const baseSlot = new Fr(3); - const retrievedArray = await capsuleStore.readCapsuleArray(contract, baseSlot); + const retrievedArray = await capsuleStore.readCapsuleArray(contract, baseSlot, 'test'); expect(retrievedArray).toEqual([]); }); @@ -220,9 +223,9 @@ describe('capsule data provider', () => { const baseSlot = new Fr(3); const storedArray = range(4).map(x => [new Fr(x)]); - await capsuleStore.appendToCapsuleArray(contract, baseSlot, storedArray); + await capsuleStore.appendToCapsuleArray(contract, baseSlot, storedArray, 'test'); - const retrievedArray = await capsuleStore.readCapsuleArray(contract, baseSlot); + const retrievedArray = await capsuleStore.readCapsuleArray(contract, baseSlot, 'test'); expect(retrievedArray).toEqual(storedArray); }); @@ -230,10 +233,12 @@ describe('capsule data provider', () => { const baseSlot = new Fr(3); // Store in the base slot a non-zero value, indicating a non-zero array length - await capsuleStore.storeCapsule(contract, baseSlot, [new Fr(1)]); + capsuleStore.storeCapsule(contract, baseSlot, [new Fr(1)], 'test'); // Reading should now fail as some of the capsules in the array are empty - await expect(capsuleStore.readCapsuleArray(contract, baseSlot)).rejects.toThrow('Expected non-empty value'); + await expect(capsuleStore.readCapsuleArray(contract, baseSlot, 'test')).rejects.toThrow( + 'Expected non-empty value', + ); }); }); @@ -242,9 +247,9 @@ describe('capsule data provider', () => { const baseSlot = new Fr(3); const newArray = range(4).map(x => [new Fr(x)]); - await capsuleStore.setCapsuleArray(contract, baseSlot, newArray); + await capsuleStore.setCapsuleArray(contract, baseSlot, newArray, 'test'); - const retrievedArray = await capsuleStore.readCapsuleArray(contract, baseSlot); + const retrievedArray = await capsuleStore.readCapsuleArray(contract, baseSlot, 'test'); expect(retrievedArray).toEqual(newArray); }); @@ -252,12 +257,12 @@ describe('capsule data provider', () => { const baseSlot = new Fr(3); const originalArray = range(4, 0).map(x => [new Fr(x)]); - await capsuleStore.setCapsuleArray(contract, baseSlot, originalArray); + await capsuleStore.setCapsuleArray(contract, baseSlot, originalArray, 'test'); const newArray = range(10, 10).map(x => [new Fr(x)]); - await capsuleStore.setCapsuleArray(contract, baseSlot, newArray); + await capsuleStore.setCapsuleArray(contract, baseSlot, newArray, 'test'); - const retrievedArray = await capsuleStore.readCapsuleArray(contract, baseSlot); + const retrievedArray = await capsuleStore.readCapsuleArray(contract, baseSlot, 'test'); expect(retrievedArray).toEqual(newArray); }); @@ -265,17 +270,19 @@ describe('capsule data 
provider', () => { const baseSlot = new Fr(3); const originalArray = range(10, 0).map(x => [new Fr(x)]); - await capsuleStore.setCapsuleArray(contract, baseSlot, originalArray); + await capsuleStore.setCapsuleArray(contract, baseSlot, originalArray, 'test'); const newArray = range(4, 10).map(x => [new Fr(x)]); - await capsuleStore.setCapsuleArray(contract, baseSlot, newArray); + await capsuleStore.setCapsuleArray(contract, baseSlot, newArray, 'test'); - const retrievedArray = await capsuleStore.readCapsuleArray(contract, baseSlot); + const retrievedArray = await capsuleStore.readCapsuleArray(contract, baseSlot, 'test'); expect(retrievedArray).toEqual(newArray); // Not only do we read the expected array, but also all capsules past the new array length have been cleared for (const i of range(originalArray.length - newArray.length)) { - expect(await capsuleStore.loadCapsule(contract, baseSlot.add(new Fr(1 + newArray.length + i)))).toBeNull(); + expect( + await capsuleStore.loadCapsule(contract, baseSlot.add(new Fr(1 + newArray.length + i)), 'test'), + ).toBeNull(); } }); @@ -283,16 +290,16 @@ describe('capsule data provider', () => { const baseSlot = new Fr(3); const originalArray = range(10, 0).map(x => [new Fr(x)]); - await capsuleStore.setCapsuleArray(contract, baseSlot, originalArray); + await capsuleStore.setCapsuleArray(contract, baseSlot, originalArray, 'test'); - await capsuleStore.setCapsuleArray(contract, baseSlot, []); + await capsuleStore.setCapsuleArray(contract, baseSlot, [], 'test'); - const retrievedArray = await capsuleStore.readCapsuleArray(contract, baseSlot); + const retrievedArray = await capsuleStore.readCapsuleArray(contract, baseSlot, 'test'); expect(retrievedArray).toEqual([]); // All capsules from the original array have been cleared for (const i of range(originalArray.length)) { - expect(await capsuleStore.loadCapsule(contract, baseSlot.add(new Fr(1 + i)))).toBeNull(); + expect(await capsuleStore.loadCapsule(contract, baseSlot.add(new Fr(1 + i)), 'test')).toBeNull(); } }); }); @@ -319,7 +326,12 @@ describe('capsule data provider', () => { contract, new Fr(0), times(NUMBER_OF_ITEMS, () => range(ARRAY_LENGTH).map(x => new Fr(x))), + 'test', ); + + await store.transactionAsync(async () => { + await capsuleStore.commit('test'); + }); }, TEST_TIMEOUT_MS, ); @@ -331,7 +343,12 @@ describe('capsule data provider', () => { contract, new Fr(0), times(NUMBER_OF_ITEMS, () => range(ARRAY_LENGTH).map(x => new Fr(x))), + 'test', ); + + await store.transactionAsync(async () => { + await capsuleStore.commit('test'); + }); }, TEST_TIMEOUT_MS, ); @@ -343,10 +360,19 @@ describe('capsule data provider', () => { contract, new Fr(0), times(NUMBER_OF_ITEMS, () => range(ARRAY_LENGTH).map(x => new Fr(x))), + 'test', ); + await store.transactionAsync(async () => { + await capsuleStore.commit('test'); + }); + // Append a single element - await capsuleStore.appendToCapsuleArray(contract, new Fr(0), [range(ARRAY_LENGTH).map(x => new Fr(x))]); + await capsuleStore.appendToCapsuleArray(contract, new Fr(0), [range(ARRAY_LENGTH).map(x => new Fr(x))], 'test'); + + await store.transactionAsync(async () => { + await capsuleStore.commit('test'); + }); }, TEST_TIMEOUT_MS, ); @@ -358,10 +384,19 @@ describe('capsule data provider', () => { contract, new Fr(0), times(NUMBER_OF_ITEMS, () => range(ARRAY_LENGTH).map(x => new Fr(x))), + 'test', ); + await store.transactionAsync(async () => { + await capsuleStore.commit('test'); + }); + // We just move the entire thing one slot. 
- await capsuleStore.copyCapsule(contract, new Fr(0), new Fr(1), NUMBER_OF_ITEMS); + await capsuleStore.copyCapsule(contract, new Fr(0), new Fr(1), NUMBER_OF_ITEMS, 'test'); + + await store.transactionAsync(async () => { + await capsuleStore.commit('test'); + }); }, TEST_TIMEOUT_MS, ); @@ -373,9 +408,18 @@ describe('capsule data provider', () => { contract, new Fr(0), times(NUMBER_OF_ITEMS, () => range(ARRAY_LENGTH).map(x => new Fr(x))), + 'test', ); - await capsuleStore.readCapsuleArray(contract, new Fr(0)); + await store.transactionAsync(async () => { + await capsuleStore.commit('test'); + }); + + await capsuleStore.readCapsuleArray(contract, new Fr(0), 'test'); + + await store.transactionAsync(async () => { + await capsuleStore.commit('test'); + }); }, TEST_TIMEOUT_MS, ); @@ -387,11 +431,128 @@ describe('capsule data provider', () => { contract, new Fr(0), times(NUMBER_OF_ITEMS, () => range(ARRAY_LENGTH).map(x => new Fr(x))), + 'test', ); - await capsuleStore.setCapsuleArray(contract, new Fr(0), []); + await store.transactionAsync(async () => { + await capsuleStore.commit('test'); + }); + + await capsuleStore.setCapsuleArray(contract, new Fr(0), [], 'test'); + + await store.transactionAsync(async () => { + await capsuleStore.commit('test'); + }); }, TEST_TIMEOUT_MS, ); }); + + describe('staged writes', () => { + it('commit does not hold zombie data', async () => { + // This test tries to reproduce a scenario where + // we fail to clear a job's data after commit. + // Such an incorrect behavior would only be observable + // if we re-used a jobId we had previously committed, + // which should not happen given we generate random job IDs, + // but it's good to keep things clean and consistent. + const slot = Fr.random(); + const committedValues1 = [Fr.random()]; + const committedValues2 = [Fr.random()]; + + capsuleStore.storeCapsule(contract, slot, committedValues1, 'job-1'); + + // After this commit, 'job-1' should logically be reset + // Any read of contract-slot after this should see committedValues1 + await capsuleStore.commit('job-1'); + + // Any read of contract-slot should now see committedValues2 + capsuleStore.storeCapsule(contract, slot, committedValues2, 'job-2'); + await capsuleStore.commit('job-2'); + + // If we failed to properly dispose of 'job-1's staged writes on commit, + // instead of reading committedValues2 (as we should) we would end + // up reading committedValues1 (which would be wrong) + expect(await capsuleStore.loadCapsule(contract, slot, 'job-1')).toEqual(committedValues2); + }); + + it('writes to job view are isolated from another job view', async () => { + const slot = Fr.random(); + const committedValues = [Fr.random()]; + const stagedValues = [Fr.random()]; + const commitJobId: string = 'commit-job'; + const stagedJob1: string = 'staged-job-1'; + const stagedJob2: string = 'staged-job-2'; + + // First set a committed capsule (using a different job that we commit) + capsuleStore.storeCapsule(contract, slot, committedValues, commitJobId); + await capsuleStore.commit(commitJobId); + + // Then set a staged capsule (not committed) + capsuleStore.storeCapsule(contract, slot, stagedValues, stagedJob1); + + // With stagedJob1, should get the staged capsule + expect(await capsuleStore.loadCapsule(contract, slot, stagedJob1)).toEqual(stagedValues); + + // With stagedJob2, should get the committed capsule + expect(await capsuleStore.loadCapsule(contract, slot, stagedJob2)).toEqual(committedValues); + }); + + it('staged deletions hide committed data', async () => { + const slot
= Fr.random(); + const committedValues = [Fr.random()]; + const commitJobId: string = 'commit-job'; + const stagedJob1: string = 'staged-job-1'; + const stagedJob2: string = 'staged-job-2'; + + // First set a committed capsule + capsuleStore.storeCapsule(contract, slot, committedValues, commitJobId); + await capsuleStore.commit(commitJobId); + + // Delete in staging (not committed) + capsuleStore.deleteCapsule(contract, slot, stagedJob1); + + // With stagedJob2 (no staged changes), should still see the committed capsule + expect(await capsuleStore.loadCapsule(contract, slot, stagedJob2)).toEqual(committedValues); + + // With stagedJob1, should see null (deleted in staging) + expect(await capsuleStore.loadCapsule(contract, slot, stagedJob1)).toBeNull(); + }); + + it('commit applies staged deletions', async () => { + const slot = Fr.random(); + const committedValues = [Fr.random()]; + const commitJobId: string = 'commit-job'; + const deleteJobId: string = 'delete-job'; + + capsuleStore.storeCapsule(contract, slot, committedValues, commitJobId); + await capsuleStore.commit(commitJobId); + capsuleStore.deleteCapsule(contract, slot, deleteJobId); + + await capsuleStore.commit(deleteJobId); + + // Now any job should see null (deleted) + expect(await capsuleStore.loadCapsule(contract, slot, 'any-job-sees-this')).toBeNull(); + }); + + it('discardStaged removes staged data without affecting main', async () => { + const slot = Fr.random(); + const committedValues = [Fr.random()]; + const stagedValues = [Fr.random()]; + const commitJobId: string = 'commit-job'; + const stagingJobId: string = 'staging-job'; + + capsuleStore.storeCapsule(contract, slot, committedValues, commitJobId); + await capsuleStore.commit(commitJobId); + capsuleStore.storeCapsule(contract, slot, stagedValues, stagingJobId); + + await capsuleStore.discardStaged(stagingJobId); + + // Should still get committed capsule + expect(await capsuleStore.loadCapsule(contract, slot, 'any-job')).toEqual(committedValues); + + // With stagingJobId, should fall back to committed since staging was discarded + expect(await capsuleStore.loadCapsule(contract, slot, stagingJobId)).toEqual(committedValues); + }); + }); }); diff --git a/yarn-project/pxe/src/storage/capsule_store/capsule_store.ts b/yarn-project/pxe/src/storage/capsule_store/capsule_store.ts index ac39dc88dafb..3424ff546844 100644 --- a/yarn-project/pxe/src/storage/capsule_store/capsule_store.ts +++ b/yarn-project/pxe/src/storage/capsule_store/capsule_store.ts @@ -3,12 +3,21 @@ import { type Logger, createLogger } from '@aztec/foundation/log'; import type { AztecAsyncKVStore, AztecAsyncMap } from '@aztec/kv-store'; import type { AztecAddress } from '@aztec/stdlib/aztec-address'; -export class CapsuleStore { +import type { StagedStore } from '../../job_coordinator/job_coordinator.js'; + +export class CapsuleStore implements StagedStore { + readonly storeName = 'capsule'; + #store: AztecAsyncKVStore; // Arbitrary data stored by contracts.
Key is computed as `${contractAddress}:${key}` #capsules: AztecAsyncMap<string, Buffer>; + // jobId => `${contractAddress}:${key}` => capsule data + // when `#stagedCapsules.get('some-job-id').get('some-contract-address:some-key') === null`, + // it signals that the capsule was deleted during the job, so it needs to be deleted on commit + #stagedCapsules: Map<string, Map<string, Buffer | null>>; + logger: Logger; constructor(store: AztecAsyncKVStore) { @@ -16,21 +25,120 @@ export class CapsuleStore { this.#capsules = this.#store.openMap('capsules'); + this.#stagedCapsules = new Map(); + this.logger = createLogger('pxe:capsule-data-provider'); } + /** + * Given a job denoted by `jobId`, returns the + * capsules that said job has interacted with. + * + * Capsules that haven't been committed to the persistent KV store + * are kept in memory in `#stagedCapsules`; this method provides a convenient + * way to access that in-memory collection of data. + * + * @param jobId - The job whose staged capsules to return + * @returns The in-memory map of staged capsules for that job + */ + #getJobStagedCapsules(jobId: string): Map<string, Buffer | null> { + let jobStagedCapsules = this.#stagedCapsules.get(jobId); + if (!jobStagedCapsules) { + jobStagedCapsules = new Map(); + this.#stagedCapsules.set(jobId, jobStagedCapsules); + } + return jobStagedCapsules; + } + + /** + * Reads a capsule's slot from the staged version of the data associated with the given jobId. + * + * If it is not there, it reads it from the KV store. + */ + async #getFromStage(jobId: string, dbSlotKey: string): Promise<Buffer | null> { + const jobStagedCapsules = this.#getJobStagedCapsules(jobId); + let staged: Buffer | null | undefined = jobStagedCapsules.get(dbSlotKey); + // Note that if staged === null, we marked it for deletion, so we don't want to + // re-read it from DB + if (staged === undefined) { + // If we don't have a staged version of this dbSlotKey, we check if there's one in the DB + staged = await this.#loadCapsuleFromDb(dbSlotKey); + } + return staged; + } + + /** + * Writes a capsule to the stage of a job. + */ + #setOnStage(jobId: string, dbSlotKey: string, capsuleData: Buffer) { + this.#getJobStagedCapsules(jobId).set(dbSlotKey, capsuleData); + } + + /** + * Deletes a capsule on the stage of a job. Note the capsule will still + * exist in storage until the job is committed. + */ + #deleteOnStage(jobId: string, dbSlotKey: string) { + this.#getJobStagedCapsules(jobId).set(dbSlotKey, null); + } + + async #loadCapsuleFromDb(dbSlotKey: string): Promise<Buffer | null> { + const dataBuffer = await this.#capsules.getAsync(dbSlotKey); + if (!dataBuffer) { + return null; + } + + return dataBuffer; + } + + /** + * Commits staged data to main storage. + * Called by JobCoordinator when a job completes successfully. + * Note: JobCoordinator wraps all commits in a single transaction, so we don't + * need our own transactionAsync here (and using one would deadlock on IndexedDB). + * @param jobId - The jobId identifying which staged data to commit + */ + async commit(jobId: string): Promise<void> { + const jobStagedCapsules = this.#getJobStagedCapsules(jobId); + + for (const [key, value] of jobStagedCapsules) { + // In the write stage, we represent deleted capsules with null + // (as opposed to undefined, which denotes there was never a capsule there to begin with). + // So we delete from the actual KV store here. + if (value === null) { + await this.#capsules.delete(key); + } else { + await this.#capsules.set(key, value); + } + } + + this.#stagedCapsules.delete(jobId); + } + + /** + * Discards staged data without committing.
+ */ + discardStaged(jobId: string): Promise<void> { + this.#stagedCapsules.delete(jobId); + return Promise.resolve(); + } + /** * Stores arbitrary information in a per-contract non-volatile database, which can later be retrieved with `loadCapsule`. * * If data was already stored at this slot, it is overwritten. * @param contractAddress - The contract address to scope the data under. * @param slot - The slot in the database in which to store the value. Slots need not be contiguous. * @param capsule - An array of field elements representing the capsule. + * @param jobId - The job context in which this capsule will be visible until PXE persists it to the underlying KV store * @remarks A capsule is a "blob" of data that is passed to the contract through an oracle. It works similarly * to public contract storage in that it's indexed by the contract address and storage slot but instead of the global * network state it's backed by local PXE db. */ - async storeCapsule(contractAddress: AztecAddress, slot: Fr, capsule: Fr[]): Promise<void> { - await this.#capsules.set(dbSlotToKey(contractAddress, slot), Buffer.concat(capsule.map(value => value.toBuffer()))); + storeCapsule(contractAddress: AztecAddress, slot: Fr, capsule: Fr[], jobId: string) { + const dbSlotKey = dbSlotToKey(contractAddress, slot); + + // A store overwrites any pre-existing data on the slot + this.#setOnStage(jobId, dbSlotKey, Buffer.concat(capsule.map(value => value.toBuffer()))); } /** @@ -39,8 +147,8 @@ export class CapsuleStore { * @param slot - The slot in the database to read. * @returns The stored data or `null` if no data is stored under the slot. */ - async loadCapsule(contractAddress: AztecAddress, slot: Fr): Promise<Fr[] | null> { - const dataBuffer = await this.#capsules.getAsync(dbSlotToKey(contractAddress, slot)); + async loadCapsule(contractAddress: AztecAddress, slot: Fr, jobId: string): Promise<Fr[] | null> { + const dataBuffer = await this.#getFromStage(jobId, dbSlotToKey(contractAddress, slot)); if (!dataBuffer) { this.logger.trace(`Data not found for contract ${contractAddress.toString()} and slot ${slot.toString()}`); return null; @@ -57,8 +165,9 @@ export class CapsuleStore { * @param contractAddress - The contract address under which the data is scoped. * @param slot - The slot in the database to delete. */ - async deleteCapsule(contractAddress: AztecAddress, slot: Fr): Promise<void> { - await this.#capsules.delete(dbSlotToKey(contractAddress, slot)); + deleteCapsule(contractAddress: AztecAddress, slot: Fr, jobId: string) { + // When we commit this, we will interpret null as a deletion, so we'll propagate the delete to the KV store + this.#deleteOnStage(jobId, dbSlotToKey(contractAddress, slot)); } /** @@ -72,13 +181,22 @@ export class CapsuleStore { * @param dstSlot - The first slot to copy to. * @param numEntries - The number of entries to copy. */ - copyCapsule(contractAddress: AztecAddress, srcSlot: Fr, dstSlot: Fr, numEntries: number): Promise<void> { + copyCapsule( + contractAddress: AztecAddress, + srcSlot: Fr, + dstSlot: Fr, + numEntries: number, + jobId: string, + ): Promise<void> { + // This transactional context gives us "copy atomicity": + // there shouldn't be concurrent writes to what's being copied here. + // Equally important: in practice this is expected to perform thousands of DB operations, + // and not using a transaction here would heavily impact performance. return this.#store.transactionAsync(async () => { // In order to support overlapping source and destination regions, we need to check the relative positions of source // and destination.
If destination is ahead of source, then by the time we overwrite source elements using forward // indexes we'll have already read those. On the contrary, if source is ahead of destination we need to use backward // indexes to avoid reading elements that've been overwritten. - const indexes = Array.from(Array(numEntries).keys()); if (srcSlot.lt(dstSlot)) { indexes.reverse(); @@ -88,12 +206,12 @@ export class CapsuleStore { const currentSrcSlot = dbSlotToKey(contractAddress, srcSlot.add(new Fr(i))); const currentDstSlot = dbSlotToKey(contractAddress, dstSlot.add(new Fr(i))); - const toCopy = await this.#capsules.getAsync(currentSrcSlot); + const toCopy = await this.#getFromStage(jobId, currentSrcSlot); if (!toCopy) { throw new Error(`Attempted to copy empty slot ${currentSrcSlot} for contract ${contractAddress.toString()}`); } - await this.#capsules.set(currentDstSlot, toCopy); + this.#setOnStage(jobId, currentDstSlot, toCopy); } }); } @@ -106,35 +224,45 @@ export class CapsuleStore { * @param baseSlot - The slot where the array length is stored * @param content - Array of capsule data to append */ - appendToCapsuleArray(contractAddress: AztecAddress, baseSlot: Fr, content: Fr[][]): Promise<void> { + appendToCapsuleArray(contractAddress: AztecAddress, baseSlot: Fr, content: Fr[][], jobId: string): Promise<void> { + // We wrap this in a transaction to serialize concurrent calls from Promise.all. + // Without this, concurrent appends to the same array could race: both read length=0, + // both write at the same slots, one overwrites the other. + // Equally important: in practice this is expected to perform thousands of DB operations, + // and not using a transaction here would heavily impact performance. return this.#store.transactionAsync(async () => { // Load current length, defaulting to 0 if not found - const lengthData = await this.loadCapsule(contractAddress, baseSlot); + const lengthData = await this.loadCapsule(contractAddress, baseSlot, jobId); const currentLength = lengthData ? lengthData[0].toNumber() : 0; // Store each capsule at consecutive slots after baseSlot + 1 + currentLength for (let i = 0; i < content.length; i++) { const nextSlot = arraySlot(baseSlot, currentLength + i); - await this.storeCapsule(contractAddress, nextSlot, content[i]); + this.storeCapsule(contractAddress, nextSlot, content[i], jobId); } // Update length to include all new capsules const newLength = currentLength + content.length; - await this.storeCapsule(contractAddress, baseSlot, [new Fr(newLength)]); + this.storeCapsule(contractAddress, baseSlot, [new Fr(newLength)], jobId); }); } - readCapsuleArray(contractAddress: AztecAddress, baseSlot: Fr): Promise<Fr[][]> { + readCapsuleArray(contractAddress: AztecAddress, baseSlot: Fr, jobId: string): Promise<Fr[][]> { + // We keep this transactional context because we assume it gives us + // "read array atomicity": there shouldn't be concurrent writes to what's being read + // here. + // This is one point we should revisit in the future if we want to relax the concurrency + // of jobs: different calls running concurrently on the same contract may cause trouble. return this.#store.transactionAsync(async () => { // Load length, defaulting to 0 if not found - const maybeLength = await this.loadCapsule(contractAddress, baseSlot); + const maybeLength = await this.loadCapsule(contractAddress, baseSlot, jobId); const length = maybeLength ?
maybeLength[0].toBigInt() : 0n; const values: Fr[][] = []; // Read each capsule at consecutive slots after baseSlot for (let i = 0; i < length; i++) { - const currentValue = await this.loadCapsule(contractAddress, arraySlot(baseSlot, i)); + const currentValue = await this.loadCapsule(contractAddress, arraySlot(baseSlot, i), jobId); if (currentValue == undefined) { throw new Error( `Expected non-empty value at capsule array in base slot ${baseSlot} at index ${i} for contract ${contractAddress}`, @@ -148,23 +276,31 @@ export class CapsuleStore { }); } - setCapsuleArray(contractAddress: AztecAddress, baseSlot: Fr, content: Fr[][]) { + setCapsuleArray(contractAddress: AztecAddress, baseSlot: Fr, content: Fr[][], jobId: string) { + // In theory this transactional context isn't so critical anymore: we aren't writing + // to the DB here, so if an exception is thrown midway there is no visible impact + // on persistent storage. + // We keep the transactional context nonetheless because we assume it gives us + // "write array atomicity": there shouldn't be concurrent writes to what's being written + // here. + // This is one point we should revisit in the future if we want to relax the concurrency + // of jobs: different calls running concurrently on the same contract may cause trouble. return this.#store.transactionAsync(async () => { // Load current length, defaulting to 0 if not found - const maybeLength = await this.loadCapsule(contractAddress, baseSlot); + const maybeLength = await this.loadCapsule(contractAddress, baseSlot, jobId); const originalLength = maybeLength ? maybeLength[0].toNumber() : 0; // Set the new length - await this.storeCapsule(contractAddress, baseSlot, [new Fr(content.length)]); + this.storeCapsule(contractAddress, baseSlot, [new Fr(content.length)], jobId); // Store the new content, possibly overwriting existing values for (let i = 0; i < content.length; i++) { - await this.storeCapsule(contractAddress, arraySlot(baseSlot, i), content[i]); + this.storeCapsule(contractAddress, arraySlot(baseSlot, i), content[i], jobId); } // Clear any stragglers for (let i = content.length; i < originalLength; i++) { - await this.deleteCapsule(contractAddress, arraySlot(baseSlot, i)); + this.deleteCapsule(contractAddress, arraySlot(baseSlot, i), jobId); } }); }
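Together with the jobCoordinator.registerStores([capsuleStore]) call in pxe.ts, the intended lifecycle appears to be: stage writes under a jobId while a job runs, commit all registered stores in a single transaction when the job succeeds, and discard the staged data when it fails. The JobCoordinator internals are not part of this patch, so the following is only a hypothetical sketch of that flow; StagedStoreLike and runJob are illustrative names, not the actual API.

// Hypothetical sketch of how a coordinator might drive registered staged stores.
// Only registerStores, commit, and discardStaged are visible in this patch;
// everything else here is an assumption for illustration.
interface StagedStoreLike {
  readonly storeName: string;
  commit(jobId: string): Promise<void>;
  discardStaged(jobId: string): Promise<void>;
}

async function runJob<T>(
  stores: StagedStoreLike[],
  commitInTx: (fn: () => Promise<void>) => Promise<void>, // e.g. the KV store's transactionAsync
  jobId: string,
  work: () => Promise<T>,
): Promise<T> {
  try {
    const result = await work();
    // All stores commit inside one transaction, mirroring the note in
    // CapsuleStore.commit about JobCoordinator wrapping commits.
    await commitInTx(async () => {
      for (const store of stores) {
        await store.commit(jobId);
      }
    });
    return result;
  } catch (err) {
    // On failure, staged data is dropped and committed state is untouched.
    await Promise.all(stores.map(store => store.discardStaged(jobId)));
    throw err;
  }
}

Keeping the per-store commits inside one outer transaction matches the note in CapsuleStore.commit that the coordinator wraps all commits in a single transaction, so a crash mid-commit cannot leave one store committed and another not.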