Skip to content

Commit 46d810b

Browse files
committed
Bug fix on peer reconnect. Udp port option added
1 parent 706c35f commit 46d810b

File tree

10 files changed

+117
-48
lines changed

10 files changed

+117
-48
lines changed

NetworkLibrary/MessageProtocol/Serialization/PrimitiveEncoder.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,16 @@ public static void WriteFixedUint16(PooledMemoryStream stream, ushort value)
143143
stream.WriteUshort(value);
144144
}
145145

146+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
147+
public unsafe static void WriteFixedUint16(byte[]buffer, int offset, ushort value)
148+
{
149+
if(buffer.Length-offset < 2)
150+
throw new InvalidDataException("Buffer does not have enough space");
151+
152+
fixed (byte* b = &buffer[offset])
153+
*(short*)b = (short)value;
154+
}
155+
146156
[MethodImpl(MethodImplOptions.AggressiveInlining)]
147157
public static ushort ReadUInt16(PooledMemoryStream stream)
148158
{

NetworkLibrary/P2P/Components/Modules/ReliableUdp.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ public void Send(in Segment first, in Segment second)
101101
sender.ProcessBytesToSend(first, second);
102102
}
103103

104-
internal void Close()
104+
internal void Release()
105105
{
106106
OnReceived = null;
107107
OnSend = null;

NetworkLibrary/P2P/Generic/RelayClientBase.cs

Lines changed: 70 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public PeerInformation() { }
5454
public RemoteCertificateValidationCallback RemoteCertificateValidationCallback;
5555

5656
[Obsolete("Use SessionId instead")]
57-
public Guid sessionId { get; private set; }
57+
private Guid sessionId;
5858
public Guid SessionId => sessionId;
5959

6060
public bool IsConnected { get => isConnected; private set => isConnected = value; }
@@ -80,15 +80,15 @@ public PeerInformation() { }
8080
private ConcurrentDictionary<Guid, IPEndPoint> punchedEndpoints = new ConcurrentDictionary<Guid, IPEndPoint>();
8181
private ConcurrentDictionary<IPEndPoint, ConcurrentAesAlgorithm> peerCryptos = new ConcurrentDictionary<IPEndPoint, ConcurrentAesAlgorithm>();
8282
private ClientStateManager<S> clientStateManager;
83-
public RelayClientBase(X509Certificate2 clientCert)
83+
public RelayClientBase(X509Certificate2 clientCert, int udpPort = 0)
8484
{
8585
clientStateManager = new ClientStateManager<S>(this);
8686
tcpMessageClient = new SecureMessageClient<S>(clientCert);
8787
tcpMessageClient.OnMessageReceived += HandleMessageReceived;
8888
tcpMessageClient.OnDisconnected += HandleDisconnect;
8989
tcpMessageClient.RemoteCertificateValidationCallback += CertificateValidation;
9090

91-
udpServer = new ClientUdpModule<S>(0);
91+
udpServer = new ClientUdpModule<S>(udpPort);
9292
udpServer.SocketReceiveBufferSize = 12800000;
9393
udpServer.SocketSendBufferSize = 12800000;
9494
udpServer.OnBytesRecieved += HandleUdpBytesReceived;
@@ -207,6 +207,19 @@ private void HandleDisconnect()
207207

208208
OnDisconnected?.Invoke();
209209
IsConnected = false;
210+
foreach (var item in RUdpModules)
211+
{
212+
foreach (var m in item.Value)
213+
{
214+
m?.Release();
215+
}
216+
}
217+
RUdpModules.Clear();
218+
foreach (var item in JumboUdpModules)
219+
{
220+
item.Value?.Release();
221+
}
222+
JumboUdpModules.Clear();
210223
}
211224

212225
}
@@ -400,6 +413,32 @@ private void SendUdpMesssageInternal<T>(Guid toId, MessageEnvelope message, T in
400413

401414
}
402415

