Skip to content

Commit a49254b

Browse files
committed
fix: fix SnapSync regression from network layer refactor (#10753)
Fix four issues introduced in #10753 that caused SnapSync to regress from 25% in 5 minutes to 1% in 15 minutes on Hoodi:

1. Session.Handshake: only disconnect on NodeId mismatch for static/bootnode peers (operator-verified identities). Discovered peers accept the new identity as before — stale discovery data is common and benign. Also fix HandshakeComplete firing after InitiateDisconnect on doomed sessions.
2. PeerManager: stop EnsureAvailableActivePeerSlotAsync from consuming SemaphoreSlim signals meant for the main peer update loop. Use Task.Delay for polling instead of WaitAsync on the shared semaphore.
3. PeerManager: reduce _sessionLock scope in OnDisconnected and OnHandshakeComplete to only guard session bookkeeping. Peer processing runs outside the lock to avoid serializing all session events.
4. PeerManager: restore dedicated thread for peer update loop via Task.Factory.StartNew(LongRunning) with .Unwrap() — fixes the original Task<Task> bug while keeping the loop off the thread pool.
1 parent d6b387d commit a49254b

File tree

3 files changed

+108
-78
lines changed

3 files changed

+108
-78
lines changed

src/Nethermind/Nethermind.Network.Test/P2P/SessionTests.cs

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -427,10 +427,37 @@ public void On_incoming_sessions_can_fill_remote_id_on_handshake()
427427
}
428428

429429
[Test]
430-
public void Outgoing_handshake_with_unexpected_identity_is_rejected()
430+
public void Outgoing_handshake_with_unexpected_identity_updates_node_id_for_discovered_peer()
431431
{
432-
DisconnectReason? disconnectReason = null;
433432
Session session = new(30312, new Node(TestItem.PublicKeyA, "127.0.0.1", 8545), _channel, NullDisconnectsAnalyzer.Instance, LimboLogs.Instance);
433+
434+
session.Handshake(TestItem.PublicKeyB);
435+
436+
Assert.That(session.ObsoleteRemoteNodeId, Is.EqualTo(TestItem.PublicKeyA));
437+
Assert.That(session.RemoteNodeId, Is.EqualTo(TestItem.PublicKeyB));
438+
}
439+
440+
[Test]
441+
public void Outgoing_handshake_with_unexpected_identity_is_rejected_for_static_peer()
442+
{
443+
DisconnectReason? disconnectReason = null;
444+
Node node = new(TestItem.PublicKeyA, "127.0.0.1", 8545) { IsStatic = true };
445+
Session session = new(30312, node, _channel, NullDisconnectsAnalyzer.Instance, LimboLogs.Instance);
446+
session.Disconnecting += (_, e) => disconnectReason = e.DisconnectReason;
447+
448+
session.Handshake(TestItem.PublicKeyB);
449+
450+
Assert.That(session.RemoteNodeId, Is.EqualTo(TestItem.PublicKeyA));
451+
session.Init(5, _channelHandlerContext, _packetSender);
452+
Assert.That(disconnectReason, Is.EqualTo(DisconnectReason.UnexpectedIdentity));
453+
}
454+
455+
[Test]
456+
public void Outgoing_handshake_with_unexpected_identity_is_rejected_for_bootnode()
457+
{
458+
DisconnectReason? disconnectReason = null;
459+
Node node = new(TestItem.PublicKeyA, "127.0.0.1", 8545) { IsBootnode = true };
460+
Session session = new(30312, node, _channel, NullDisconnectsAnalyzer.Instance, LimboLogs.Instance);
434461
session.Disconnecting += (_, e) => disconnectReason = e.DisconnectReason;
435462

436463
session.Handshake(TestItem.PublicKeyB);

src/Nethermind/Nethermind.Network/P2P/Session.cs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -336,18 +336,19 @@ public void Handshake(PublicKey? handshakeRemoteNodeId)
336336
}
337337
else if (handshakeRemoteNodeId is not null && RemoteNodeId != handshakeRemoteNodeId)
338338
{
339-
if (Direction == ConnectionDirection.Out)
339+
// Static/bootnode peers have operator-verified identities — a mismatch is suspicious.
340+
if (Direction == ConnectionDirection.Out && (Node?.IsStatic == true || Node?.IsBootnode == true))
340341
{
341342
if (_logger.IsDebug) DebugUnexpectedNodeId(handshakeRemoteNodeId);
342343
InitiateDisconnect(DisconnectReason.UnexpectedIdentity, $"expected {RemoteNodeId}, received {handshakeRemoteNodeId}");
344+
return;
343345
}
344-
else
345-
{
346-
if (_logger.IsTrace) TraceDifferentNodeId(handshakeRemoteNodeId);
347-
ObsoleteRemoteNodeId = RemoteNodeId;
348-
RemoteNodeId = handshakeRemoteNodeId;
349-
Node = new Node(RemoteNodeId, RemoteHost, RemotePort);
350-
}
346+
347+
// Discovered peers frequently have stale identity data — accept the new identity.
348+
if (_logger.IsTrace) TraceDifferentNodeId(handshakeRemoteNodeId);
349+
ObsoleteRemoteNodeId = RemoteNodeId;
350+
RemoteNodeId = handshakeRemoteNodeId;
351+
Node = new Node(RemoteNodeId, RemoteHost, RemotePort);
351352
}
352353

