Skip to content

Commit 0d19b92

Browse files
authored
feat: message propagation metrics (#19265)
Ref: A-413, A-412 Span to track attestation propagation on `AttestToProposal` function Peer connection time metric Also changed metrics from previous pr: 1. track deleted transactions in `cleanupDeletedMinedTxs` 3. Record p2p requested transaction even if we requested zero
2 parents 7910a74 + caa79de commit 0d19b92

File tree

10 files changed

+52
-14
lines changed

10 files changed

+52
-14
lines changed

yarn-project/p2p/src/mem_pools/instrumentation.ts

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import type { Gossipable } from '@aztec/stdlib/p2p';
2-
import type { Tx, TxHash } from '@aztec/stdlib/tx';
2+
import type { Tx } from '@aztec/stdlib/tx';
33
import {
44
Attributes,
55
type BatchObservableResult,
@@ -72,7 +72,7 @@ export class PoolInstrumentation<PoolObject extends Gossipable> {
7272
private defaultAttributes;
7373
private meter: Meter;
7474

75-
private txAddedTimestamp: Map<string, number> = new Map<string, number>();
75+
private txAddedTimestamp: Map<bigint, number> = new Map<bigint, number>();
7676

7777
constructor(
7878
telemetry: TelemetryClient,
@@ -124,17 +124,16 @@ export class PoolInstrumentation<PoolObject extends Gossipable> {
124124
public transactionsAdded(transactions: Tx[]) {
125125
const timestamp = Date.now();
126126
for (const transaction of transactions) {
127-
this.txAddedTimestamp.set(transaction.txHash.toString(), timestamp);
127+
this.txAddedTimestamp.set(transaction.txHash.toBigInt(), timestamp);
128128
}
129129
}
130130

131-
public transactionsRemoved(hashes: TxHash[]) {
131+
public transactionsRemoved(hashes: Iterable<bigint> | Iterable<string>) {
132132
const timestamp = Date.now();
133133
for (const hash of hashes) {
134-
const key = hash.toString();
135-
const addedAt = this.txAddedTimestamp.get(key);
134+
const addedAt = this.txAddedTimestamp.get(BigInt(hash));
136135
if (addedAt) {
137-
this.txAddedTimestamp.delete(key);
136+
this.txAddedTimestamp.delete(BigInt(hash));
138137
this.minedDelay.record(addedAt - timestamp);
139138
}
140139
}

yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ export class AztecKVTxPool extends (EventEmitter as new () => TypedEventEmitter<
187187

188188
await this.evictInvalidTxsAfterMining(txHashes, blockHeader, minedNullifiers, minedFeePayers);
189189
});
190-
this.#metrics.transactionsRemoved(txHashes);
190+
this.#metrics.transactionsRemoved(txHashes.map(hash => hash.toBigInt()));
191191
// We update this after the transaction above. This ensures that the non-evictable transactions are not evicted
192192
// until any that have been mined are marked as such.
193193
// The non-evictable set is not considered when evicting transactions that are invalid after a block is mined.
@@ -386,7 +386,7 @@ export class AztecKVTxPool extends (EventEmitter as new () => TypedEventEmitter<
386386

387387
await this.#pendingTxSize.set(pendingTxSize);
388388
});
389-
this.#metrics.transactionsRemoved(txHashes);
389+
this.#metrics.transactionsRemoved(txHashes.map(hash => hash.toBigInt()));
390390
this.#log.debug(`Deleted ${txHashes.length} txs from pool`, { txHashes });
391391
return this.#archivedTxLimit ? poolDbTx.then(() => this.archiveTxs(deletedTxs)) : poolDbTx;
392392
}
@@ -464,6 +464,7 @@ export class AztecKVTxPool extends (EventEmitter as new () => TypedEventEmitter<
464464
deletedCount++;
465465
}
466466
}
467+
this.#metrics.transactionsRemoved(txHashesToDelete);
467468

468469
// Clean up block-to-hash mapping - delete all values for each block
469470
for (const block of blocksToDelete) {

yarn-project/p2p/src/mem_pools/tx_pool/memory_tx_pool.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ export class InMemoryTxPool extends (EventEmitter as new () => TypedEventEmitter
7676
this.minedTxs.set(key, blockHeader.globalVariables.blockNumber);
7777
this.pendingTxs.delete(key);
7878
}
79-
this.metrics.transactionsRemoved(txHashes);
79+
this.metrics.transactionsRemoved(txHashes.map(hash => hash.toBigInt()));
8080
return Promise.resolve();
8181
}
8282

@@ -226,7 +226,7 @@ export class InMemoryTxPool extends (EventEmitter as new () => TypedEventEmitter
226226
}
227227
}
228228
}
229-
this.metrics.transactionsRemoved(txHashes);
229+
this.metrics.transactionsRemoved(txHashes.map(hash => hash.toBigInt()));
230230

231231
return Promise.resolve();
232232
}
@@ -271,6 +271,7 @@ export class InMemoryTxPool extends (EventEmitter as new () => TypedEventEmitter
271271
this.deletedMinedTxHashes.delete(txHash);
272272
deletedCount++;
273273
}
274+
this.metrics.transactionsRemoved(txHashes);
274275
blocksToDelete.push(block);
275276
}
276277
}