416+
private void SendUdpMesssageInternal(Guid toId, MessageEnvelope message, Action<PooledMemoryStream> serializationCallback)
417+
{
418+
ConcurrentAesAlgorithm algo;
419+
420+
if (punchedEndpoints.TryGetValue(toId, out var endpoint))
421+
{
422+
algo = peerCryptos[endpoint];
423+
}
424+
else
425+
{
426+
endpoint = relayServerEndpoint;
427+
algo = udpEncriptor;
428+
429+
}
430+
431+
if (!udpServer.TrySendAsync(endpoint, message, serializationCallback, algo, out var excessStream))
432+
{
433+
if (JumboUdpModules.TryGetValue(toId, out var mod))
434+
mod.Send(excessStream.GetBuffer(), 0, excessStream.Position32);
435+
else
436+
MiniLogger.Log(MiniLogger.LogLevel.Error, "Unable To find jumbo module with Id: " + toId + " in session " + sessionId);
437+
438+
}
439+
440+
}
441+
403442

404443

405444
public void SendUdpMesssage(Guid toId, MessageEnvelope message)
@@ -413,6 +452,17 @@ public void SendUdpMesssage(Guid toId, MessageEnvelope message)
413452

414453
}
415454

455+
public void SendUdpMesssage(Guid toId, MessageEnvelope message, Action<PooledMemoryStream> serializationCallback)
456+
{
457+
if (!Peers.TryGetValue(toId, out _) && toId != sessionId)
458+
return;
459+
message.From = sessionId;
460+
message.To = toId;
461+
462+
SendUdpMesssageInternal(toId, message,serializationCallback);
463+
464+
}
465+
416466
public void SendUdpMesssage<T>(Guid toId, MessageEnvelope message, T innerMessage)
417467
{
418468
if (!Peers.TryGetValue(toId, out _) && toId != sessionId)
@@ -628,6 +678,8 @@ public void SendAsyncMessage(Guid toId, MessageEnvelope message)
628678
message.To = toId;
629679
tcpMessageClient.SendAsyncMessage(message);
630680
}
681+
682+
631683
public void SendAsyncMessage<T>(Guid toId, MessageEnvelope envelope, T message)
632684
{
633685
if (!Peers.TryGetValue(toId, out _) && toId != sessionId)
@@ -864,9 +916,9 @@ private void HandleMessageReceived(MessageEnvelope message)
864916
}
865917
else
866918
{
867-
if (UdpAwaiter.IsWaiting(message.MessageId))
919+
if (Awaiter.IsWaiting(message.MessageId))
868920
{
869-
UdpAwaiter.ResponseArrived(message);
921+
Awaiter.ResponseArrived(message);
870922
}
871923
switch (message.Header)
872924
{
@@ -1055,6 +1107,13 @@ protected internal void HandleUnRegistered(Guid peerId)
10551107
RemoveJudpModule(peerId);
10561108
pinger.PeerUnregistered(peerId);
10571109
PeerInfos.TryRemove(peerId, out _);
1110+
if(RUdpModules.TryRemove(peerId, out var m))
1111+
foreach (var item in m)
1112+
{
1113+
item?.Release();
1114+
}
1115+
if(JumboUdpModules.TryRemove(peerId, out var m1))
1116+
m1.Release();
10581117
}
10591118
#endregion
10601119

@@ -1136,7 +1195,7 @@ private void RemoveRudpModule(Guid peer)
11361195
{
11371196
foreach (var item in mod)
11381197
{
1139-
item.Close();
1198+
item.Release();
11401199
}
11411200
}
11421201
}
@@ -1182,9 +1241,9 @@ private void HandleRUdpBytesReceived(byte[] arg1, int arg2, int arg3)
11821241
{
11831242
var msg = serialiser.DeserialiseEnvelopedMessage(arg1, arg2, arg3);
11841243

1185-
if (UdpAwaiter.IsWaiting(msg.MessageId))
1244+
if (Awaiter.IsWaiting(msg.MessageId))
11861245
{
1187-
UdpAwaiter.ResponseArrived(msg);
1246+
Awaiter.ResponseArrived(msg);
11881247
}
11891248
else
11901249
HandleUdpMessageReceived(msg);
@@ -1256,15 +1315,15 @@ public void SendRudpMessage<T>(Guid to, MessageEnvelope msg, T innerMessage, Rud
12561315
SharerdMemoryStreamPool.ReturnStreamStatic(stream);
12571316
}
12581317
}
1259-
GenericMessageAwaiter<MessageEnvelope> UdpAwaiter => tcpMessageClient.Awaiter;//= new GenericMessageAwaiter<MessageEnvelope>();
1318+
GenericMessageAwaiter<MessageEnvelope> Awaiter => tcpMessageClient.Awaiter;//= new GenericMessageAwaiter<MessageEnvelope>();
12601319
public Task<MessageEnvelope> SendRudpMessageAndWaitResponse(Guid to, MessageEnvelope msg, int timeoutMs = 10000, RudpChannel channel = RudpChannel.Ch1)
12611320
{
12621321
if (RUdpModules.TryGetValue(to, out var mod))
12631322
{
12641323
msg.From = sessionId;
12651324
msg.To = to;
12661325
msg.MessageId = Guid.NewGuid();
1267-
var task = UdpAwaiter.RegisterWait(msg.MessageId, timeoutMs);
1326+
var task = Awaiter.RegisterWait(msg.MessageId, timeoutMs);
12681327

12691328
var stream = SharerdMemoryStreamPool.RentStreamStatic();
12701329
serialiser.EnvelopeMessageWithBytesDontWritePayload(stream, msg, msg.PayloadCount);
@@ -1291,7 +1350,7 @@ public Task<MessageEnvelope> SendRudpMessageAndWaitResponse<T>(Guid to, MessageE
12911350
msg.From = sessionId;
12921351
msg.To = to;
12931352
msg.MessageId = Guid.NewGuid();
1294-
var task = UdpAwaiter.RegisterWait(msg.MessageId, timeoutMs);
1353+
var task = Awaiter.RegisterWait(msg.MessageId, timeoutMs);
12951354

12961355
var stream = SharerdMemoryStreamPool.RentStreamStatic();
12971356
serialiser.EnvelopeMessageWithInnerMessage(stream, msg, innerMessage);

NetworkLibrary/UDP/AsyncUdpServerLitecs.cs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,16 @@ public AsyncUdpServerLite(int port)
5555
ServerSocket.SendBufferSize = SocketSendBufferSize;
5656

5757
serverEndpoint = new IPEndPoint(IPAddress.Any, port);
58-
ServerSocket.Bind(serverEndpoint);
58+
try
59+
{
60+
ServerSocket.Bind(serverEndpoint);
61+
}
62+
catch
63+
{
64+
MiniLogger.Log(MiniLogger.LogLevel.Warning, $"Unable to bind udp port {port} choosing default");
65+
serverEndpoint = new IPEndPoint(IPAddress.Any, 0);
66+
ServerSocket.Bind(serverEndpoint);
67+
}
5968
ServerSocket.Blocking = false;
6069
this.port = port;
6170
}

NetworkLibrary/UDP/Jumbo/JumboModule.cs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ public JumboModule()
1414
{
1515
this.receiver = new Receiver();
1616
this.sender = new Sender();
17-
sender.OnSend = SendBytestoSocket;
17+
sender.OnSend = SendBytesToSocket;
1818
receiver.OnMessageExtracted = HandleExtractedMessage;
1919
}
2020

@@ -23,7 +23,7 @@ private void HandleExtractedMessage(byte[] arg1, int arg2, int arg3)
2323
MessageReceived?.Invoke(arg1, arg2, arg3);
2424
}
2525

26-
private void SendBytestoSocket(byte[] arg1, int arg2, int arg3)
26+
private void SendBytesToSocket(byte[] arg1, int arg2, int arg3)
2727
{
2828
SendToSocket?.Invoke(arg1, arg2, arg3);
2929
}
@@ -40,6 +40,10 @@ public void HandleReceivedSegment(byte[] buffer, int offset, int count)
4040

4141
public void Release()
4242
{
43+
SendToSocket = null;
44+
MessageReceived = null;
45+
sender.Release();
46+
receiver.Release();
4347
}
4448

4549
internal void Send(in Segment s1, in Segment s2)

NetworkLibrary/UDP/Jumbo/Receiver.cs

Lines changed: 13 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -90,9 +90,14 @@ internal class Receiver
9090

9191
private void OnTick(object state)
9292
{
93+
9394
foreach (var message in pendingMessages)
9495
{
95-
message.Value.Tick(1000);
96+
try
97+
{
98+
message.Value.Tick(1000);
99+
}
100+
catch{ }
96101
}
97102
}
98103

@@ -129,33 +134,6 @@ public void ProccesReceivedDatagram(byte[] buffer, int offset, int count)
129134

130135
pend.Append(currenSeq, buffer, offset, count);
131136

132-
133-
//lock (locker)
134-
//{
135-
// int originalOffset = offset;
136-
// int msgNo = PrimitiveEncoder.ReadInt32(buffer, ref offset);
137-
// byte totNumSeq = buffer[offset++];
138-
// byte currenSeq = buffer[offset++];
139-
// count = count - (offset - originalOffset);
140-
// if (totNumSeq == 1)
141-
// {
142-
// OnMessageExtracted?.Invoke(buffer, offset, count);
143-
// return;
144-
// }
145-
// if (pendingMessages.TryGetValue(msgNo, out var pend))
146-
// {
147-
// pend.Append(currenSeq, buffer, offset, count);
148-
// return;
149-
// }
150-
151-
// var pending = new PendingMessage(SharerdMemoryStreamPool.RentStreamStatic(), 5000, totNumSeq, msgNo);
152-
// pending.Append(currenSeq, buffer, offset, count);
153-
// pendingMessages.TryAdd(msgNo, pending);
154-
// pending.TimedOut = MessagetimedOut;
155-
// pending.Completed = MessageComplete;
156-
// //Reliable.Timer.Register(pending);
157-
//}
158-
159137
}
160138