353354
Metrics.Handshakes++;
@@ -358,7 +359,7 @@ public void Handshake(PublicKey? handshakeRemoteNodeId)
358359
void TraceHandshakeCalled() => _logger.Trace($"{nameof(Handshake)} called on {this}");
359360

360361
[MethodImpl(MethodImplOptions.NoInlining)]
361-
void DebugUnexpectedNodeId(PublicKey remoteNodeId) => _logger.Debug($"Unexpected remote node id in handshake: expected {RemoteNodeId}, received {remoteNodeId}");
362+
void DebugUnexpectedNodeId(PublicKey remoteNodeId) => _logger.Debug($"Unexpected remote node id in handshake with static/bootnode peer: expected {RemoteNodeId}, received {remoteNodeId}");
362363

363364
[MethodImpl(MethodImplOptions.NoInlining)]
364365
void TraceDifferentNodeId(PublicKey remoteNodeId) => _logger.Trace($"Different NodeId received in handshake: old: {RemoteNodeId}, new: {remoteNodeId}");

src/Nethermind/Nethermind.Network/PeerManager.cs

Lines changed: 69 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -191,22 +191,23 @@ public void Start()
191191
SignalPeerUpdateNeeded();
192192
}
193193

194-
private async Task RunPeerUpdateLoopAsync()
194+
private Task RunPeerUpdateLoopAsync()
195195
{
196-
await Task.Yield();
197-
198-
try
199-
{
200-
await RunPeerUpdateLoop();
201-
}
202-
catch (Exception e) when (e is not OperationCanceledException)
196+
return Task.Factory.StartNew(async () =>
203197
{
204-
if (_logger.IsError) _logger.Error("Peer update loop encountered an exception.", e);
205-
}
206-
catch (OperationCanceledException)
207-
{
208-
if (_logger.IsDebug) _logger.Debug("Peer update loop stopped.");
209-
}
198+
try
199+
{
200+
await RunPeerUpdateLoop();
201+
}
202+
catch (Exception e) when (e is not OperationCanceledException)
203+
{
204+
if (_logger.IsError) _logger.Error("Peer update loop encountered an exception.", e);
205+
}
206+
catch (OperationCanceledException)
207+
{
208+
if (_logger.IsDebug) _logger.Debug("Peer update loop stopped.");
209+
}
210+
}, _cancellationTokenSource.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default).Unwrap();
210211
}
211212

