Skip to content

Commit 598d440

Browse files
authored
fix: Enable block proposal mempool in p2p prover clients (#19190)
This allows prover clients to track block proposals as they are broadcasted, which they rely on to mark txs as non evictable. Note that prover clients were already doing this, but they were unable to protect against an evil proposer blasting multiple proposals per block.
2 parents d3be5a7 + ee97aa9 commit 598d440

File tree

9 files changed

+33
-51
lines changed

9 files changed

+33
-51
lines changed

yarn-project/p2p/src/client/factory.ts

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import { configureP2PClientAddresses, createLibP2PPeerIdFromPrivateKey, getPeerI
2626
export type P2PClientDeps<T extends P2PClientType> = {
2727
txPool?: TxPool;
2828
store?: AztecAsyncKVStore;
29-
attestationPool?: T extends P2PClientType.Full ? AttestationPool : undefined;
29+
attestationPool?: AttestationPool;
3030
logger?: Logger;
3131
txCollectionNodeSources?: TxSource[];
3232
p2pServiceFactory?: (...args: Parameters<(typeof LibP2PService)['new']>) => Promise<LibP2PService<T>>;
@@ -73,19 +73,14 @@ export async function createP2PClient<T extends P2PClientType>(
7373
);
7474
const l1Constants = await archiver.getL1Constants();
7575

76-
const mempools: MemPools<T> = {
76+
const mempools: MemPools = {
7777
txPool:
7878
deps.txPool ??
7979
new AztecKVTxPool(store, archive, worldStateSynchronizer, telemetry, {
8080
maxTxPoolSize: config.maxTxPoolSize,
8181
archivedTxLimit: config.archivedTxLimit,
8282
}),
83-
attestationPool:
84-
clientType === P2PClientType.Full
85-
? ((deps.attestationPool ?? new KvAttestationPool(attestationStore, telemetry)) as T extends P2PClientType.Full
86-
? AttestationPool
87-
: undefined)
88-
: undefined,
83+
attestationPool: deps.attestationPool ?? new KvAttestationPool(attestationStore, telemetry),
8984
};
9085

9186
const p2pService = await createP2PService<T>(
@@ -147,7 +142,7 @@ async function createP2PService<T extends P2PClientType>(
147142
epochCache: EpochCacheInterface,
148143
store: AztecAsyncKVStore,
149144
peerStore: AztecLMDBStoreV2,
150-
mempools: MemPools<T>,
145+
mempools: MemPools,
151146
p2pServiceFactory: P2PClientDeps<T>['p2pServiceFactory'],
152147
packageVersion: string,
153148
logger: Logger,

yarn-project/p2p/src/client/p2p_client.ts

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ export class P2PClient<T extends P2PClientType = P2PClientType.Full>
6969
private synchedLatestSlot: AztecAsyncSingleton<bigint>;
7070

7171
private txPool: TxPool;
72-
private attestationPool: T extends P2PClientType.Full ? AttestationPool : undefined;
72+
private attestationPool: AttestationPool;
7373

7474
private config: P2PConfig;
7575

@@ -91,7 +91,7 @@ export class P2PClient<T extends P2PClientType = P2PClientType.Full>
9191
_clientType: T,
9292
private store: AztecAsyncKVStore,
9393
private l2BlockSource: L2BlockSource & ContractDataSource,
94-
mempools: MemPools<T>,
94+
mempools: MemPools,
9595
private p2pService: P2PService,
9696
private txCollection: TxCollection,
9797
config: Partial<P2PConfig> = {},
@@ -103,7 +103,7 @@ export class P2PClient<T extends P2PClientType = P2PClientType.Full>
103103

104104
this.config = { ...getP2PDefaultConfig(), ...config };
105105
this.txPool = mempools.txPool;
106-
this.attestationPool = mempools.attestationPool!;
106+
this.attestationPool = mempools.attestationPool;
107107

108108
this.txProvider = new TxProvider(
109109
this.txCollection,
@@ -282,10 +282,7 @@ export class P2PClient<T extends P2PClientType = P2PClientType.Full>
282282
const syncedProvenBlock = (await this.getSyncedProvenBlockNum()) + 1;
283283
const syncedFinalizedBlock = (await this.getSyncedFinalizedBlockNum()) + 1;
284284

285-
if (
286-
(await this.txPool.isEmpty()) &&
287-
(this.attestationPool === undefined || (await this.attestationPool?.isEmpty()))
288-
) {
285+
if ((await this.txPool.isEmpty()) && (await this.attestationPool.isEmpty())) {
289286
// if mempools are empty, we don't care about syncing prior blocks
290287
this.initBlockStream(BlockNumber(this.latestBlockNumberAtStart));
291288
this.setCurrentState(P2PClientState.RUNNING);
@@ -389,19 +386,17 @@ export class P2PClient<T extends P2PClientType = P2PClientType.Full>
389386
}
390387

391388
public async getAttestationsForSlot(slot: SlotNumber, proposalId?: string): Promise<BlockAttestation[]> {
392-
return (
393-
(await (proposalId
394-
? this.attestationPool?.getAttestationsForSlotAndProposal(slot, proposalId)
395-
: this.attestationPool?.getAttestationsForSlot(slot))) ?? []
396-
);
389+
return await (proposalId
390+
? this.attestationPool.getAttestationsForSlotAndProposal(slot, proposalId)
391+
: this.attestationPool.getAttestationsForSlot(slot));
397392
}
398393

399394
public addAttestations(attestations: BlockAttestation[]): Promise<void> {
400-
return this.attestationPool?.addAttestations(attestations) ?? Promise.resolve();
395+
return this.attestationPool.addAttestations(attestations);
401396
}
402397

403398
public deleteAttestation(attestation: BlockAttestation): Promise<void> {
404-
return this.attestationPool?.deleteAttestations([attestation]) ?? Promise.resolve();
399+
return this.attestationPool.deleteAttestations([attestation]);
405400
}
406401

407402
// REVIEW: https://github.com/AztecProtocol/aztec-packages/issues/7963
@@ -782,7 +777,7 @@ export class P2PClient<T extends P2PClientType = P2PClientType.Full>
782777
await this.txPool.deleteTxs(txHashes, { permanently: true });
783778
await this.txPool.cleanupDeletedMinedTxs(lastBlockNum);
784779

785-
await this.attestationPool?.deleteAttestationsOlderThan(lastBlockSlot);
780+
await this.attestationPool.deleteAttestationsOlderThan(lastBlockSlot);
786781

787782
await this.synchedFinalizedBlockNumber.set(lastBlockNum);
788783
this.log.debug(`Synched to finalized block ${lastBlockNum} at slot ${lastBlockSlot}`);
Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,10 @@
1-
import type { P2PClientType } from '@aztec/stdlib/p2p';
2-
31
import type { AttestationPool } from './attestation_pool/attestation_pool.js';
42
import type { TxPool } from './tx_pool/tx_pool.js';
53

64
/**
75
* A interface the combines all mempools
86
*/
9-
export type MemPools<T extends P2PClientType = P2PClientType.Full> = {
7+
export type MemPools = {
108
txPool: TxPool;
11-
attestationPool?: T extends P2PClientType.Full ? AttestationPool : undefined;
9+
attestationPool: AttestationPool;
1210
};

yarn-project/p2p/src/services/libp2p/libp2p_service.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ describe('LibP2PService', () => {
231231
function setProposalTxHashes(
232232
svc: {
233233
mempools: {
234-
attestationPool?: {
234+
attestationPool: {
235235
getBlockProposal: (id: string) => Promise<{ txHashes: { toString(): string }[] } | undefined>;
236236
};
237237
};

yarn-project/p2p/src/services/libp2p/libp2p_service.ts

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ export class LibP2PService<T extends P2PClientType = P2PClientType.Full> extends
153153
private peerDiscoveryService: PeerDiscoveryService,
154154
private reqresp: ReqRespInterface,
155155
private peerManager: PeerManagerInterface,
156-
protected mempools: MemPools<T>,
156+
protected mempools: MemPools,
157157
private archiver: L2BlockSource & ContractDataSource,
158158
private epochCache: EpochCacheInterface,
159159
private proofVerifier: ClientProtocolCircuitVerifier,
@@ -185,7 +185,7 @@ export class LibP2PService<T extends P2PClientType = P2PClientType.Full> extends
185185

186186
// Use FishermanAttestationValidator in fisherman mode to validate attestation payloads against proposals
187187
this.attestationValidator = config.fishermanMode
188-
? new FishermanAttestationValidator(epochCache, mempools.attestationPool!, telemetry)
188+
? new FishermanAttestationValidator(epochCache, mempools.attestationPool, telemetry)
189189
: new AttestationValidator(epochCache);
190190
this.blockProposalValidator = new BlockProposalValidator(epochCache, { txsPermitted: !config.disableTransactions });
191191

@@ -215,7 +215,7 @@ export class LibP2PService<T extends P2PClientType = P2PClientType.Full> extends
215215
config: P2PConfig,
216216
peerId: PeerId,
217217
deps: {
218-
mempools: MemPools<T>;
218+
mempools: MemPools;
219219
l2BlockSource: L2BlockSource & ContractDataSource;
220220
epochCache: EpochCacheInterface;
221221
proofVerifier: ClientProtocolCircuitVerifier;
@@ -486,8 +486,7 @@ export class LibP2PService<T extends P2PClientType = P2PClientType.Full> extends
486486
[ReqRespSubProtocol.BLOCK]: blockHandler.bind(this),
487487
};
488488

489-
// Only handle block transactions request if attestation pool is available to the client
490-
if (this.mempools.attestationPool && !this.config.disableTransactions) {
489+
if (!this.config.disableTransactions) {
491490
const blockTxsHandler = reqRespBlockTxsHandler(this.mempools.attestationPool, this.mempools.txPool);
492491
requestResponseHandlers[ReqRespSubProtocol.BLOCK_TXS] = blockTxsHandler.bind(this);
493492
}
@@ -809,7 +808,7 @@ export class LibP2PService<T extends P2PClientType = P2PClientType.Full> extends
809808
private async processAttestationFromPeer(payloadData: Buffer, msgId: string, source: PeerId): Promise<void> {
810809
const validationFunc: () => Promise<ReceivedMessageValidationResult<BlockAttestation>> = async () => {
811810
const attestation = BlockAttestation.fromBuffer(payloadData);
812-
const pool = this.mempools.attestationPool!;
811+
const pool = this.mempools.attestationPool;
813812
const isValid = await this.validateAttestation(source, attestation);
814813
const exists = isValid && (await pool.hasAttestation(attestation));
815814

@@ -866,7 +865,7 @@ export class LibP2PService<T extends P2PClientType = P2PClientType.Full> extends
866865
},
867866
);
868867

869-
await this.mempools.attestationPool!.addAttestations([attestation]);
868+
await this.mempools.attestationPool.addAttestations([attestation]);
870869
}
871870

872871
private async processBlockFromPeer(payloadData: Buffer, msgId: string, source: PeerId): Promise<void> {
@@ -875,10 +874,8 @@ export class LibP2PService<T extends P2PClientType = P2PClientType.Full> extends
875874
const isValid = await this.validateBlockProposal(source, block);
876875
const pool = this.mempools.attestationPool;
877876

878-
// Note that we dont have an attestation pool if we're a prover node, but we still
879-
// subscribe to block proposal topics in order to prevent their txs from being cleared.
880-
const exists = isValid && pool ? await pool.hasBlockProposal(block) : false;
881-
const canAdd = isValid && pool ? await pool.canAddProposal(block) : true; // If pool DNE, set canAdd to true to avoid peer penalization (the block is not added to the pool)
877+
const exists = isValid && (await pool.hasBlockProposal(block));
878+
const canAdd = isValid && (await pool.canAddProposal(block));
882879

883880
this.logger.trace(`Validate propagated block proposal`, {
884881
isValid,
@@ -934,14 +931,12 @@ export class LibP2PService<T extends P2PClientType = P2PClientType.Full> extends
934931
archive: block.archive.toString(),
935932
source: sender.toString(),
936933
});
937-
const attestationsForPreviousSlot = await this.mempools.attestationPool?.getAttestationsForSlot(previousSlot);
938-
if (attestationsForPreviousSlot !== undefined) {
939-
this.logger.verbose(`Received ${attestationsForPreviousSlot.length} attestations for slot ${previousSlot}`);
940-
}
934+
const attestationsForPreviousSlot = await this.mempools.attestationPool.getAttestationsForSlot(previousSlot);
935+
this.logger.verbose(`Received ${attestationsForPreviousSlot.length} attestations for slot ${previousSlot}`);
941936

942937
// Attempt to add proposal, then mark the txs in this proposal as non-evictable
943938
try {
944-
await this.mempools.attestationPool?.addBlockProposal(block);
939+
await this.mempools.attestationPool.addBlockProposal(block);
945940
} catch (err: unknown) {
946941
// Drop proposals if we hit per-slot cap in the attestation pool; rethrow unknown errors
947942
if (err instanceof ProposalSlotCapExceededError) {
@@ -1047,7 +1042,7 @@ export class LibP2PService<T extends P2PClientType = P2PClientType.Full> extends
10471042
}
10481043

10491044
// Given proposal (should have locally), ensure returned txs are valid subset and match request indices
1050-
const proposal = await this.mempools.attestationPool?.getBlockProposal(request.blockHash.toString());
1045+
const proposal = await this.mempools.attestationPool.getBlockProposal(request.blockHash.toString());
10511046
if (proposal) {
10521047
// Build intersected indices
10531048
const intersectIdx = request.txIndices.getTrueIndices().filter(i => response.txIndices.isSet(i));

yarn-project/p2p/src/services/reqresp/protocols/tx.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import { chunk } from '@aztec/foundation/collection';
2-
import type { P2PClientType } from '@aztec/stdlib/p2p';
32
import { TxArray, TxHash, TxHashArray } from '@aztec/stdlib/tx';
43

54
import type { PeerId } from '@libp2p/interface';
@@ -16,7 +15,7 @@ import { ReqRespStatus, ReqRespStatusError } from '../status.js';
1615
* @param mempools - the mempools
1716
* @returns the Tx request handler
1817
*/
19-
export function reqRespTxHandler<T extends P2PClientType>(mempools: MemPools<T>): ReqRespSubProtocolHandler {
18+
export function reqRespTxHandler(mempools: MemPools): ReqRespSubProtocolHandler {
2019
/**
2120
* Handler for tx requests
2221
* @param msg - the tx request message

yarn-project/p2p/src/test-helpers/mock-pubsub.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ export function getMockPubSubP2PServiceFactory<T extends P2PClientType>(
3939
peerId: PeerId,
4040
deps: {
4141
packageVersion: string;
42-
mempools: MemPools<T>;
42+
mempools: MemPools;
4343
l2BlockSource: L2BlockSource & ContractDataSource;
4444
epochCache: EpochCacheInterface;
4545
proofVerifier: ClientProtocolCircuitVerifier;

yarn-project/p2p/src/test-helpers/reqresp-nodes.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ export async function createTestLibP2PService<T extends P2PClientType>(
112112
archiver: L2BlockSource & ContractDataSource,
113113
worldStateSynchronizer: WorldStateSynchronizer,
114114
epochCache: EpochCache,
115-
mempools: MemPools<T>,
115+
mempools: MemPools,
116116
telemetry: TelemetryClient,
117117
port: number = 0,
118118
peerId?: PeerId,

yarn-project/p2p/src/testbench/p2p_client_testbench_worker.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ class TestLibP2PService<T extends P2PClientType = P2PClientType.Full> extends Li
126126
peerDiscoveryService: PeerDiscoveryService,
127127
reqresp: ReqResp,
128128
peerManager: PeerManager,
129-
mempools: MemPools<T>,
129+
mempools: MemPools,
130130
archiver: L2BlockSource & ContractDataSource,
131131
epochCache: EpochCacheInterface,
132132
proofVerifier: ClientProtocolCircuitVerifier,

0 commit comments

Comments
 (0)