Skip to content

Commit 327fa80

Browse files
authored
fix: handle peers connected before PeerManager init (#8770)
## Summary - Fix race condition where libp2p bootstrap module connects to bootnodes before PeerManager registers its `connectionOpen` event listener - Add `handleAlreadyConnectedPeers()` method that checks for existing open connections after registering event listeners - Send STATUS to any outbound peers that were missed, ensuring compliance with the consensus spec requirement that "The dialing client MUST send a Status request upon connection" ## Test plan - [ ] Build passes (`yarn build`) - [ ] Verify in Kurtosis testnet that Lodestar nodes properly send STATUS to Lighthouse bootnodes - [ ] Confirm no orphaned blocks at genesis due to missing peer connections
1 parent 636e19c commit 327fa80

File tree

1 file changed

+98
-51
lines changed

1 file changed

+98
-51
lines changed

packages/beacon-node/src/network/peers/peerManager.ts

Lines changed: 98 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,11 @@ export class PeerManager {
193193
this.libp2p.services.components.events.addEventListener(Libp2pEvent.connectionClose, this.onLibp2pPeerDisconnect);
194194
this.networkEventBus.on(NetworkEvent.reqRespRequest, this.onRequest);
195195

196+
// Handle peers that were connected before event listener was registered
197+
// This can happen due to libp2p bootstrap module dialing bootnodes
198+
// before PeerManager is initialized
199+
this.handleAlreadyConnectedPeers();
200+
196201
this.lastStatus = this.statusCache.get();
197202

198203
// On start-up will connected to existing peers in libp2p.peerStore, same as autoDial behaviour
@@ -231,6 +236,97 @@ export class PeerManager {
231236
for (const interval of this.intervals) clearInterval(interval);
232237
}
233238

239+
/**
240+
* Check for peers that were connected before PeerManager started listening
241+
* for connection events. Send STATUS to outbound peers that we haven't
242+
* exchanged status with yet.
243+
*
244+
* This handles the race condition where libp2p bootstrap module connects
245+
* to bootnodes before PeerManager registers its connectionOpen listener.
246+
*/
247+
private handleAlreadyConnectedPeers(): void {
248+
const connections = getConnectionsMap(this.libp2p);
249+
250+
for (const [peerIdStr, conns] of connections) {
251+
// Check if we already have this peer tracked
252+
if (this.connectedPeers.has(peerIdStr)) {
253+
continue;
254+
}
255+
256+
// Find an open connection
257+
const openConn = conns.value.find((c) => c.status === "open");
258+
if (!openConn) {
259+
continue;
260+
}
261+
262+
this.logger.verbose("Found peer connected before PeerManager init", {
263+
peer: prettyPrintPeerId(openConn.remotePeer),
264+
direction: openConn.direction,
265+
});
266+
267+
this.trackNewPeer(openConn);
268+
}
269+
}
270+
271+
/**
272+
* Track a new peer connection and initialize communication.
273+
* Creates PeerData, sends PING/STATUS for outbound connections, and runs identify protocol.
274+
*/
275+
private trackNewPeer(connection: Connection): void {
276+
const {direction, remotePeer} = connection;
277+
const remotePeerStr = remotePeer.toString();
278+
const remotePeerPrettyStr = prettyPrintPeerId(remotePeer);
279+
280+
// On connection:
281+
// - Outbound connections: send a STATUS and PING request
282+
// - Inbound connections: expect to be STATUS'd, schedule STATUS and PING for later
283+
// NOTE: libp2p may emit two "peer:connect" events: One for inbound, one for outbound
284+
// If that happens, it's okay. Only the "outbound" connection triggers immediate action
285+
const now = Date.now();
286+
const nodeId = computeNodeId(remotePeer);
287+
const peerData: PeerData = {
288+
lastReceivedMsgUnixTsMs: direction === "outbound" ? 0 : now,
289+
// If inbound, request after STATUS_INBOUND_GRACE_PERIOD
290+
lastStatusUnixTsMs: direction === "outbound" ? 0 : now - STATUS_INTERVAL_MS + STATUS_INBOUND_GRACE_PERIOD,
291+
connectedUnixTsMs: now,
292+
relevantStatus: RelevantPeerStatus.Unknown,
293+
direction,
294+
nodeId,
295+
peerId: remotePeer,
296+
status: null,
297+
metadata: null,
298+
agentVersion: null,
299+
agentClient: null,
300+
encodingPreference: null,
301+
};
302+
this.connectedPeers.set(remotePeerStr, peerData);
303+
304+
if (direction === "outbound") {
305+
void this.requestPing(remotePeer);
306+
void this.requestStatus(remotePeer, this.statusCache.get());
307+
}
308+
309+
this.libp2p.services.identify
310+
.identify(connection)
311+
.then((result) => {
312+
const agentVersion = result.agentVersion;
313+
if (agentVersion) {
314+
peerData.agentVersion = agentVersion;
315+
peerData.agentClient = getKnownClientFromAgentVersion(agentVersion);
316+
}
317+
})
318+
.catch((err) => {
319+
if (connection.status !== "open") {
320+
this.logger.debug("Peer disconnected during identify protocol", {
321+
peerId: remotePeerPrettyStr,
322+
error: (err as Error).message,
323+
});
324+
} else {
325+
this.logger.debug("Error setting agentVersion for the peer", {peerId: remotePeerPrettyStr}, err);
326+
}
327+
});
328+
}
329+
234330
/**
235331
* Return peers with at least one connection in status "open"
236332
*/
@@ -698,9 +794,8 @@ export class PeerManager {
698794
* Registers a peer as connected. The `direction` parameter determines if the peer is being
699795
* dialed or connecting to us.
700796
*/
701-
private onLibp2pPeerConnect = async (evt: CustomEvent<Connection>): Promise<void> => {
797+
private onLibp2pPeerConnect = (evt: CustomEvent<Connection>): void => {
702798
const {direction, status, remotePeer} = evt.detail;
703-
const remotePeerStr = remotePeer.toString();
704799
const remotePeerPrettyStr = prettyPrintPeerId(remotePeer);
705800
this.logger.verbose("peer connected", {peer: remotePeerPrettyStr, direction, status});
706801
// NOTE: The peerConnect event is not emitted here here, but after asserting peer relevance
@@ -714,55 +809,7 @@ export class PeerManager {
714809
return;
715810
}
716811

717-
// On connection:
718-
// - Outbound connections: send a STATUS and PING request
719-
// - Inbound connections: expect to be STATUS'd, schedule STATUS and PING for latter
720-
// NOTE: libp2p may emit two "peer:connect" events: One for inbound, one for outbound
721-
// If that happens, it's okay. Only the "outbound" connection triggers immediate action
722-
const now = Date.now();
723-
const nodeId = computeNodeId(remotePeer);
724-
const peerData: PeerData = {
725-
lastReceivedMsgUnixTsMs: direction === "outbound" ? 0 : now,
726-
// If inbound, request after STATUS_INBOUND_GRACE_PERIOD
727-
lastStatusUnixTsMs: direction === "outbound" ? 0 : now - STATUS_INTERVAL_MS + STATUS_INBOUND_GRACE_PERIOD,
728-
connectedUnixTsMs: now,
729-
relevantStatus: RelevantPeerStatus.Unknown,
730-
direction,
731-
nodeId,
732-
peerId: remotePeer,
733-
status: null,
734-
metadata: null,
735-
agentVersion: null,
736-
agentClient: null,
737-
encodingPreference: null,
738-
};
739-
this.connectedPeers.set(remotePeerStr, peerData);
740-
741-
if (direction === "outbound") {
742-
// this.pingAndStatusTimeouts();
743-
void this.requestPing(remotePeer);
744-
void this.requestStatus(remotePeer, this.statusCache.get());
745-
}
746-
747-
this.libp2p.services.identify
748-
.identify(evt.detail)
749-
.then((result) => {
750-
const agentVersion = result.agentVersion;
751-
if (agentVersion) {
752-
peerData.agentVersion = agentVersion;
753-
peerData.agentClient = getKnownClientFromAgentVersion(agentVersion);
754-
}
755-
})
756-
.catch((err) => {
757-
if (evt.detail.status !== "open") {
758-
this.logger.debug("Peer disconnected during identify protocol", {
759-
peerId: remotePeerPrettyStr,
760-
error: (err as Error).message,
761-
});
762-
} else {
763-
this.logger.debug("Error setting agentVersion for the peer", {peerId: remotePeerPrettyStr}, err);
764-
}
765-
});
812+
this.trackNewPeer(evt.detail);
766813
};
767814

768815
/**

0 commit comments

Comments
 (0)