212213
public async Task StopAsync()
@@ -507,8 +508,9 @@ private async Task<bool> EnsureAvailableActivePeerSlotAsync()
507508
TimeSpan.FromMilliseconds(_networkConfig.ConnectTimeoutMs);
508509
while (DateTimeOffset.UtcNow < deadline && (AvailableActivePeersCount - _pending) <= 0)
509510
{
510-
// Wait for a signal or poll every 100ms.
511-
await _peerUpdateRequested.WaitAsync(TimeSpan.FromMilliseconds(100), _cancellationTokenSource.Token);
511+
// Poll every 100ms. Do not consume _peerUpdateRequested here — consuming the
512+
// semaphore signal would starve the main peer update loop (WaitForPeerUpdateRequestAsync).
513+
await Task.Delay(TimeSpan.FromMilliseconds(100), _cancellationTokenSource.Token);
512514
}
513515

514516
return AvailableActivePeersCount - _pending > 0;
@@ -1154,42 +1156,42 @@ private void OnDisconnected(object sender, DisconnectEventArgs e)
11541156
{
11551157
return;
11561158
}
1159+
}
11571160

1158-
if (_logger.IsTrace) TraceSessionLifecycle(session, SessionLifecycleTraceEvent.Closing);
1161+
if (_logger.IsTrace) TraceSessionLifecycle(session, SessionLifecycleTraceEvent.Closing);
11591162

1160-
if (session.State != SessionState.Disconnected)
1161-
{
1162-
ThrowInvalidOnDisconnectedState(session);
1163-
}
1163+
if (session.State != SessionState.Disconnected)
1164+
{
1165+
ThrowInvalidOnDisconnectedState(session);
1166+
}
11641167

1165-
if (_logger.IsTrace) TracePeerDisconnected();
1168+
if (_logger.IsTrace) TracePeerDisconnected();
11661169

1167-
if (session.RemoteNodeId is null)
1168-
{
1169-
// this happens when we have a disconnect on incoming connection before handshake
1170-
if (_logger.IsTrace) TraceDisconnectWithoutRemoteNodeId();
1171-
return;
1172-
}
1170+
if (session.RemoteNodeId is null)
1171+
{
1172+
// this happens when we have a disconnect on incoming connection before handshake
1173+
if (_logger.IsTrace) TraceDisconnectWithoutRemoteNodeId();
1174+
return;
1175+
}
11731176

1174-
Peer peer = _peerPool.GetOrAdd(session.Node);
1175-
if (session.Direction == ConnectionDirection.Out)
1176-
{
1177-
peer.IsAwaitingConnection = false;
1178-
}
1177+
Peer peer = _peerPool.GetOrAdd(session.Node);
1178+
if (session.Direction == ConnectionDirection.Out)
1179+
{
1180+
peer.IsAwaitingConnection = false;
1181+
}
11791182

1180-
if (_peerPool.ActivePeers.TryGetValue(session.RemoteNodeId, out Peer activePeer))
1183+
if (_peerPool.ActivePeers.TryGetValue(session.RemoteNodeId, out Peer activePeer))
1184+
{
1185+
//we want to update reputation always
1186+
_stats.ReportDisconnect(session.Node, e.DisconnectType, e.DisconnectReason);
1187+
if (activePeer.InSession?.SessionId != session.SessionId && activePeer.OutSession?.SessionId != session.SessionId)
11811188
{
1182-
//we want to update reputation always
1183-
_stats.ReportDisconnect(session.Node, e.DisconnectType, e.DisconnectReason);
1184-
if (activePeer.InSession?.SessionId != session.SessionId && activePeer.OutSession?.SessionId != session.SessionId)
1185-
{
1186-
if (_logger.IsTrace) TraceIgnoringDifferentSessionDisconnect(activePeer.Node.Id);
1187-
return;
1188-
}
1189-
1190-
DeactivatePeerIfDisconnected(activePeer, "session disconnected");
1191-
SignalPeerUpdateNeeded();
1189+
if (_logger.IsTrace) TraceIgnoringDifferentSessionDisconnect(activePeer.Node.Id);
1190+
return;
11921191
}
1192+
1193+
DeactivatePeerIfDisconnected(activePeer, "session disconnected");
1194+
SignalPeerUpdateNeeded();
11931195
}
11941196

