Skip to content

Commit d41cfbf

Browse files
authored
refactor: staged writes in tagging stores (#19476)
Third part of the series started with #19445. This makes the stores related to tagging synchronization work based on staged writes.
2 parents 253e438 + 9619e78 commit d41cfbf

13 files changed

+582
-227
lines changed

yarn-project/pxe/src/contract_function_simulator/oracle/private_execution_oracle.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -265,9 +265,9 @@ export class PrivateExecutionOracle extends UtilityExecutionOracle implements IP
265265
// This is a tagging secret we've not yet used in this tx, so first sync our store to make sure its indices
266266
// are up to date. We do this here because this store is not synced as part of the global sync because
267267
// that'd be wasteful as most tagging secrets are not used in each tx.
268-
await syncSenderTaggingIndexes(secret, this.contractAddress, this.aztecNode, this.senderTaggingStore);
268+
await syncSenderTaggingIndexes(secret, this.contractAddress, this.aztecNode, this.senderTaggingStore, this.jobId);
269269

270-
const lastUsedIndex = await this.senderTaggingStore.getLastUsedIndex(secret);
270+
const lastUsedIndex = await this.senderTaggingStore.getLastUsedIndex(secret, this.jobId);
271271
// If lastUsedIndex is undefined, we've never used this secret, so start from 0
272272
// Otherwise, the next index to use is one past the last used index
273273
return lastUsedIndex === undefined ? 0 : lastUsedIndex + 1;

yarn-project/pxe/src/logs/log_service.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ export class LogService {
123123
this.aztecNode,
124124
this.recipientTaggingStore,
125125
anchorBlockNumber,
126+
this.jobId,
126127
),
127128
),
128129
);

yarn-project/pxe/src/pxe.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ export class PXE {
157157
);
158158

159159
const jobCoordinator = new JobCoordinator(store);
160-
jobCoordinator.registerStores([capsuleStore]);
160+
jobCoordinator.registerStores([capsuleStore, senderTaggingStore, recipientTaggingStore]);
161161

162162
const debugUtils = new PXEDebugUtils(contractStore, noteStore);
163163

