Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 98 additions & 51 deletions packages/beacon-node/src/network/peers/peerManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,11 @@ export class PeerManager {
this.libp2p.services.components.events.addEventListener(Libp2pEvent.connectionClose, this.onLibp2pPeerDisconnect);
this.networkEventBus.on(NetworkEvent.reqRespRequest, this.onRequest);

// Handle peers that were connected before event listener was registered
// This can happen due to libp2p bootstrap module dialing bootnodes
// before PeerManager is initialized
this.handleAlreadyConnectedPeers();

this.lastStatus = this.statusCache.get();

// On start-up will connected to existing peers in libp2p.peerStore, same as autoDial behaviour
Expand Down Expand Up @@ -231,6 +236,97 @@ export class PeerManager {
for (const interval of this.intervals) clearInterval(interval);
}

/**
* Check for peers that were connected before PeerManager started listening
* for connection events. Send STATUS to outbound peers that we haven't
* exchanged status with yet.
*
* This handles the race condition where libp2p bootstrap module connects
* to bootnodes before PeerManager registers its connectionOpen listener.
*/
private handleAlreadyConnectedPeers(): void {
const connections = getConnectionsMap(this.libp2p);

for (const [peerIdStr, conns] of connections) {
// Check if we already have this peer tracked
if (this.connectedPeers.has(peerIdStr)) {
continue;
}

// Find an open connection
const openConn = conns.value.find((c) => c.status === "open");
if (!openConn) {
continue;
}

this.logger.verbose("Found peer connected before PeerManager init", {
peer: prettyPrintPeerId(openConn.remotePeer),
direction: openConn.direction,
});

this.trackNewPeer(openConn);
}
}

/**
* Track a new peer connection and initialize communication.
* Creates PeerData, sends PING/STATUS for outbound connections, and runs identify protocol.
*/
private trackNewPeer(connection: Connection): void {
const {direction, remotePeer} = connection;
const remotePeerStr = remotePeer.toString();
const remotePeerPrettyStr = prettyPrintPeerId(remotePeer);

// On connection:
// - Outbound connections: send a STATUS and PING request
// - Inbound connections: expect to be STATUS'd, schedule STATUS and PING for later
// NOTE: libp2p may emit two "peer:connect" events: One for inbound, one for outbound
// If that happens, it's okay. Only the "outbound" connection triggers immediate action
const now = Date.now();
const nodeId = computeNodeId(remotePeer);
const peerData: PeerData = {
lastReceivedMsgUnixTsMs: direction === "outbound" ? 0 : now,
// If inbound, request after STATUS_INBOUND_GRACE_PERIOD
lastStatusUnixTsMs: direction === "outbound" ? 0 : now - STATUS_INTERVAL_MS + STATUS_INBOUND_GRACE_PERIOD,
connectedUnixTsMs: now,
relevantStatus: RelevantPeerStatus.Unknown,
direction,
nodeId,
peerId: remotePeer,
status: null,
metadata: null,
agentVersion: null,
agentClient: null,
encodingPreference: null,
};
this.connectedPeers.set(remotePeerStr, peerData);

if (direction === "outbound") {
void this.requestPing(remotePeer);
void this.requestStatus(remotePeer, this.statusCache.get());
}

this.libp2p.services.identify
.identify(connection)
.then((result) => {
const agentVersion = result.agentVersion;
if (agentVersion) {
peerData.agentVersion = agentVersion;
peerData.agentClient = getKnownClientFromAgentVersion(agentVersion);
}
})
.catch((err) => {
if (connection.status !== "open") {
this.logger.debug("Peer disconnected during identify protocol", {
peerId: remotePeerPrettyStr,
error: (err as Error).message,
});
} else {
this.logger.debug("Error setting agentVersion for the peer", {peerId: remotePeerPrettyStr}, err);
}
});
}

/**
* Return peers with at least one connection in status "open"
*/
Expand Down Expand Up @@ -698,9 +794,8 @@ export class PeerManager {
* Registers a peer as connected. The `direction` parameter determines if the peer is being
* dialed or connecting to us.
*/
private onLibp2pPeerConnect = async (evt: CustomEvent<Connection>): Promise<void> => {
private onLibp2pPeerConnect = (evt: CustomEvent<Connection>): void => {
const {direction, status, remotePeer} = evt.detail;
const remotePeerStr = remotePeer.toString();
const remotePeerPrettyStr = prettyPrintPeerId(remotePeer);
this.logger.verbose("peer connected", {peer: remotePeerPrettyStr, direction, status});
// NOTE: The peerConnect event is not emitted here here, but after asserting peer relevance
Expand All @@ -714,55 +809,7 @@ export class PeerManager {
return;
}

// On connection:
// - Outbound connections: send a STATUS and PING request
// - Inbound connections: expect to be STATUS'd, schedule STATUS and PING for latter
// NOTE: libp2p may emit two "peer:connect" events: One for inbound, one for outbound
// If that happens, it's okay. Only the "outbound" connection triggers immediate action
const now = Date.now();
const nodeId = computeNodeId(remotePeer);
const peerData: PeerData = {
lastReceivedMsgUnixTsMs: direction === "outbound" ? 0 : now,
// If inbound, request after STATUS_INBOUND_GRACE_PERIOD
lastStatusUnixTsMs: direction === "outbound" ? 0 : now - STATUS_INTERVAL_MS + STATUS_INBOUND_GRACE_PERIOD,
connectedUnixTsMs: now,
relevantStatus: RelevantPeerStatus.Unknown,
direction,
nodeId,
peerId: remotePeer,
status: null,
metadata: null,
agentVersion: null,
agentClient: null,
encodingPreference: null,
};
this.connectedPeers.set(remotePeerStr, peerData);

if (direction === "outbound") {
// this.pingAndStatusTimeouts();
void this.requestPing(remotePeer);
void this.requestStatus(remotePeer, this.statusCache.get());
}

this.libp2p.services.identify
.identify(evt.detail)
.then((result) => {
const agentVersion = result.agentVersion;
if (agentVersion) {
peerData.agentVersion = agentVersion;
peerData.agentClient = getKnownClientFromAgentVersion(agentVersion);
}
})
.catch((err) => {
if (evt.detail.status !== "open") {
this.logger.debug("Peer disconnected during identify protocol", {
peerId: remotePeerPrettyStr,
error: (err as Error).message,
});
} else {
this.logger.debug("Error setting agentVersion for the peer", {peerId: remotePeerPrettyStr}, err);
}
});
this.trackNewPeer(evt.detail);
};

/**
Expand Down
Loading