Skip to content

Commit d6c7dcb

Browse files
committed
Refactor locks and more
1 parent eea0de4 commit d6c7dcb

File tree

16 files changed

+221
-161
lines changed

16 files changed

+221
-161
lines changed

src/libp2p/Libp2p.Core/Exceptions/Libp2pException.cs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited
22
// SPDX-License-Identifier: MIT
33

4+
5+
using System.Diagnostics.CodeAnalysis;
6+
using System.Runtime.CompilerServices;
7+
48
namespace Nethermind.Libp2p.Core.Exceptions;
59

610
public class Libp2pException : Exception
@@ -18,7 +22,20 @@ public class ChannelClosedException() : Libp2pException("Channel closed");
1822
/// Appears when libp2p is not set up properly in part of protocol tack, IoC, etc.
1923
/// </summary>
2024
/// <param name="message"></param>
21-
public class Libp2pSetupException(string? message = null) : Libp2pException(message);
25+
public class Libp2pSetupException(string? message = null) : Libp2pException(message)
26+
{
27+
public static void ThrowIfNull([NotNull] object? argument, [CallerArgumentExpression(nameof(argument))] string? paramName = null)
28+
{
29+
if (argument is null)
30+
{
31+
Throw(paramName);
32+
}
33+
}
34+
35+
[DoesNotReturn]
36+
internal static void Throw(string? paramName) =>
37+
throw new Libp2pSetupException($"{paramName} is not set during libp2p initialization");
38+
}
2239

2340
/// <summary>
2441
/// Appears when there is already active session for the given peer
Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,18 @@
1-
// SPDX-FileCopyrightText: 2023 Demerzel Solutions Limited
1+
// SPDX-FileCopyrightText: 2025 Demerzel Solutions Limited
22
// SPDX-License-Identifier: MIT
33

44
using Multiformats.Address;
5+
using Multiformats.Address.Net;
56
using Multiformats.Address.Protocols;
7+
using System.Net.Sockets;
68

79
namespace Nethermind.Libp2p.Core;
810

911
public static class MultiaddressExtensions
1012
{
11-
public static PeerId? GetPeerId(this Multiaddress? addr) => addr is not null && addr.Has<P2P>() ? new PeerId(addr.Get<P2P>().ToString()) : default;
13+
public static PeerId? GetPeerId(this Multiaddress? addr)
14+
=> addr is not null && addr.Has<P2P>() ? new PeerId(addr.Get<P2P>().ToString()) : default;
15+
16+
public static Multiaddress GetEndpointPart(this Multiaddress multiaddress)
17+
=> multiaddress.ToEndPoint(out ProtocolType proto).ToMultiaddress(proto);
1218
}

src/libp2p/Libp2p.Protocols.Identify/IdentifyNotifier.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited
1+
// SPDX-FileCopyrightText: 2025 Demerzel Solutions Limited
22
// SPDX-License-Identifier: MIT
33

44
using Nethermind.Libp2p.Core;