161139
internal void MessageComplete(PendingMessage message)
@@ -174,5 +152,12 @@ internal void MessagetimedOut(PendingMessage message)
174152

175153
BufferPool.ReturnBuffer(message.msgBuffer);
176154
}
155+
156+
internal void Release()
157+
{
158+
timer.Dispose();
159+
OnMessageExtracted = null;
160+
pendingMessages.Clear();
161+
}
177162
}
178163
}

NetworkLibrary/UDP/Jumbo/Sender.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -144,8 +144,6 @@ private void ProcessBytesSafe(byte[] buffer, int offset, int count)
144144
tempBuff[offset_++] = curresntSeq++;
145145
tempbuffCnt += offset_;
146146

147-
148-
149147
ByteCopy.BlockCopy(buffer, offset, tempBuff, offset_, count);
150148

151149
tempbuffCnt += count;
@@ -219,7 +217,6 @@ public void ProcessBytes(in Segment first, in Segment second)
219217
tempBuff[offset_++] = curresntSeq++;
220218
tempbuffCnt += offset_;
221219

222-
223220
ByteCopy.BlockCopy(second.Array, secondOffset, tempBuff, offset_, count);
224221

225222
tempbuffCnt += count;
@@ -305,5 +302,9 @@ public unsafe void ProcessBytes(in SegmentUnsafe first, in Segment second)
305302

306303
}
307304

305+
internal void Release()
306+
{
307+
OnSend = null;
308+
}
308309
}
309310
}

NetworkLibrary/UDP/Reliable/Components/SenderModule.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,7 @@ private void StartProducer()
312312
if (pb > PacingThreshold * Windowsize)
313313
{
314314
delayedSchedule = true;
315+
// force = true;
315316
break;
316317
}
317318

Protobuff/P2P/RelayClient.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ namespace Protobuff.P2P
77
{
88
public class RelayClient : RelayClientBase<ProtoSerializer>
99
{
10-
public RelayClient(X509Certificate2 clientCert) : base(clientCert)
10+
public RelayClient(X509Certificate2 clientCert, int udpPort = 0) : base(clientCert,udpPort)
1111
{
1212
}
1313
}

Tests/ConsoleTest/Program.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333

3434
namespace ConsoleTest
3535
{
36-
36+
//C7dtpD5U6$
3737
internal class Program
3838
{
3939
static int i = 0;

0 commit comments

Comments
 (0)