Skip to content

Commit 1596ab9

Browse files
authored
refactor: atomic reorg handling (#19364)
As Martin mentioned [here](#19327 (comment)) makes sense to have the reorg handling be atomic. In this PR I tackle that. Note that this PR doesn't need to be in stack with #19443 but #19451 depends on both so putting this one into stack was the only way to unblock myself working on #19451.
2 parents c74003f + 911b2b3 commit 1596ab9

File tree

8 files changed

+147
-124
lines changed

8 files changed

+147
-124
lines changed

yarn-project/pxe/src/block_synchronizer/block_synchronizer.test.ts

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { BlockNumber, CheckpointNumber } from '@aztec/foundation/branded-types';
22
import { timesParallel } from '@aztec/foundation/collection';
3+
import { Fr } from '@aztec/foundation/curves/bn254';
34
import { openTmpStore } from '@aztec/kv-store/lmdb-v2';
45
import { L2TipsKVStore } from '@aztec/kv-store/stores';
56
import { GENESIS_CHECKPOINT_HEADER_HASH, L2Block, L2BlockNew, type L2BlockStream } from '@aztec/stdlib/block';
@@ -36,7 +37,7 @@ describe('BlockSynchronizer', () => {
3637
anchorBlockStore = new AnchorBlockStore(store);
3738
noteStore = await NoteStore.create(store);
3839
privateEventStore = new PrivateEventStore(store);
39-
synchronizer = new TestSynchronizer(aztecNode, anchorBlockStore, noteStore, privateEventStore, tipsStore);
40+
synchronizer = new TestSynchronizer(aztecNode, store, anchorBlockStore, noteStore, privateEventStore, tipsStore);
4041
});
4142

4243
it('sets header from latest block', async () => {
@@ -48,12 +49,14 @@ describe('BlockSynchronizer', () => {
4849
});
4950

5051
it('removes notes from db on a reorg', async () => {
51-
const rollbackNotesAndNullifiers = jest
52-
.spyOn(noteStore, 'rollbackNotesAndNullifiers')
53-
.mockImplementation(() => Promise.resolve());
54-
aztecNode.getBlockHeader.mockImplementation(async blockNumber =>
55-
(await L2Block.random(BlockNumber(blockNumber as number))).getBlockHeader(),
56-
);
52+
const rollback = jest.spyOn(noteStore, 'rollback').mockImplementation(() => Promise.resolve());
53+
aztecNode.getBlockHeaderByHash.mockImplementation(async hash => {
54+
// For the test, when hash is '0x3', return block header for block 3
55+
if (hash.equals(Fr.fromString('0x3'))) {
56+
return (await L2Block.random(BlockNumber(3))).getBlockHeader();
57+
}
58+
return undefined;
59+
});
5760

5861
await synchronizer.handleBlockStreamEvent({
5962
type: 'blocks-added',
@@ -66,16 +69,18 @@ describe('BlockSynchronizer', () => {
6669
checkpoint: { number: CheckpointNumber.ZERO, hash: GENESIS_CHECKPOINT_HEADER_HASH.toString() },
6770
});
6871

69-
expect(rollbackNotesAndNullifiers).toHaveBeenCalledWith(3, 4);
72+
expect(rollback).toHaveBeenCalledWith(3, 4);
7073
});
7174

7275
it('removes private events from db on a reorg', async () => {
73-
const rollbackEventsAfterBlock = jest
74-
.spyOn(privateEventStore, 'rollbackEventsAfterBlock')
75-
.mockImplementation(() => Promise.resolve());
76-
aztecNode.getBlockHeader.mockImplementation(async blockNumber =>
77-
(await L2Block.random(BlockNumber(blockNumber as number))).getBlockHeader(),
78-
);
76+
const rollback = jest.spyOn(privateEventStore, 'rollback').mockImplementation(() => Promise.resolve());
77+
aztecNode.getBlockHeaderByHash.mockImplementation(async hash => {
78+
// For the test, when hash is '0x3', return block header for block 3
79+
if (hash.equals(Fr.fromString('0x3'))) {
80+
return (await L2Block.random(BlockNumber(3))).getBlockHeader();
81+
}
82+
return undefined;
83+
});
7984

8085
await synchronizer.handleBlockStreamEvent({
8186
type: 'blocks-added',
@@ -88,6 +93,6 @@ describe('BlockSynchronizer', () => {
8893
checkpoint: { number: CheckpointNumber.ZERO, hash: GENESIS_CHECKPOINT_HEADER_HASH.toString() },
8994
});
9095

91-
expect(rollbackEventsAfterBlock).toHaveBeenCalledWith(3, 4);
96+
expect(rollback).toHaveBeenCalledWith(3, 4);
9297
});
9398
});

yarn-project/pxe/src/block_synchronizer/block_synchronizer.ts

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import { BlockNumber } from '@aztec/foundation/branded-types';
2+
import { Fr } from '@aztec/foundation/curves/bn254';
23
import { type Logger, createLogger } from '@aztec/foundation/log';
4+
import type { AztecAsyncKVStore } from '@aztec/kv-store';
35
import type { L2TipsKVStore } from '@aztec/kv-store/stores';
46
import { L2BlockStream, type L2BlockStreamEvent, type L2BlockStreamEventHandler } from '@aztec/stdlib/block';
57
import type { AztecNode } from '@aztec/stdlib/interfaces/client';
@@ -21,6 +23,7 @@ export class BlockSynchronizer implements L2BlockStreamEventHandler {
2123

2224
constructor(
2325
private node: AztecNode,
26+
private store: AztecAsyncKVStore,
2427
private anchorBlockStore: AnchorBlockStore,
2528
private noteStore: NoteStore,
2629
private privateEventStore: PrivateEventStore,
@@ -61,17 +64,25 @@ export class BlockSynchronizer implements L2BlockStreamEventHandler {
6164
}
6265
case 'chain-pruned': {
6366
this.log.warn(`Pruning data after block ${event.block.number} due to reorg`);
64-
// We first unnullify and then remove so that unnullified notes that were created after the block number end up deleted.
65-
const lastSynchedBlockNumber = (await this.anchorBlockStore.getBlockHeader()).getBlockNumber();
66-
await this.noteStore.rollbackNotesAndNullifiers(event.block.number, lastSynchedBlockNumber);
67-
await this.privateEventStore.rollbackEventsAfterBlock(event.block.number, lastSynchedBlockNumber);
68-
// Update the header to the last block.
69-
const newHeader = await this.node.getBlockHeader(event.block.number);
70-
if (!newHeader) {
71-
this.log.error(`Block header not found for block number ${event.block.number} during chain prune`);
72-
} else {
73-
await this.anchorBlockStore.setHeader(newHeader);
67+
68+
const oldAnchorBlockNumber = (await this.anchorBlockStore.getBlockHeader()).getBlockNumber();
69+
// Note that the following is not necessarily the anchor block that will be used in the transaction - if
70+
// the chain has already moved past the reorg, we'll also see blocks-added events that will push the anchor
71+
// forward.
72+
const newAnchorBlockHeader = await this.node.getBlockHeaderByHash(Fr.fromString(event.block.hash));
73+
74+
if (!newAnchorBlockHeader) {
75+
throw new Error(
76+
`Block header for block number ${event.block.number} and hash ${event.block.hash} not found during chain prune. This likely indicates a bug in the node, as we receive block stream events and fetch block headers from the same node.`,
77+
);
7478
}
79+
80+
// Operations are wrapped in a single transaction to ensure atomicity.
81+
await this.store.transactionAsync(async () => {
82+
await this.noteStore.rollback(event.block.number, oldAnchorBlockNumber);
83+
await this.privateEventStore.rollback(event.block.number, oldAnchorBlockNumber);
84+
await this.anchorBlockStore.setHeader(newAnchorBlockHeader);
85+
});
7586
break;
7687
}
7788
}

yarn-project/pxe/src/pxe.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ export class PXE {
148148
const tipsStore = new L2TipsKVStore(store, 'pxe');
149149
const synchronizer = new BlockSynchronizer(
150150
node,
151+
store,
151152
anchorBlockStore,
152153
noteStore,
153154
privateEventStore,

yarn-project/pxe/src/storage/note_store/note_store.test.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -488,7 +488,7 @@ describe('NoteStore', () => {
488488
});
489489
});
490490

491-
describe('NoteStore.rollbackNotesAndNullifiers', () => {
491+
describe('NoteStore.rollback', () => {
492492
let provider: NoteStore;
493493
let store: AztecLMDBStoreV2;
494494

@@ -521,7 +521,7 @@ describe('NoteStore', () => {
521521
// Apply nullifiers and rollback to block 3
522522
// - should restore noteBlock3 (nullified at block 4) and preserve noteBlock1 (nullified at block 2)
523523
await provider.applyNullifiers(nullifiers);
524-
await provider.rollbackNotesAndNullifiers(3, 6);
524+
await provider.rollback(3, 6);
525525
}
526526

527527
beforeEach(async () => {
@@ -584,7 +584,7 @@ describe('NoteStore', () => {
584584

585585
// Since nullification happened at block 5 (not after), it should stay nullified
586586
// The rewind loop processes blocks (blockNumber+1) to synchedBlockNumber = 6 to 5 = no iterations
587-
await provider.rollbackNotesAndNullifiers(5, 5);
587+
await provider.rollback(5, 5);
588588

589589
const activeNotes = await provider.getNotes({ contractAddress: CONTRACT_A });
590590
expect(activeNotes).toHaveLength(0);
@@ -610,7 +610,7 @@ describe('NoteStore', () => {
610610
await provider.applyNullifiers(nullifiers);
611611

612612
// blockNumber=6, synchedBlockNumber=4 therefore no nullifications to rewind
613-
await provider.rollbackNotesAndNullifiers(6, 4);
613+
await provider.rollback(6, 4);
614614

615615
const activeNotes = await provider.getNotes({ contractAddress: CONTRACT_A });
616616
expect(activeNotes).toHaveLength(0);
@@ -635,7 +635,7 @@ describe('NoteStore', () => {
635635
},
636636
];
637637
await provider.applyNullifiers(nullifiers);
638-
await provider.rollbackNotesAndNullifiers(5, 100);
638+
await provider.rollback(5, 100);
639639

640640
// note1 should be restored (nullified at block 7 > rollback block 5)
641641
// note2 should be deleted (created at block 10 > rollback block 5)
@@ -644,7 +644,7 @@ describe('NoteStore', () => {
644644
});
645645

646646
it('handles rollback on empty PXE database gracefully', async () => {
647-
await expect(provider.rollbackNotesAndNullifiers(10, 20)).resolves.not.toThrow();
647+
await expect(provider.rollback(10, 20)).resolves.not.toThrow();
648648
const notes = await provider.getNotes({ contractAddress: CONTRACT_A });
649649
expect(notes).toHaveLength(0);
650650
});

yarn-project/pxe/src/storage/note_store/note_store.ts

Lines changed: 58 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -124,10 +124,12 @@ export class NoteStore {
124124
* specified block number. It restores any notes that were nullified after the given block
125125
* and deletes any active notes created after that block.
126126
*
127+
* IMPORTANT: This method must be called within a transaction to ensure atomicity.
128+
*
127129
* @param blockNumber - The new chain tip after a reorg
128130
* @param synchedBlockNumber - The block number up to which PXE managed to sync before the reorg happened.
129131
*/
130-
public async rollbackNotesAndNullifiers(blockNumber: number, synchedBlockNumber: number): Promise<void> {
132+
public async rollback(blockNumber: number, synchedBlockNumber: number): Promise<void> {
131133
await this.#rewindNullifiersAfterBlock(blockNumber, synchedBlockNumber);
132134
await this.#deleteActiveNotesAfterBlock(blockNumber);
133135
}
@@ -140,24 +142,22 @@ export class NoteStore {
140142
*
141143
* @param blockNumber - Notes created after this block number will be deleted
142144
*/
143-
#deleteActiveNotesAfterBlock(blockNumber: number): Promise<void> {
144-
return this.#store.transactionAsync(async () => {
145-
const notes = await toArray(this.#notes.valuesAsync());
146-
for (const note of notes) {
147-
const noteDao = NoteDao.fromBuffer(note);
148-
if (noteDao.l2BlockNumber > blockNumber) {
149-
const noteIndex = toBufferBE(noteDao.index, 32).toString('hex');
150-
await this.#notes.delete(noteIndex);
151-
await this.#notesToScope.delete(noteIndex);
152-
await this.#nullifierToNoteId.delete(noteDao.siloedNullifier.toString());
153-
const scopes = await toArray(this.#scopes.keysAsync());
154-
for (const scope of scopes) {
155-
await this.#notesByContractAndScope.get(scope)!.deleteValue(noteDao.contractAddress.toString(), noteIndex);
156-
await this.#notesByStorageSlotAndScope.get(scope)!.deleteValue(noteDao.storageSlot.toString(), noteIndex);
157-
}
145+
async #deleteActiveNotesAfterBlock(blockNumber: number): Promise<void> {
146+
const notes = await toArray(this.#notes.valuesAsync());
147+
for (const note of notes) {
148+
const noteDao = NoteDao.fromBuffer(note);
149+
if (noteDao.l2BlockNumber > blockNumber) {
150+
const noteIndex = toBufferBE(noteDao.index, 32).toString('hex');
151+
await this.#notes.delete(noteIndex);
152+
await this.#notesToScope.delete(noteIndex);
153+
await this.#nullifierToNoteId.delete(noteDao.siloedNullifier.toString());
154+
const scopes = await toArray(this.#scopes.keysAsync());
155+
for (const scope of scopes) {
156+
await this.#notesByContractAndScope.get(scope)!.deleteValue(noteDao.contractAddress.toString(), noteIndex);
157+
await this.#notesByStorageSlotAndScope.get(scope)!.deleteValue(noteDao.storageSlot.toString(), noteIndex);
158158
}
159159
}
160-
});
160+
}
161161
}
162162

163163
/**
@@ -171,50 +171,52 @@ export class NoteStore {
171171
* @param synchedBlockNumber - Upper bound for the block range to process
172172
*/
173173
async #rewindNullifiersAfterBlock(blockNumber: number, synchedBlockNumber: number): Promise<void> {
174-
await this.#store.transactionAsync(async () => {
175-
const nullifiersToUndo: string[] = [];
176-
const currentBlockNumber = blockNumber + 1;
177-
for (let i = currentBlockNumber; i <= synchedBlockNumber; i++) {
178-
nullifiersToUndo.push(...(await toArray(this.#nullifiersByBlockNumber.getValuesAsync(i))));
179-
}
180-
const notesIndexesToReinsert = await Promise.all(
181-
nullifiersToUndo.map(nullifier => this.#nullifiedNotesByNullifier.getAsync(nullifier)),
182-
);
183-
const notNullNoteIndexes = notesIndexesToReinsert.filter(noteIndex => noteIndex != undefined);
184-
const nullifiedNoteBuffers = await Promise.all(
185-
notNullNoteIndexes.map(noteIndex => this.#nullifiedNotes.getAsync(noteIndex!)),
186-
);
187-
const noteDaos = nullifiedNoteBuffers
188-
.filter(buffer => buffer != undefined)
189-
.map(buffer => NoteDao.fromBuffer(buffer!));
190-
191-
for (const dao of noteDaos) {
192-
const noteIndex = toBufferBE(dao.index, 32).toString('hex');
193-
await this.#notes.set(noteIndex, dao.toBuffer());
194-
await this.#nullifierToNoteId.set(dao.siloedNullifier.toString(), noteIndex);
174+
const nullifiersToUndo: string[] = [];
175+
const currentBlockNumber = blockNumber + 1;
176+
for (let i = currentBlockNumber; i <= synchedBlockNumber; i++) {
177+
nullifiersToUndo.push(...(await toArray(this.#nullifiersByBlockNumber.getValuesAsync(i))));
178+
}
179+
const notesIndexesToReinsert = await Promise.all(
180+
nullifiersToUndo.map(nullifier => this.#nullifiedNotesByNullifier.getAsync(nullifier)),
181+
);
182+
const notNullNoteIndexes = notesIndexesToReinsert.filter(noteIndex => noteIndex != undefined);
183+
const nullifiedNoteBuffers = await Promise.all(
184+
notNullNoteIndexes.map(noteIndex => this.#nullifiedNotes.getAsync(noteIndex!)),
185+
);
186+
const noteDaos = nullifiedNoteBuffers
187+
.filter(buffer => buffer != undefined)
188+
.map(buffer => NoteDao.fromBuffer(buffer!));
195189

196-
const scopes = await toArray(this.#nullifiedNotesToScope.getValuesAsync(noteIndex));
190+
for (const dao of noteDaos) {
191+
const noteIndex = toBufferBE(dao.index, 32).toString('hex');
197192

198-
if (scopes.length === 0) {
199-
// We should never run into this error because notes always have a scope assigned to them - either on initial
200-
// insertion via `addNotes` or when removing their nullifiers.
201-
throw new Error(`No scopes found for nullified note with index ${noteIndex}`);
202-
}
193+
const scopes = await toArray(this.#nullifiedNotesToScope.getValuesAsync(noteIndex));
203194

204-
for (const scope of scopes) {
205-
await this.#notesByContractAndScope.get(scope.toString())!.set(dao.contractAddress.toString(), noteIndex);
206-
await this.#notesByStorageSlotAndScope.get(scope.toString())!.set(dao.storageSlot.toString(), noteIndex);
207-
await this.#notesToScope.set(noteIndex, scope);
208-
}
195+
if (scopes.length === 0) {
196+
// We should never run into this error because notes always have a scope assigned to them - either on initial
197+
// insertion via `addNotes` or when removing their nullifiers.
198+
throw new Error(`No scopes found for nullified note with index ${noteIndex}`);
199+
}
209200

210-
await this.#nullifiedNotes.delete(noteIndex);
211-
await this.#nullifiedNotesToScope.delete(noteIndex);
212-
await this.#nullifiersByBlockNumber.deleteValue(dao.l2BlockNumber, dao.siloedNullifier.toString());
213-
await this.#nullifiedNotesByContract.deleteValue(dao.contractAddress.toString(), noteIndex);
214-
await this.#nullifiedNotesByStorageSlot.deleteValue(dao.storageSlot.toString(), noteIndex);
215-
await this.#nullifiedNotesByNullifier.delete(dao.siloedNullifier.toString());
201+
for (const scope of scopes) {
202+
await Promise.all([
203+
this.#notesByContractAndScope.get(scope.toString())!.set(dao.contractAddress.toString(), noteIndex),
204+
this.#notesByStorageSlotAndScope.get(scope.toString())!.set(dao.storageSlot.toString(), noteIndex),
205+
this.#notesToScope.set(noteIndex, scope),
206+
]);
216207
}
217-
});
208+
209+
await Promise.all([
210+
this.#notes.set(noteIndex, dao.toBuffer()),
211+
this.#nullifierToNoteId.set(dao.siloedNullifier.toString(), noteIndex),
212+
this.#nullifiedNotes.delete(noteIndex),
213+
this.#nullifiedNotesToScope.delete(noteIndex),
214+
this.#nullifiersByBlockNumber.deleteValue(dao.l2BlockNumber, dao.siloedNullifier.toString()),
215+
this.#nullifiedNotesByContract.deleteValue(dao.contractAddress.toString(), noteIndex),
216+
this.#nullifiedNotesByStorageSlot.deleteValue(dao.storageSlot.toString(), noteIndex),
217+
this.#nullifiedNotesByNullifier.delete(dao.siloedNullifier.toString()),
218+
]);
219+
}
218220
}
219221

220222
/**

yarn-project/pxe/src/storage/private_event_store/private_event_store.test.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ describe('PrivateEventStore', () => {
286286
});
287287

288288
// Rollback to block 150 (should remove events from blocks 200 and 300)
289-
await privateEventStore.rollbackEventsAfterBlock(150, 300);
289+
await privateEventStore.rollback(150, 300);
290290

291291
const events = await privateEventStore.getPrivateEvents(eventSelector, {
292292
contractAddress,
@@ -315,7 +315,7 @@ describe('PrivateEventStore', () => {
315315
});
316316

317317
// Rollback to block 100
318-
await privateEventStore.rollbackEventsAfterBlock(100, 200);
318+
await privateEventStore.rollback(100, 200);
319319

320320
// Verify event was removed
321321
let events = await privateEventStore.getPrivateEvents(eventSelector, {
@@ -355,7 +355,7 @@ describe('PrivateEventStore', () => {
355355
});
356356

357357
// Rollback after all existing events
358-
await privateEventStore.rollbackEventsAfterBlock(200, 300);
358+
await privateEventStore.rollback(200, 300);
359359

360360
const events = await privateEventStore.getPrivateEvents(eventSelector, {
361361
contractAddress,

0 commit comments

Comments
 (0)