11951197
[MethodImpl(MethodImplOptions.NoInlining)]
@@ -1228,38 +1230,38 @@ private void OnHandshakeComplete(object sender, EventArgs args)
12281230
{
12291231
return;
12301232
}
1233+
}
12311234

1232-
_stats.GetOrAdd(session.Node);
1235+
_stats.GetOrAdd(session.Node);
12331236

1234-
//In case of OUT connections and different RemoteNodeId we need to replace existing Active Peer with new peer
1235-
ManageNewRemoteNodeId(session);
1237+
//In case of OUT connections and different RemoteNodeId we need to replace existing Active Peer with new peer
1238+
ManageNewRemoteNodeId(session);
12361239

1237-
if (_logger.IsTrace) TraceSessionLifecycle(session, SessionLifecycleTraceEvent.HandshakeCompleted);
1240+
if (_logger.IsTrace) TraceSessionLifecycle(session, SessionLifecycleTraceEvent.HandshakeCompleted);
12381241

1239-
//This is the first moment we get confirmed publicKey of remote node in case of incoming connections
1240-
if (session.Direction == ConnectionDirection.In)
1242+
//This is the first moment we get confirmed publicKey of remote node in case of incoming connections
1243+
if (session.Direction == ConnectionDirection.In)
1244+
{
1245+
// For incoming connection, this is the entry point.
1246+
ProcessIncomingConnection(session);
1247+
}
1248+
else
1249+
{
1250+
if (!_peerPool.ActivePeers.TryGetValue(session.RemoteNodeId, out Peer peer))
12411251
{
1242-
// For incoming connection, this is the entry point.
1243-
ProcessIncomingConnection(session);
1252+
//Can happen when peer sent Disconnect message before handshake is done, it takes us a while to disconnect
1253+
if (_logger.IsTrace) TraceHandshakeWithoutActivePeer();
1254+
return;
12441255
}
1245-
else
1246-
{
1247-
if (!_peerPool.ActivePeers.TryGetValue(session.RemoteNodeId, out Peer peer))
1248-
{
1249-
//Can happen when peer sent Disconnect message before handshake is done, it takes us a while to disconnect
1250-
if (_logger.IsTrace) TraceHandshakeWithoutActivePeer();
1251-
return;
1252-
}
12531256

1254-
peer.Stats.AddNodeStatsHandshakeEvent(ConnectionDirection.Out);
1255-
}
1257+
peer.Stats.AddNodeStatsHandshakeEvent(ConnectionDirection.Out);
1258+
}
12561259

1257-
if (_logger.IsTrace) TraceSessionLifecycle(session, SessionLifecycleTraceEvent.HandshakeInitialized);
1260+
if (_logger.IsTrace) TraceSessionLifecycle(session, SessionLifecycleTraceEvent.HandshakeInitialized);
12581261

1259-
[MethodImpl(MethodImplOptions.NoInlining)]
1260-
void TraceHandshakeWithoutActivePeer()
1261-
=> _logger.Trace($"Initiated handshake (OUT) with a peer without adding it to the Active collection : {session}");
1262-
}
1262+
[MethodImpl(MethodImplOptions.NoInlining)]
1263+
void TraceHandshakeWithoutActivePeer()
1264+
=> _logger.Trace($"Initiated handshake (OUT) with a peer without adding it to the Active collection : {session}");
12631265
}
12641266

12651267
private void ManageNewRemoteNodeId(ISession session)

0 commit comments

Comments
 (0)