@@ -673,7 +673,7 @@ export class PXE {
673673
// TODO(benesjan): The following is an expensive operation. Figure out a way to avoid it.
674674
const txHash = (await txProvingResult.toTx()).txHash;
675675

676-
await this.senderTaggingStore.storePendingIndexes(preTagsUsedInTheTx, txHash);
676+
await this.senderTaggingStore.storePendingIndexes(preTagsUsedInTheTx, txHash, jobId);
677677
this.log.debug(`Stored used pre-tags as sender for the tx`, {
678678
preTagsUsedInTheTx,
679679
});
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
import { Fr } from '@aztec/foundation/curves/bn254';
2+
import { openTmpStore } from '@aztec/kv-store/lmdb-v2';
3+
import { DirectionalAppTaggingSecret } from '@aztec/stdlib/logs';
4+
5+
import { RecipientTaggingStore } from './recipient_tagging_store.js';
6+
7+
describe('RecipientTaggingStore', () => {
8+
let taggingStore: RecipientTaggingStore;
9+
let secret1: DirectionalAppTaggingSecret;
10+
let secret2: DirectionalAppTaggingSecret;
11+
12+
beforeEach(async () => {
13+
taggingStore = new RecipientTaggingStore(await openTmpStore('test'));
14+
secret1 = DirectionalAppTaggingSecret.fromString(Fr.random().toString());
15+
secret2 = DirectionalAppTaggingSecret.fromString(Fr.random().toString());
16+
});
17+
18+
describe('staged writes', () => {
19+
it('persists staged highest aged index to the store', async () => {
20+
await taggingStore.updateHighestAgedIndex(secret1, 5, 'job1');
21+
22+
expect(await taggingStore.getHighestAgedIndex(secret1, 'job2')).toBeUndefined();
23+
24+
await taggingStore.commit('job1');
25+
26+
expect(await taggingStore.getHighestAgedIndex(secret1, 'job2')).toBe(5);
27+
});
28+
29+
it('persists staged highest finalized index to the store', async () => {
30+
await taggingStore.updateHighestFinalizedIndex(secret1, 10, 'job1');
31+
32+
expect(await taggingStore.getHighestFinalizedIndex(secret1, 'job2')).toBeUndefined();
33+
34+
await taggingStore.commit('job1');
35+
36+
expect(await taggingStore.getHighestFinalizedIndex(secret1, 'job2')).toBe(10);
37+
});
38+
39+
it('persists multiple secrets for the same job', async () => {
40+
await taggingStore.updateHighestAgedIndex(secret1, 5, 'job1');
41+
await taggingStore.updateHighestAgedIndex(secret2, 8, 'job1');
42+
await taggingStore.updateHighestFinalizedIndex(secret1, 3, 'job1');
43+
await taggingStore.updateHighestFinalizedIndex(secret2, 6, 'job1');
44+
45+
await taggingStore.commit('job1');
46+
47+
expect(await taggingStore.getHighestAgedIndex(secret1, 'job2')).toBe(5);
48+
expect(await taggingStore.getHighestAgedIndex(secret2, 'job2')).toBe(8);
49+
expect(await taggingStore.getHighestFinalizedIndex(secret1, 'job2')).toBe(3);
50+
expect(await taggingStore.getHighestFinalizedIndex(secret2, 'job2')).toBe(6);
51+
});
52+
53+
it('clears staged data after commit', async () => {
54+
await taggingStore.updateHighestAgedIndex(secret1, 5, 'job1');
55+
await taggingStore.commit('job1');
56+
57+
// Updating again with a higher value in the same job should work
58+
// (if staged data wasn't cleared, it would still have the old value cached)
59+
await taggingStore.updateHighestAgedIndex(secret1, 10, 'job2');
60+
expect(await taggingStore.getHighestAgedIndex(secret1, 'job2')).toBe(10);
61+
await taggingStore.commit('job2');
62+
63+
expect(await taggingStore.getHighestAgedIndex(secret1, 'job1')).toBe(10);
64+
});
65+
66+
it('does not affect other jobs when committing', async () => {
67+
await taggingStore.updateHighestAgedIndex(secret1, 5, 'job1');
68+
await taggingStore.updateHighestAgedIndex(secret1, 10, 'job2');
69+
70+
await taggingStore.commit('job2');
71+
72+
// job1's staged value should still be intact
73+
expect(await taggingStore.getHighestAgedIndex(secret1, 'job1')).toBe(5);
74+
});
75+
76+
it('discards staged highest aged index without persisting', async () => {
77+
await taggingStore.updateHighestAgedIndex(secret1, 5, 'job1');
78+
await taggingStore.discardStaged('job1');
79+
expect(await taggingStore.getHighestAgedIndex(secret1, 'job1')).toBeUndefined();
80+
});
81+
82+
it('discards staged highest finalized index without persisting', async () => {
83+
await taggingStore.updateHighestFinalizedIndex(secret1, 5, 'job1');
84+
await taggingStore.discardStaged('job1');
85+
expect(await taggingStore.getHighestFinalizedIndex(secret1, 'job1')).toBeUndefined();
86+
});
87+
});
88+
});
Lines changed: 89 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,53 +1,129 @@
11
import type { AztecAsyncKVStore, AztecAsyncMap } from '@aztec/kv-store';
22
import type { DirectionalAppTaggingSecret } from '@aztec/stdlib/logs';
33

4+
import type { StagedStore } from '../../job_coordinator/job_coordinator.js';
5+
46
/**
57
* Data provider of tagging data used when syncing the logs as a recipient. The sender counterpart of this class
68
* is called SenderTaggingStore. We have the providers separate for the sender and recipient because
79
* the algorithms are completely disjoint and there is not data reuse between the two.
810
*
911
* @dev Chain reorgs do not need to be handled here because both the finalized and aged indexes refer to finalized
1012
* blocks, which by definition cannot be affected by reorgs.
11-
*
12-
* TODO(benesjan): Relocate to yarn-project/pxe/src/storage/tagging_store
1313
*/
14-
export class RecipientTaggingStore {
14+
export class RecipientTaggingStore implements StagedStore {
15+
storeName: string = 'recipient_tagging';
16+
1517
#store: AztecAsyncKVStore;
1618

1719
#highestAgedIndex: AztecAsyncMap<string, number>;
1820
#highestFinalizedIndex: AztecAsyncMap<string, number>;
1921

22+
// jobId => secret => number
23+
#highestAgedIndexForJob: Map<string, Map<string, number>>;
24+
25+
// jobId => secret => number
26+
#highestFinalizedIndexForJob: Map<string, Map<string, number>>;
27+
2028
constructor(store: AztecAsyncKVStore) {
2129
this.#store = store;
2230

2331
this.#highestAgedIndex = this.#store.openMap('highest_aged_index');
2432
this.#highestFinalizedIndex = this.#store.openMap('highest_finalized_index');
33+
34+
this.#highestAgedIndexForJob = new Map();
35+
this.#highestFinalizedIndexForJob = new Map();
36+
}
37+
38+
#getHighestAgedIndexForJob(jobId: string): Map<string, number> {
39+
let highestAgedIndexForJob = this.#highestAgedIndexForJob.get(jobId);
40+
if (!highestAgedIndexForJob) {
41+
highestAgedIndexForJob = new Map();
42+
this.#highestAgedIndexForJob.set(jobId, highestAgedIndexForJob);
43+
}
44+
return highestAgedIndexForJob;
45+
}
46+
47+
async #readHighestAgedIndex(jobId: string, secret: string): Promise<number | undefined> {
48+
return this.#getHighestAgedIndexForJob(jobId).get(secret) ?? (await this.#highestAgedIndex.getAsync(secret));
49+
}
50+
51+
#writeHighestAgedIndex(jobId: string, secret: string, index: number) {
52+
this.#getHighestAgedIndexForJob(jobId).set(secret, index);
53+
}
54+
55+
#getHighestFinalizedIndexForJob(jobId: string): Map<string, number> {
56+
let jobStagedHighestFinalizedIndex = this.#highestFinalizedIndexForJob.get(jobId);
57+
if (!jobStagedHighestFinalizedIndex) {
58+
jobStagedHighestFinalizedIndex = new Map();
59+
this.#highestFinalizedIndexForJob.set(jobId, jobStagedHighestFinalizedIndex);
60+
}
61+
return jobStagedHighestFinalizedIndex;
62+
}
63+
64+
async #readHighestFinalizedIndex(jobId: string, secret: string): Promise<number | undefined> {
65+
return (
66+
this.#getHighestFinalizedIndexForJob(jobId).get(secret) ?? (await this.#highestFinalizedIndex.getAsync(secret))
67+
);
68+
}
69+
70+
#writeHighestFinalizedIndex(jobId: string, secret: string, index: number) {
71+
this.#getHighestFinalizedIndexForJob(jobId).set(secret, index);
72+
}
73+
74+
/**
75+
* Writes all job-specific in-memory data to persistent storage.
76+
*
77+
* @remark This method must run in a DB transaction context. It's designed to be called from JobCoordinator#commitJob.
78+
*/
79+
async commit(jobId: string): Promise<void> {
80+
const highestAgedIndexForJob = this.#highestAgedIndexForJob.get(jobId);
81+
if (highestAgedIndexForJob) {
82+
for (const [secret, index] of highestAgedIndexForJob.entries()) {
83+
await this.#highestAgedIndex.set(secret, index);
84+
}
85+
}
86+
87+
const highestFinalizedIndexForJob = this.#highestFinalizedIndexForJob.get(jobId);
88+
if (highestFinalizedIndexForJob) {
89+
for (const [secret, index] of highestFinalizedIndexForJob.entries()) {
90+
await this.#highestFinalizedIndex.set(secret, index);
91+
}
92+
}
93+
94+
return this.discardStaged(jobId);
95+
}
96+
97+
discardStaged(jobId: string): Promise<void> {
98+
this.#highestAgedIndexForJob.delete(jobId);
99+
this.#highestFinalizedIndexForJob.delete(jobId);
100+
return Promise.resolve();
25101
}
26102