yarn-project/p2p/src/services/peer-manager/metrics.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import {
22
Attributes,
33
type Gauge,
4+
type Histogram,
45
Metrics,
56
type TelemetryClient,
67
type Tracer,
@@ -9,13 +10,18 @@ import {
910
getTelemetryClient,
1011
} from '@aztec/telemetry-client';
1112

13+
import type { PeerId } from '@libp2p/interface';
14+
1215
import { type GoodByeReason, prettyGoodbyeReason } from '../reqresp/protocols/index.js';
1316

1417
export class PeerManagerMetrics {
1518
private sentGoodbyes: UpDownCounter;
1619
private receivedGoodbyes: UpDownCounter;
1720
private peerCount: Gauge;
1821
private lowScoreDisconnects: UpDownCounter;
22+
private peerConnectionDuration: Histogram;
23+
24+
private peerConnectedAt: Map<string, number> = new Map<string, number>();
1925

2026
public readonly tracer: Tracer;
2127

@@ -46,6 +52,11 @@ export class PeerManagerMetrics {
4652
unit: 'peers',
4753
valueType: ValueType.INT,
4854
});
55+
this.peerConnectionDuration = meter.createHistogram(Metrics.PEER_MANAGER_PEER_CONNECTION_DURATION, {
56+
description: 'Time duration between peer connection and disconnection',
57+
unit: 'ms',
58+
valueType: ValueType.INT,
59+
});
4960
}
5061

