diff --git a/packages/beacon-node/src/network/peers/peerManager.ts b/packages/beacon-node/src/network/peers/peerManager.ts index 51e3588992b5..a9f9efcb013d 100644 --- a/packages/beacon-node/src/network/peers/peerManager.ts +++ b/packages/beacon-node/src/network/peers/peerManager.ts @@ -193,6 +193,11 @@ export class PeerManager { this.libp2p.services.components.events.addEventListener(Libp2pEvent.connectionClose, this.onLibp2pPeerDisconnect); this.networkEventBus.on(NetworkEvent.reqRespRequest, this.onRequest); + // Handle peers that were connected before event listener was registered + // This can happen due to libp2p bootstrap module dialing bootnodes + // before PeerManager is initialized + this.handleAlreadyConnectedPeers(); + this.lastStatus = this.statusCache.get(); // On start-up will connected to existing peers in libp2p.peerStore, same as autoDial behaviour @@ -231,6 +236,97 @@ export class PeerManager { for (const interval of this.intervals) clearInterval(interval); } + /** + * Check for peers that were connected before PeerManager started listening + * for connection events. Send STATUS to outbound peers that we haven't + * exchanged status with yet. + * + * This handles the race condition where libp2p bootstrap module connects + * to bootnodes before PeerManager registers its connectionOpen listener. + */ + private handleAlreadyConnectedPeers(): void { + const connections = getConnectionsMap(this.libp2p); + + for (const [peerIdStr, conns] of connections) { + // Check if we already have this peer tracked + if (this.connectedPeers.has(peerIdStr)) { + continue; + } + + // Find an open connection + const openConn = conns.value.find((c) => c.status === "open"); + if (!openConn) { + continue; + } + + this.logger.verbose("Found peer connected before PeerManager init", { + peer: prettyPrintPeerId(openConn.remotePeer), + direction: openConn.direction, + }); + + this.trackNewPeer(openConn); + } + } + + /** + * Track a new peer connection and initialize communication. + * Creates PeerData, sends PING/STATUS for outbound connections, and runs identify protocol. + */ + private trackNewPeer(connection: Connection): void { + const {direction, remotePeer} = connection; + const remotePeerStr = remotePeer.toString(); + const remotePeerPrettyStr = prettyPrintPeerId(remotePeer); + + // On connection: + // - Outbound connections: send a STATUS and PING request + // - Inbound connections: expect to be STATUS'd, schedule STATUS and PING for later + // NOTE: libp2p may emit two "peer:connect" events: One for inbound, one for outbound + // If that happens, it's okay. Only the "outbound" connection triggers immediate action + const now = Date.now(); + const nodeId = computeNodeId(remotePeer); + const peerData: PeerData = { + lastReceivedMsgUnixTsMs: direction === "outbound" ? 0 : now, + // If inbound, request after STATUS_INBOUND_GRACE_PERIOD + lastStatusUnixTsMs: direction === "outbound" ? 0 : now - STATUS_INTERVAL_MS + STATUS_INBOUND_GRACE_PERIOD, + connectedUnixTsMs: now, + relevantStatus: RelevantPeerStatus.Unknown, + direction, + nodeId, + peerId: remotePeer, + status: null, + metadata: null, + agentVersion: null, + agentClient: null, + encodingPreference: null, + }; + this.connectedPeers.set(remotePeerStr, peerData); + + if (direction === "outbound") { + void this.requestPing(remotePeer); + void this.requestStatus(remotePeer, this.statusCache.get()); + } + + this.libp2p.services.identify + .identify(connection) + .then((result) => { + const agentVersion = result.agentVersion; + if (agentVersion) { + peerData.agentVersion = agentVersion; + peerData.agentClient = getKnownClientFromAgentVersion(agentVersion); + } + }) + .catch((err) => { + if (connection.status !== "open") { + this.logger.debug("Peer disconnected during identify protocol", { + peerId: remotePeerPrettyStr, + error: (err as Error).message, + }); + } else { + this.logger.debug("Error setting agentVersion for the peer", {peerId: remotePeerPrettyStr}, err); + } + }); + } + /** * Return peers with at least one connection in status "open" */ @@ -698,9 +794,8 @@ export class PeerManager { * Registers a peer as connected. The `direction` parameter determines if the peer is being * dialed or connecting to us. */ - private onLibp2pPeerConnect = async (evt: CustomEvent): Promise => { + private onLibp2pPeerConnect = (evt: CustomEvent): void => { const {direction, status, remotePeer} = evt.detail; - const remotePeerStr = remotePeer.toString(); const remotePeerPrettyStr = prettyPrintPeerId(remotePeer); this.logger.verbose("peer connected", {peer: remotePeerPrettyStr, direction, status}); // NOTE: The peerConnect event is not emitted here here, but after asserting peer relevance @@ -714,55 +809,7 @@ export class PeerManager { return; } - // On connection: - // - Outbound connections: send a STATUS and PING request - // - Inbound connections: expect to be STATUS'd, schedule STATUS and PING for latter - // NOTE: libp2p may emit two "peer:connect" events: One for inbound, one for outbound - // If that happens, it's okay. Only the "outbound" connection triggers immediate action - const now = Date.now(); - const nodeId = computeNodeId(remotePeer); - const peerData: PeerData = { - lastReceivedMsgUnixTsMs: direction === "outbound" ? 0 : now, - // If inbound, request after STATUS_INBOUND_GRACE_PERIOD - lastStatusUnixTsMs: direction === "outbound" ? 0 : now - STATUS_INTERVAL_MS + STATUS_INBOUND_GRACE_PERIOD, - connectedUnixTsMs: now, - relevantStatus: RelevantPeerStatus.Unknown, - direction, - nodeId, - peerId: remotePeer, - status: null, - metadata: null, - agentVersion: null, - agentClient: null, - encodingPreference: null, - }; - this.connectedPeers.set(remotePeerStr, peerData); - - if (direction === "outbound") { - // this.pingAndStatusTimeouts(); - void this.requestPing(remotePeer); - void this.requestStatus(remotePeer, this.statusCache.get()); - } - - this.libp2p.services.identify - .identify(evt.detail) - .then((result) => { - const agentVersion = result.agentVersion; - if (agentVersion) { - peerData.agentVersion = agentVersion; - peerData.agentClient = getKnownClientFromAgentVersion(agentVersion); - } - }) - .catch((err) => { - if (evt.detail.status !== "open") { - this.logger.debug("Peer disconnected during identify protocol", { - peerId: remotePeerPrettyStr, - error: (err as Error).message, - }); - } else { - this.logger.debug("Error setting agentVersion for the peer", {peerId: remotePeerPrettyStr}, err); - } - }); + this.trackNewPeer(evt.detail); }; /**