src/libp2p/Libp2p.Protocols.Identify/IdentifyProtocol.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ public IdentifyProtocol(IProtocolStackSettings protocolStackSettings, IdentifyPr
2222
public async Task DialAsync(IChannel channel, ISessionContext context)
2323
{
2424
_logger?.LogInformation("Dial");
25-
await ReadAndVerifyIndentity(channel, context);
25+
await ReadAndVerifyIdentity(channel, context);
2626
}
2727

2828
public async Task ListenAsync(IChannel channel, ISessionContext context)

src/libp2p/Libp2p.Protocols.Identify/IdentifyProtocolBase.cs

Lines changed: 33 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ public abstract class IdentifyProtocolBase(IProtocolStackSettings protocolStackS
2020
private readonly IProtocolStackSettings _protocolStackSettings = protocolStackSettings;
2121
private readonly IdentifyProtocolSettings _settings = settings ?? new IdentifyProtocolSettings();
2222

23-
protected async Task ReadAndVerifyIndentity(IChannel channel, ISessionContext context)
23+
protected async Task ReadAndVerifyIdentity(IChannel channel, ISessionContext context)
2424
{
2525
ArgumentNullException.ThrowIfNull(context.State.RemotePublicKey);
2626
ArgumentNullException.ThrowIfNull(context.State.RemotePeerId);
@@ -34,11 +34,28 @@ protected async Task ReadAndVerifyIndentity(IChannel channel, ISessionContext co
3434
throw new PeerConnectionException("Malformed peer identity: the remote public key corresponds to a different peer id");
3535
}
3636

37-
ulong seq = 0;
37+
VerifySignedPeerRecordOrThrow(identify.SignedPeerRecord, context.State.RemotePublicKey, context.State.RemotePeerId, out ulong seq);
3838

39-
if (identify.SignedPeerRecord is not null)
39+
if (_peerStore is not null)
40+
{
41+
PeerStore.PeerInfo peerInfo = _peerStore.GetPeerInfo(context.State.RemotePeerId);
42+
43+
if (identify.SignedPeerRecord is not null && peerInfo.Seq >= seq)
44+
{
45+
// do nothing if seq is for an older record
46+
return;
47+
}
48+
peerInfo.SupportedProtocols = identify.Protocols.ToArray();
49+
peerInfo.SignedPeerRecord = identify.SignedPeerRecord;
50+
peerInfo.Seq = seq;
51+
}
52+
}
53+
54+
private void VerifySignedPeerRecordOrThrow(ByteString? signedPeerRecordBytes, PublicKey remotePublicKey, PeerId remotePeerId, out ulong seq)
55+
{
56+
if (signedPeerRecordBytes is not null)
4057
{
41-
if (!SigningHelper.VerifyPeerRecord(identify.SignedPeerRecord, context.State.RemotePublicKey, out seq))
58+
if (!SigningHelper.VerifyPeerRecord(signedPeerRecordBytes, remotePublicKey, out seq))
4259
{
4360
if (_settings?.PeerRecordsVerificationPolicy == PeerRecordsVerificationPolicy.RequireCorrect)
4461
{
@@ -51,43 +68,38 @@ protected async Task ReadAndVerifyIndentity(IChannel channel, ISessionContext co
5168
}
5269
else
5370
{
54-
_logger?.LogDebug("Confirmed peer record: {peerId}", context.State.RemotePeerId);
71+
_logger?.LogDebug("Confirmed peer record: {peerId}", remotePeerId);
5572
}
5673
}
5774
else if (_settings.PeerRecordsVerificationPolicy != PeerRecordsVerificationPolicy.DoesNotRequire)
5875
{
5976
throw new PeerConnectionException("Malformed peer identity: there is no peer record which is required");
6077
}
6178

62-
if (_peerStore is not null)
63-
{
64-
PeerStore.PeerInfo peerInfo = _peerStore.GetPeerInfo(context.State.RemotePeerId);
65-
66-
if (identify.SignedPeerRecord is not null && peerInfo.Seq >= seq)
67-
{
68-
// do nothing if seq is for an older record
69-
return;
70-
}
71-
peerInfo.SupportedProtocols = identify.Protocols.ToArray();
72-
peerInfo.SignedPeerRecord = identify.SignedPeerRecord;
73-
peerInfo.Seq = seq;
74-
}
79+
seq = 0;
7580
}
7681

7782
protected async Task SendIdentity(IChannel channel, ISessionContext context, ulong idVersion = 1)
7883
{
84+
ArgumentNullException.ThrowIfNull(context.State.RemoteAddress);
85+
Libp2pSetupException.ThrowIfNull(_protocolStackSettings.Protocols);
86+
7987
Identify.Dto.Identify identify = new()
8088
{
8189
ProtocolVersion = _settings.ProtocolVersion,
8290
AgentVersion = _settings.AgentVersion,
8391
PublicKey = context.Peer.Identity.PublicKey.ToByteString(),
8492
ListenAddrs = { },
85-
ObservedAddr = ByteString.CopyFrom(context.State.RemoteAddress!.ToEndPoint(out ProtocolType proto).ToMultiaddress(proto).ToBytes()),
86-
Protocols = { _protocolStackSettings.Protocols!.Select(r => r.Key.Protocol).OfType<ISessionListenerProtocol>().Select(p => p.Id) },
93+
ObservedAddr = ByteString.CopyFrom(context.State.RemoteAddress.GetEndpointPart().ToBytes()),
94+
Protocols = { _protocolStackSettings.Protocols.Select(r => r.Key.Protocol).OfType<ISessionListenerProtocol>().Select(p => p.Id) },
8795
SignedPeerRecord = SigningHelper.CreateSignedEnvelope(context.Peer.Identity, [.. context.Peer.ListenAddresses], idVersion),
8896
};
8997

90-
ByteString[] endpoints = context.Peer.ListenAddresses.Where(a => !a.ToEndPoint().Address.IsPrivate()).Select(a => a.ToEndPoint(out ProtocolType proto).ToMultiaddress(proto)).Select(a => ByteString.CopyFrom(a.ToBytes())).ToArray();
98+
ByteString[] endpoints = context.Peer.ListenAddresses
99+
.Where(a => !a.ToEndPoint().Address.IsPrivate())
100+
.Select(a => a.ToEndPoint(out ProtocolType proto).ToMultiaddress(proto))
101+
.Select(a => ByteString.CopyFrom(a.ToBytes())).ToArray();
102+
91103
identify.ListenAddrs.AddRange(endpoints);
92104

93105
await channel.WriteSizeAndProtobufAsync(identify);

src/libp2p/Libp2p.Protocols.Identify/IdentifyPushProtocol.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// SPDX-FileCopyrightText: 2023 Demerzel Solutions Limited
1+
// SPDX-FileCopyrightText: 2025 Demerzel Solutions Limited
22
// SPDX-License-Identifier: MIT
33

44
using Microsoft.Extensions.Logging;
@@ -25,6 +25,6 @@ public async Task<ulong> DialAsync(IChannel channel, ISessionContext context, ul
2525
public async Task ListenAsync(IChannel channel, ISessionContext context)
2626
{
2727
_logger?.LogDebug("Receiving identity update");
28-
await ReadAndVerifyIndentity(channel, context);
28+
await ReadAndVerifyIdentity(channel, context);
2929
}
3030
}

src/libp2p/Libp2p.Protocols.IpTcp/IpTcpProtocol.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -190,8 +190,10 @@ public async Task DialAsync(ITransportContext context, Multiaddress remoteAddr,
190190
_ = upChannel.CloseAsync();
191191
_logger?.LogDebug("Ctx{0}: end sending, socket exception {1}", connectionCtx.Id, e.Message);
192192
}
193-
194-
client.Close();
193+
finally
194+
{
195+
client.Close();
196+
}
195197
});
196198

197199
await Task.WhenAll(receiveTask, sendTask).ContinueWith(t => connectionCtx.Dispose());

src/libp2p/Libp2p.Protocols.MDns/MDnsDiscoveryProtocol.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public Task StartDiscoveryAsync(IReadOnlyList<Multiaddress> localPeerAddrs, Canc
6464
}
6565
}
6666

67-
_logger?.LogTrace("Records {0}", string.Join(",", service.Resources));
67+
_logger?.LogTrace("DNS records to share: {0}", string.Join(",", service.Resources));
6868
_logger?.LogInformation("Started as {0} {1}", PeerName, ServiceNameOverride ?? ServiceName);
6969

7070
sd.ServiceDiscovered += (s, serviceName) =>

src/libp2p/Libp2p.Protocols.Pubsub/PubsubRouter.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -492,7 +492,7 @@ internal enum ConnectionInitiation
492492
Remote,
493493
}
494494

495-
readonly struct MessageWithId(MessageId id, Message message)
495+
internal readonly struct MessageWithId(MessageId id, Message message)
496496
{
497497
public MessageId Id { get; } = id;
498498
public Message Message { get; } = message;

src/libp2p/Libp2p.Protocols.Yamux.Tests/DataWindowTests.cs

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,21 +9,32 @@ public class DataWindowTests
99
[Test]
1010
public async Task Test_WindowExtendsProperly_WhenChannelWaitsForUpdate()
1111
{
12-
int windowSize = 100;
13-
var w = new DataWindow(windowSize);
14-
Task spendingTask = Task.Run(async () =>
12+
const int WindowSize = 100;
13+
14+
for (int bytesToSendCase = 0; bytesToSendCase < 500; bytesToSendCase++)
1515
{
16-
int bytesToSend = windowSize + 1;
17-
while (bytesToSend != 0)
16+
int bytesToSend = bytesToSendCase;
17+
int windowUdatesNeeded = (bytesToSend - 1) / WindowSize;
18+
int finalAvailable = WindowSize - bytesToSend + WindowSize * windowUdatesNeeded;
19+
20+
var w = new RemoteDataWindow(WindowSize);
21+
Task spendingTask = Task.Run(async () =>
22+
{
23+
await Task.Delay(bytesToSend % 3);
24+
while (bytesToSend != 0)
25+
{
26+
bytesToSend -= await w.SpendOrWait(bytesToSend);
27+
}
28+
});
29+
30+
await Task.Delay(bytesToSend % 5);
31+
for (int i = 0; i < windowUdatesNeeded; i++)
1832
{
19-
bytesToSend -= await w.SpendWindowOrWait(bytesToSend);
33+
int val = w.Extend(WindowSize);
2034
}
21-
});
22-
await Task.Delay(10);
23-
int val = w.ExtendWindowIfNeeded();
2435

25-
await spendingTask;
26-
Assert.That(val, Is.EqualTo(windowSize));
27-
Assert.That(w.Available, Is.EqualTo(windowSize - 1));
36+
await spendingTask;
37+
Assert.That(w.Available, Is.EqualTo(finalAvailable));
38+
}
2839
}
2940
}

0 commit comments

Comments
 (0)