5162
public recordGoodbyeSent(reason: GoodByeReason) {
@@ -63,4 +74,15 @@ export class PeerManagerMetrics {
6374
public recordLowScoreDisconnect(scoreState: 'Banned' | 'Disconnect') {
6475
this.lowScoreDisconnects.add(1, { [Attributes.P2P_PEER_SCORE_STATE]: scoreState });
6576
}
77+
78+
public peerConnected(id: PeerId) {
79+
this.peerConnectedAt.set(id.toString(), Date.now());
80+
}
81+
82+
public peerDisconnected(id: PeerId) {
83+
const connectedAt = this.peerConnectedAt.get(id.toString());
84+
if (connectedAt) {
85+
this.peerConnectionDuration.record(Date.now() - connectedAt);
86+
}
87+
}
6688
}

yarn-project/p2p/src/services/peer-manager/peer_manager.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,7 @@ export class PeerManager implements PeerManagerInterface {
278278
private handleConnectedPeerEvent(e: CustomEvent<PeerId>) {
279279
const peerId = e.detail;
280280
this.logger.verbose(`Connected to peer ${peerId.toString()}`);
281+
this.metrics.peerConnected(peerId);
281282
if (this.config.p2pDisableStatusHandshake) {
282283
return;
283284
}
@@ -303,6 +304,7 @@ export class PeerManager implements PeerManagerInterface {
303304
*/
304305
private handleDisconnectedPeerEvent(e: CustomEvent<PeerId>) {
305306
const peerId = e.detail;
307+
this.metrics.peerDisconnected(peerId);
306308
this.logger.verbose(`Disconnected from peer ${peerId.toString()}`);
307309
const validatorAddress = this.authenticatedPeerIdToValidatorAddress.get(peerId.toString());
308310
if (validatorAddress !== undefined) {

yarn-project/p2p/src/services/tx_provider.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ export class TxProvider implements ITxProvider {
137137
);
138138

139139
if (missingTxHashes.size === 0) {
140+
this.instrumentation.incTxsFromP2P(0, txHashes.length);
140141
return { txsFromMempool };
141142
}
142143

@@ -155,6 +156,7 @@ export class TxProvider implements ITxProvider {
155156

156157
if (missingTxHashes.size === 0) {
157158
await this.processProposalTxs(txsFromProposal);
159+
this.instrumentation.incTxsFromP2P(0, txHashes.length);
158160
return { txsFromMempool, txsFromProposal };
159161
}
160162

@@ -166,13 +168,14 @@ export class TxProvider implements ITxProvider {
166168

167169
if (txsFromNetwork.length > 0) {
168170
txsFromNetwork.forEach(tx => missingTxHashes.delete(tx.txHash.toString()));
169-
this.instrumentation.incTxsFromP2P(txsFromNetwork.length, txHashes.length);
170171
this.log.debug(
171172
`Retrieved ${txsFromNetwork.length} txs from network for block proposal (${missingTxHashes.size} pending)`,
172173
{ ...blockInfo, missingTxHashes: [...missingTxHashes] },
173174
);
174175
}
175176

177+
this.instrumentation.incTxsFromP2P(txsFromNetwork.length, txHashes.length);
178+
176179
if (missingTxHashes.size === 0) {
177180
return { txsFromNetwork, txsFromMempool, txsFromProposal };
178181
}

yarn-project/sequencer-client/src/sequencer/checkpoint_proposal_job.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,10 @@ export class CheckpointProposalJob {
108108
// Wait until the voting promises have resolved, so all requests are enqueued (not sent)
109109
await Promise.all(votesPromises);
110110

111+
if (checkpoint) {
112+
this.metrics.recordBlockProposalSuccess();
113+
}
114+
111115
// Do not post anything to L1 if we are fishermen, but do perform L1 fee analysis
112116
if (this.config.fishermanMode) {
113117
await this.handleCheckpointEndAsFisherman(checkpoint);
@@ -669,7 +673,6 @@ export class CheckpointProposalJob {
669673
...checkpoint.getStats(),
670674
feeAnalysisId: feeAnalysis?.id,
671675
});
672-
this.metrics.recordBlockProposalSuccess();
673676
} else {
674677
this.log.warn(`Validation block building FAILED for slot ${this.slot}`, {
675678
slot: this.slot,

yarn-project/telemetry-client/src/attributes.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,8 @@ export const P2P_REQ_RESP_BATCH_REQUESTS_COUNT = 'aztec.p2p.req_resp.batch_reque
9999
export const P2P_PEER_SCORE_STATE = 'aztec.p2p.peer_score_state';
100100
export const POOL_NAME = 'aztec.pool.name';
101101

102+
export const PEER_ID = 'aztec.p2p.peer_id';
103+
102104
export const SEQUENCER_STATE = 'aztec.sequencer.state';
103105

104106
export const SIMULATOR_PHASE = 'aztec.simulator.phase';

yarn-project/telemetry-client/src/metrics.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ export const PEER_MANAGER_GOODBYES_SENT = 'aztec.peer_manager.goodbyes_sent';
143143
export const PEER_MANAGER_GOODBYES_RECEIVED = 'aztec.peer_manager.goodbyes_received';
144144
export const PEER_MANAGER_PEER_COUNT = 'aztec.peer_manager.peer_count';
145145
export const PEER_MANAGER_LOW_SCORE_DISCONNECTS = 'aztec.peer_manager.low_score_disconnects';
146+
export const PEER_MANAGER_PEER_CONNECTION_DURATION = 'aztec.peer_manager.peer_connection_duration';
146147
export const P2P_PEER_STATE_COUNT = 'aztec.p2p.peer_state_count';
147148

148149
export const P2P_REQ_RESP_SENT_REQUESTS = 'aztec.p2p.req_resp.sent_requests';

yarn-project/validator-client/src/validator.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import type { BlockAttestation, BlockProposal, BlockProposalOptions } from '@azt
2121
import type { CheckpointHeader } from '@aztec/stdlib/rollup';
2222
import type { Tx } from '@aztec/stdlib/tx';
2323
import { AttestationTimeoutError } from '@aztec/stdlib/validators';
24-
import { type TelemetryClient, type Tracer, getTelemetryClient } from '@aztec/telemetry-client';
24+
import { Attributes, type TelemetryClient, type Tracer, getTelemetryClient, trackSpan } from '@aztec/telemetry-client';
2525

2626
import { EventEmitter } from 'events';
2727
import type { TypedDataDefinition } from 'viem';
@@ -264,6 +264,10 @@ export class ValidatorClient extends (EventEmitter as new () => WatcherEmitter)
264264
}
265265
}
266266

267+
@trackSpan('validator.attestToProposal', (proposal, proposalSender) => ({
268+
[Attributes.BLOCK_HASH]: proposal.payload.header.hash.toString(),
269+
[Attributes.PEER_ID]: proposalSender.toString(),
270+
}))
267271
async attestToProposal(proposal: BlockProposal, proposalSender: PeerId): Promise<BlockAttestation[] | undefined> {
268272
const slotNumber = proposal.slotNumber;
269273
const proposer = proposal.getSender();

0 commit comments

Comments
 (0)