27-
getHighestAgedIndex(secret: DirectionalAppTaggingSecret): Promise<number | undefined> {
28-
return this.#highestAgedIndex.getAsync(secret.toString());
103+
getHighestAgedIndex(secret: DirectionalAppTaggingSecret, jobId: string): Promise<number | undefined> {
104+
return this.#readHighestAgedIndex(jobId, secret.toString());
29105
}
30106

31-
async updateHighestAgedIndex(secret: DirectionalAppTaggingSecret, index: number): Promise<void> {
32-
const currentIndex = await this.#highestAgedIndex.getAsync(secret.toString());
107+
async updateHighestAgedIndex(secret: DirectionalAppTaggingSecret, index: number, jobId: string): Promise<void> {
108+
const currentIndex = await this.#readHighestAgedIndex(jobId, secret.toString());
33109
if (currentIndex !== undefined && index <= currentIndex) {
34110
// Log sync should never set a lower highest aged index.
35111
throw new Error(`New highest aged index (${index}) must be higher than the current one (${currentIndex})`);
36112
}
37-
await this.#highestAgedIndex.set(secret.toString(), index);
113+
this.#writeHighestAgedIndex(jobId, secret.toString(), index);
38114
}
39115

40-
getHighestFinalizedIndex(secret: DirectionalAppTaggingSecret): Promise<number | undefined> {
41-
return this.#highestFinalizedIndex.getAsync(secret.toString());
116+
getHighestFinalizedIndex(secret: DirectionalAppTaggingSecret, jobId: string): Promise<number | undefined> {
117+
return this.#readHighestFinalizedIndex(jobId, secret.toString());
42118
}
43119

44-
async updateHighestFinalizedIndex(secret: DirectionalAppTaggingSecret, index: number): Promise<void> {
45-
const currentIndex = await this.#highestFinalizedIndex.getAsync(secret.toString());
120+
async updateHighestFinalizedIndex(secret: DirectionalAppTaggingSecret, index: number, jobId: string): Promise<void> {
121+
const currentIndex = await this.#readHighestFinalizedIndex(jobId, secret.toString());
46122
if (currentIndex !== undefined && index < currentIndex) {
47123
// Log sync should never set a lower highest finalized index but it can happen that it would try to set the same
48124
// one because we are loading logs from highest aged index + 1 and not from the highest finalized index.
49125
throw new Error(`New highest finalized index (${index}) must be higher than the current one (${currentIndex})`);
50126
}
51-
await this.#highestFinalizedIndex.set(secret.toString(), index);
127+
this.#writeHighestFinalizedIndex(jobId, secret.toString(), index);
52128
}
53129
}

0 commit comments

Comments
 (0)