Skip to content

Commit 1cde5f1

Browse files
committed
changed hashset to concurrent dictionary on relay peers
1 parent bed41d2 commit 1cde5f1

File tree

2 files changed

+31
-48
lines changed

2 files changed

+31
-48
lines changed

Protobuff/P2P/RelayClient.cs

Lines changed: 27 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public class RelayClient
5353
private SecureProtoClient protoClient;
5454
private ConcurrentProtoSerialiser serialiser = new ConcurrentProtoSerialiser();
5555
public Guid sessionId { get; private set; }
56-
public HashSet<Guid> Peers = new HashSet<Guid>();
56+
public ConcurrentDictionary<Guid, bool> Peers = new ConcurrentDictionary<Guid, bool>();
5757

5858
private ConcurrentDictionary<Guid, EncryptedUdpProtoClient> holePunchCandidates = new ConcurrentDictionary<Guid, EncryptedUdpProtoClient>();
5959
private ConcurrentDictionary<Guid, UdpP2PChannels> directUdpClients = new ConcurrentDictionary<Guid, UdpP2PChannels>();
@@ -158,8 +158,8 @@ public async Task<bool> ConnectAsync(string host, int port)
158158
}
159159
public void Disconnect()
160160
{
161-
protoClient.Disconnect();
162-
udpRelayClient.Dispose();
161+
protoClient?.Disconnect();
162+
udpRelayClient?.Dispose();
163163
foreach (var item in directUdpClients)
164164
{
165165
item.Value.ch1.Dispose();
@@ -222,8 +222,7 @@ private async void SendPing(int intervalMs)
222222
pinger.NotifyUdpPingSent(sessionId, msg.TimeStamp);
223223

224224
await Task.Delay(intervalMs/2);
225-
var peerSnapshot = Peers.ToArray();
226-
foreach (var peer in peerSnapshot)
225+
foreach (var peer in Peers.Keys)
227226
{
228227
msg.TimeStamp = DateTime.Now;
229228
SendAsyncMessage(peer, msg);
@@ -288,7 +287,7 @@ public Dictionary<Guid, double> GetUdpPingStatus()
288287
#region Send
289288
public void SendUdpMesssage<T>(Guid toId, T message, string messageHeader = null, int channel = 0) where T : IProtoMessage
290289
{
291-
if (!Peers.Contains(toId) && toId != sessionId)
290+
if (!Peers.TryGetValue(toId, out _) && toId != sessionId)
292291
return;
293292
MessageEnvelope env = new MessageEnvelope();
294293
env.From = sessionId;
@@ -309,7 +308,7 @@ public void SendUdpMesssage<T>(Guid toId, T message, string messageHeader = null
309308

310309
public void SendUdpMesssage(Guid toId, MessageEnvelope message, int channel = 0)
311310
{
312-
if (!Peers.Contains(toId) && toId != sessionId)
311+
if (!Peers.TryGetValue(toId, out _) && toId != sessionId)
313312
return;
314313
message.From = sessionId;
315314
message.To = toId;
@@ -327,7 +326,7 @@ public void SendUdpMesssage(Guid toId, MessageEnvelope message, int channel = 0)
327326
}
328327
public void SendUdpMesssage(Guid toId, byte[] data, int offset, int count, string dataName, int channel = 0)
329328
{
330-
if (!Peers.Contains(toId) && toId != sessionId)
329+
if (!Peers.TryGetValue(toId, out _) && toId != sessionId)
331330
return;
332331

333332
MessageEnvelope env = new MessageEnvelope();
@@ -351,7 +350,7 @@ public void SendUdpMesssage(Guid toId, byte[] data, int offset, int count, strin
351350

352351
public void SendAsyncMessage(Guid toId, MessageEnvelope message)
353352
{
354-
if (!Peers.Contains(toId) && toId != sessionId)
353+
if (!Peers.TryGetValue(toId, out _) && toId != sessionId)
355354
return;
356355

357356
message.From = sessionId;
@@ -360,7 +359,7 @@ public void SendAsyncMessage(Guid toId, MessageEnvelope message)
360359
}
361360
public void SendAsyncMessage<T>(Guid toId, T message, string messageHeader = null) where T : IProtoMessage
362361
{
363-
if (!Peers.Contains(toId) && toId != sessionId)
362+
if (!Peers.TryGetValue(toId, out _) && toId != sessionId)
364363
return;
365364

366365
var envelopedMessage = InternalMessageResources.MakeRelayMessage(sessionId, toId, null);
@@ -369,7 +368,7 @@ public void SendAsyncMessage<T>(Guid toId, T message, string messageHeader = nul
369368
}
370369
public void SendAsyncMessage(Guid toId, byte[] data, string dataName)
371370
{
372-
if (!Peers.Contains(toId) && toId != sessionId)
371+
if (!Peers.TryGetValue(toId, out _) && toId != sessionId)
373372
return;
374373

375374
var envelopedMessage = InternalMessageResources.MakeRelayMessage(sessionId, toId, null);
@@ -379,7 +378,7 @@ public void SendAsyncMessage(Guid toId, byte[] data, string dataName)
379378

380379
public void SendAsyncMessage(Guid toId, byte[] data, int offset, int count, string dataName)
381380
{
382-
if (!Peers.Contains(toId) && toId != sessionId)
381+
if (!Peers.TryGetValue(toId, out _) && toId != sessionId)
383382
return;
384383

385384
var envelopedMessage = InternalMessageResources.MakeRelayMessage(sessionId, toId, null);
@@ -622,55 +621,38 @@ protected virtual void UpdatePeerList(MessageEnvelope message)
622621
{
623622
lock (registeryLocker)
624623
{
625-
HashSet<Guid> serverSet;
626-
PeerList<PeerInfo> pperInfoList = null;
624+
PeerList<PeerInfo> serverPeerInfo = null;
627625
if (message.Payload == null)
628-
serverSet = new HashSet<Guid>();
626+
serverPeerInfo = new PeerList<PeerInfo>() { PeerIds = new Dictionary<Guid, PeerInfo>() };
629627
else
630628
{
631-
pperInfoList = serialiser.Deserialize<PeerList<PeerInfo>>
629+
serverPeerInfo = serialiser.Deserialize<PeerList<PeerInfo>>
632630
(message.Payload, message.PayloadOffset, message.PayloadCount);
633631

634-
if (pperInfoList.PeerIds == null || pperInfoList.PeerIds.Count == 0)
635-
{
636-
serverSet = new HashSet<Guid>();
637-
}
638-
else
639-
{
640-
serverSet = new HashSet<Guid>(pperInfoList.PeerIds.Keys);
641-
}
642632
}
643633

644-
foreach (var peer in Peers)
634+
foreach (var peer in Peers.Keys)
645635
{
646-
if (!serverSet.Contains(peer))
636+
if (!serverPeerInfo.PeerIds.ContainsKey(peer))
647637
{
648-
try
649-
{
650-
OnPeerUnregistered?.Invoke(peer);
651-
pinger.PeerUnregistered(peer);
652-
PeerInfos.TryRemove(peer, out _);
653-
}
654-
catch { }
655-
638+
Peers.TryRemove(peer, out _);
639+
OnPeerUnregistered?.Invoke(peer);
640+
pinger.PeerUnregistered(peer);
641+
PeerInfos.TryRemove(peer, out _);
656642
}
657643
}
658644

659-
foreach (var peer in serverSet)
645+
foreach (var peer in serverPeerInfo.PeerIds.Keys)
660646
{
661-
if (!Peers.Contains(peer))
647+
if(!Peers.TryGetValue(peer, out _))
662648
{
663-
try
664-
{
665-
OnPeerRegistered?.Invoke(peer);
666-
pinger.PeerRegistered(peer);
667-
PeerInfos.TryAdd(peer, pperInfoList.PeerIds[peer]);
668-
}
669-
catch { }
649+
Peers.TryAdd(peer,true);
650+
OnPeerRegistered?.Invoke(peer);
651+
pinger.PeerRegistered(peer);
652+
PeerInfos.TryAdd(peer, serverPeerInfo.PeerIds[peer]);
670653
}
671654
}
672-
Peers.UnionWith(serverSet);
673-
Peers.RemoveWhere(x => !serverSet.Contains(x));
655+
674656
}
675657
}
676658

Tests/ConsoleTest/Program.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ internal class Program
6161
static void Main(string[] args)
6262
{
6363
MiniLogger.AllLog += (log) => Console.WriteLine(log);
64+
6465
////Parallel.For(0, 2, j =>
6566
////{
6667
// for (int i = 0; i < 10000000; i++)
@@ -545,7 +546,7 @@ private static void RelayTest()
545546
return;
546547
Parallel.ForEach(clients, async(client) =>
547548
{
548-
foreach (var peer in client.Peers)
549+
foreach (var peer in client.Peers.Keys)
549550
{
550551
while (true)
551552
{
@@ -566,7 +567,7 @@ private static void RelayTest()
566567
for (int i = 0; i < 10; i++)
567568
{
568569
//return;
569-
foreach (var peer in client.Peers)
570+
foreach (var peer in client.Peers.Keys)
570571
{
571572
//await client.SendRequestAndWaitResponse(peer, testMessage,1000);
572573
client.SendAsyncMessage(peer, testMessage_);
@@ -608,7 +609,7 @@ void ClientUdpReceived(RelayClient client, MessageEnvelope reply)
608609

609610
//client.SendUpMesssage(reply.From, reply);
610611
//reply.Payload = new byte[randomG.Next(500, 32000)];
611-
client.SendUdpMesssage(client.Peers.First(), reply);
612+
client.SendUdpMesssage(client.Peers.Keys.First(), reply);
612613
//Task.Run(() =>
613614
//{
614615
// client.SendUdpMesssage(client.Peers.First(), reply);

0 commit comments

Comments
 (0)