Skip to content

Commit 31b1a43

Browse files
committed
Task Syncronization context patch to aviod WPF deadlock, Holepunch extension .
1 parent ffa02e9 commit 31b1a43

18 files changed

+391
-138
lines changed

NetworkLibrary/TCP/Base/AsyncTcpClient.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,9 @@ public override void Connect(string IP, int port)
3434

3535
public override async Task<bool> ConnectAsyncAwaitable(string IP, int port)
3636
{
37-
connectedCompletionSource = new TaskCompletionSource<bool>();
37+
connectedCompletionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
3838
ConnectAsync(IP, port);
39-
return await connectedCompletionSource.Task;
39+
return await connectedCompletionSource.Task.ConfigureAwait(false);
4040
}
4141

4242
public override void ConnectAsync(string IP, int port)

NetworkLibrary/TCP/SSL/SslClient.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public override async Task<bool> ConnectAsyncAwaitable(string ip, int port)
6161
IsConnecting = true;
6262
var clientSocket = GetSocket();
6363

64-
await clientSocket.ConnectAsync(new IPEndPoint(IPAddress.Parse(ip), port));
64+
await clientSocket.ConnectAsync(new IPEndPoint(IPAddress.Parse(ip), port)).ConfigureAwait(false);
6565

6666
Connected(ip,clientSocket);
6767
return true;
@@ -82,7 +82,7 @@ public override void ConnectAsync(string IP, int port)
8282
bool result = false;
8383
try
8484
{
85-
result = await ConnectAsyncAwaitable(IP, port);
85+
result = await ConnectAsyncAwaitable(IP, port).ConfigureAwait(false);
8686

8787
}
8888
catch (Exception ex)

NetworkLibrary/UDP/AsyncUdpClient.cs

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,8 @@ private void Receive()
129129
{
130130
try
131131
{
132-
activeRec = clientSocket.BeginReceiveFrom(recieveBuffer, 0, recieveBuffer.Length, SocketFlags.None, ref remoteEndPoint, EndRecieveFrom, null);
132+
EndPoint rr = new IPEndPoint(IPAddress.Any, 0);
133+
activeRec = clientSocket.BeginReceiveFrom(recieveBuffer, 0, recieveBuffer.Length, SocketFlags.None, ref rr, EndRecieveFrom, null);
133134

134135
}
135136
catch (Exception e)
@@ -213,6 +214,36 @@ public void SendAsync(byte[] bytes)
213214
SendAsync(bytes, 0, bytes.Length);
214215
}
215216

217+
public void SendTo(byte[] bytes, int offset, int count, EndPoint endpoint)
218+
{
219+
clientSocket.SendTo(bytes, offset, count, SocketFlags.None, endpoint);
220+
}
221+
222+
public void ReceiveOnceFrom(EndPoint endPoint, Action<byte[],int,int> OnReceived)
223+
{
224+
var buffer = BufferPool.RentBuffer(64000);
225+
clientSocket.BeginReceiveFrom(buffer, 0, 62000, SocketFlags.None, ref endPoint, OnReceived_, buffer);
226+
227+
void OnReceived_(IAsyncResult ar)
228+
{
229+
try
230+
{
231+
int amount = clientSocket.EndReceiveFrom(ar,ref endPoint);
232+
byte[] bytes = ar.AsyncState as byte[];
233+
OnReceived?.Invoke(bytes, 0, amount);
234+
BufferPool.ReturnBuffer(bytes);
235+
}
236+
catch(Exception e)
237+
{
238+
MiniLogger.Log(MiniLogger.LogLevel.Error,"ReceiveOnceFrom Error: "+e.Message);
239+
OnReceived?.Invoke(null, 0, 0);
240+
}
241+
242+
}
243+
}
244+
245+
246+
216247
public void JoinMulticastGroup(IPAddress multicastAddr)
217248
{
218249
MulticastOption mcOpt = new MulticastOption(multicastAddr, ((IPEndPoint)clientSocket.LocalEndPoint).Address);

NetworkLibrary/UDP/Secure/SecureUdpClient.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ namespace NetworkLibrary.UDP.Secure
1111
{
1212
public class SecureUdpClient:AsyncUdpClient
1313
{
14-
ConcurrentAesAlgorithm algorithm;
14+
public ConcurrentAesAlgorithm algorithm;
1515
public SecureUdpClient(ConcurrentAesAlgorithm algorithm, int port) : base(port)
1616
{
1717
this.algorithm = algorithm;

Protobuff/Components/ConcurrentProtoSerialiser.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public T Deserialize<T>(byte[] data) where T : IProtoMessage
7878
[MethodImpl(MethodImplOptions.AggressiveInlining)]
7979
public T Deserialize<T>(byte[] data, int offset, int count) where T : IProtoMessage
8080
{
81-
if (null == data)
81+
if (data == null || count == 0)
8282
return default;
8383

8484
ReadOnlySpan<byte> seq = new ReadOnlySpan<byte>(data, offset, count);

Protobuff/Components/MessageAwaiter.cs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -192,14 +192,14 @@ public MessageAwaiter()
192192
};
193193
}
194194

195-
public async ValueTask<MessageEnvelope> RegisterWait(Guid messageId, int timeoutMs)
195+
public async Task<MessageEnvelope> RegisterWait(Guid messageId, int timeoutMs)
196196
{
197-
awaitingMessages[messageId] = new TaskCompletionSource<MessageEnvelope>();
197+
awaitingMessages[messageId] = new TaskCompletionSource<MessageEnvelope>(TaskCreationOptions.RunContinuationsAsynchronously);
198198
var pending = awaitingMessages[messageId].Task;
199199
MessageEnvelope returnMessage;
200200

201201
var delay = Task.Delay(timeoutMs);
202-
if (await Task.WhenAny(pending, delay) == pending)
202+
if (await Task.WhenAny(pending, delay).ConfigureAwait(false) == pending)
203203
{
204204
// Task completed within timeout.
205205
returnMessage = pending.Result;
@@ -217,8 +217,12 @@ public async ValueTask<MessageEnvelope> RegisterWait(Guid messageId, int timeout
217217

218218
public void ResponseArrived(MessageEnvelope envelopedMessage)
219219
{
220-
if(envelopedMessage.MessageId!=null && awaitingMessages.TryGetValue(envelopedMessage.MessageId, out var completionSource))
220+
if (envelopedMessage.MessageId != null && awaitingMessages.TryGetValue(envelopedMessage.MessageId, out var completionSource))
221+
{
222+
// lock(do a copy) because continiation will be async.
223+
envelopedMessage.LockBytes();
221224
completionSource.TrySetResult(envelopedMessage);
225+
}
222226
}
223227

224228
internal bool IsWaiting(in Guid messageId)

Protobuff/P2P/HolePunch/ClientHolepunchState.cs

Lines changed: 151 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
11
using NetworkLibrary.Components;
22
using NetworkLibrary.Utils;
3+
using ProtoBuf;
34
using System;
5+
using System.Collections.Concurrent;
46
using System.Collections.Generic;
7+
using System.ComponentModel;
8+
using System.Drawing;
9+
using System.Linq;
510
using System.Net;
611
using System.Net.Sockets;
712
using System.Text;
@@ -12,22 +17,25 @@ namespace Protobuff.P2P.HolePunch
1217
{
1318
internal class ClientHolepunchState
1419
{
15-
public TaskCompletionSource<EncryptedUdpProtoClient> Completion = new TaskCompletionSource<EncryptedUdpProtoClient>();
20+
public TaskCompletionSource<EncryptedUdpProtoClient> Completion = new TaskCompletionSource<EncryptedUdpProtoClient>(TaskCreationOptions.RunContinuationsAsynchronously);
1621
private int Success = 0;
1722

1823
internal EncryptedUdpProtoClient holepunchClient;
1924
private readonly RelayClient client;
2025
internal readonly Guid stateId;
2126
internal readonly Guid DestinationId;
27+
internal bool encrypted = true;
2228

2329
// initiator client is the one who generated the state id.
2430
// this id traveled though relay to here.
25-
public ClientHolepunchState(RelayClient client, Guid stateId, Guid To)
31+
public ClientHolepunchState(RelayClient client, Guid stateId, Guid To, int timeoutMs = 5000, bool encrypted = true)
2632
{
27-
StartLifetimeCounter(5000);
33+
StartLifetimeCounter(timeoutMs);
2834
this.client = client;
2935
this.stateId = stateId;
3036
DestinationId = To;
37+
this.encrypted= encrypted;
38+
MiniLogger.Log(MiniLogger.LogLevel.Info,"---------- Encryption: " + encrypted.ToString());
3139
}
3240

3341
public void HandleMessage(MessageEnvelope message)
@@ -51,9 +59,11 @@ public void HandleMessage(MessageEnvelope message)
5159

5260
private async void StartLifetimeCounter(int lifeSpanMs)
5361
{
54-
await Task.Delay(lifeSpanMs);
62+
await Task.Delay(lifeSpanMs).ConfigureAwait(false);
5563
if (!Completion.Task.IsCompleted)
5664
{
65+
Interlocked.Exchange(ref cancelSends, 1);
66+
Interlocked.Exchange(ref endReceives, 1);
5767
holepunchClient.Dispose();
5868
Completion.TrySetResult(null);
5969
}
@@ -69,7 +79,7 @@ private void CreateUdpChannel(MessageEnvelope message)
6979
holepunchClient = new EncryptedUdpProtoClient(aesAlgorithm);
7080
holepunchClient.SocketSendBufferSize = 12800000;
7181
holepunchClient.ReceiveBufferSize = 12800000;
72-
holepunchClient.OnMessageReceived += HolePunchPeerMsgReceived;
82+
//holepunchClient.OnMessageReceived += HolePunchPeerMsgReceived;
7383
holepunchClient.Bind();
7484

7585
// tricky point: its disaster when udp client receives from 2 endpoints.. corruption
@@ -86,45 +96,134 @@ private void SendUdpEndpointMessage()
8696
MiniLogger.Log(MiniLogger.LogLevel.Info, "Sending Endpoint");
8797

8898
MessageEnvelope envelope = GetEnvelope("");
99+
EndpointTransferMessage innerMsg = new EndpointTransferMessage();
100+
innerMsg.LocalEndpoints = GetLocalEndpoints();
89101
envelope.From = client.sessionId;
90102

91-
holepunchClient.SendAsyncMessage(envelope);
103+
holepunchClient.SendAsyncMessage(envelope, innerMsg);
92104
}
105+
private List<EndpointTransferMessage> GetLocalEndpoints()
106+
{
107+
List<EndpointTransferMessage> endpoints = new List<EndpointTransferMessage>();
108+
var lep = (IPEndPoint)holepunchClient.LocalEndpoint;
93109

110+
var host = Dns.GetHostEntry(Dns.GetHostName());
111+
foreach (var ip in host.AddressList)
112+
{
113+
if (ip.AddressFamily == AddressFamily.InterNetwork)
114+
{
115+
if (ip.ToString() == "0.0.0.0")
116+
continue;
117+
endpoints.Add(new EndpointTransferMessage()
118+
{
119+
IpRemote = ip.ToString(),
120+
PortRemote=lep.Port
121+
});
122+
}
123+
}
124+
return endpoints;
125+
}
94126
int cancelSends;
127+
ConcurrentProtoSerialiser seri = new ConcurrentProtoSerialiser();
95128
private void StartHolepunch(MessageEnvelope message)
96129
{
97130
var endPoint = message.UnpackPayload<EndpointTransferMessage>();
98-
MiniLogger.Log(MiniLogger.LogLevel.Info, client.sessionId.ToString() + " --- punching towards " + endPoint.PortRemote);
99-
100-
holepunchClient?.SetRemoteEnd(endPoint.IpRemote, endPoint.PortRemote);
131+
MiniLogger.Log(MiniLogger.LogLevel.Info, client.sessionId.ToString() + " --- punching towards " + endPoint.IpRemote + " - " + endPoint.PortRemote);
132+
foreach (var item in endPoint.LocalEndpoints)
133+
{
134+
MiniLogger.Log(MiniLogger.LogLevel.Info, client.sessionId.ToString() + " --- punching towards " +item.IpRemote+" - "+item.PortRemote);
135+
}
136+
137+
message.From = client.sessionId;
138+
message.MessageId = stateId;
139+
140+
var any = new IPEndPoint(IPAddress.Any, endPoint.PortRemote);
141+
holepunchClient.ReceiveOnceFrom(any, OnBytesReceived);
142+
143+
IPEndPoint ep = new IPEndPoint(IPAddress.Parse(endPoint.IpRemote).MapToIPv4(), endPoint.PortRemote);
144+
var bytes = seri.SerializeMessageEnvelope(message, endPoint);
145+
146+
PunchAlgorithm(bytes, 0, bytes.Length, ep);
147+
148+
foreach (var endpointMsg in endPoint.LocalEndpoints)
149+
{
150+
IPEndPoint epl = new IPEndPoint(IPAddress.Parse(endpointMsg.IpRemote).MapToIPv4(), endpointMsg.PortRemote);
151+
var bytes_ = seri.SerializeMessageEnvelope(message, endpointMsg);
152+
PunchAlgorithm(bytes_, 0, bytes_.Length, epl);
153+
}
154+
155+
156+
}
157+
private void PunchAlgorithm(byte[] bytes_,int offset,int count, EndPoint epl)
158+
{
159+
101160
//message.Payload = null;
102161

103-
// now we punch a hole through nat, this is exp[erimentally optimized.
162+
// now we punch a hole through nat, this is experimentally optimized.
163+
if (Interlocked.CompareExchange(ref cancelSends, 0, 0) == 1)
164+
return;
165+
holepunchClient.SendTo(bytes_, 0, count, epl);
166+
167+
if (Interlocked.CompareExchange(ref cancelSends, 0, 0) == 1)
168+
return;
169+
holepunchClient.SendTo(bytes_, 0, count, epl);
104170

105-
holepunchClient?.SendAsyncMessage(message);
106-
holepunchClient?.SendAsyncMessage(message);
107-
holepunchClient?.SendAsyncMessage(message);
171+
if (Interlocked.CompareExchange(ref cancelSends, 0, 0) == 1)
172+
return;
173+
108174

109175
Task.Run(async () =>
110176
{
111-
for (int i = 0; i < 30; i++)
177+
for (int i = 0; i < 10; i++)
112178
{
113179
if(Interlocked.CompareExchange(ref cancelSends,0,0)==1)
114180
return;
115-
holepunchClient?.SendAsyncMessage(message);
116-
if (i > 10)
117-
await Task.Delay(i - 10);
181+
holepunchClient.SendTo(bytes_, 0, count, epl);
182+
183+
await Task.Delay(i).ConfigureAwait(false);
118184

119185
}
120186
});
121187
}
188+
ConcurrentDictionary<EndpointTransferMessage,bool> successes = new ConcurrentDictionary<EndpointTransferMessage,bool>();
189+
int endReceives = 0;
190+
int msgSent = 0;
191+
private void OnBytesReceived(byte[] arg1, int arg2, int arg3)
192+
{
193+
if (Interlocked.CompareExchange(ref endReceives, 0, 0) == 1)
194+
return;
195+
if(arg1 == null)
196+
{
197+
var any = new IPEndPoint(IPAddress.Any, 0);
198+
holepunchClient.ReceiveOnceFrom(any, OnBytesReceived);
199+
return;
200+
}
201+
202+
if (Interlocked.CompareExchange(ref msgSent, 1,0) == 1)
203+
{
204+
var any = new IPEndPoint(IPAddress.Any, 0);
205+
holepunchClient.ReceiveOnceFrom(any, OnBytesReceived);
206+
return;
207+
}
208+
209+
MessageEnvelope msg = seri.DeserialiseEnvelopedMessage(arg1, 0, arg3);
210+
var succesfullEp= msg.UnpackPayload<EndpointTransferMessage>();
211+
if (successes.TryAdd(succesfullEp,true))
212+
{
213+
var envelope = GetEnvelope(HolepunchHeaders.SuccesAck);
214+
envelope.From = client.sessionId;
215+
envelope.MessageId = stateId;
216+
client.SendAsyncMessage(DestinationId, envelope,succesfullEp);
217+
}
218+
var any1 = new IPEndPoint(IPAddress.Any, 0);
219+
holepunchClient.ReceiveOnceFrom(any1, OnBytesReceived);
220+
}
122221

123222
private void HolePunchPeerMsgReceived(MessageEnvelope obj)
124223
{
125224
if (Interlocked.CompareExchange(ref Success, 1, 0) == 0)
126225
{
127-
Interlocked.Increment(ref cancelSends);
226+
Interlocked.Exchange(ref cancelSends,1);
128227
MiniLogger.Log(MiniLogger.LogLevel.Info, "got hp feedback yay!" + client.sessionId.ToString());
129228
SendAckToRelay();
130229
}
@@ -140,11 +239,41 @@ private void SendAckToRelay()
140239
// done.
141240
private void HandleSuccess(MessageEnvelope message)
142241
{
143-
MiniLogger.Log(MiniLogger.LogLevel.Info, "success!!");
242+
Interlocked.Exchange(ref cancelSends, 1);
144243
message.LockBytes();
145-
holepunchClient.SwapAlgorith(new ConcurrentAesAlgorithm(message.Payload, message.Payload));
146-
// holepunchClient.SwapAlgorith(null);
147-
Completion.TrySetResult(holepunchClient);
244+
if (message.KeyValuePairs != null)
245+
{
246+
var ip = message.KeyValuePairs["IP"];
247+
var port = message.KeyValuePairs["Port"];
248+
ThreadPool.UnsafeQueueUserWorkItem(async(s) =>
249+
{
250+
251+
await Task.Delay(200);
252+
Interlocked.Exchange(ref endReceives, 1);
253+
254+
if(encrypted)
255+
holepunchClient.SwapAlgorith(new ConcurrentAesAlgorithm(message.Payload, message.Payload));
256+
else
257+
holepunchClient.SwapAlgorith(null);
258+
259+
260+
holepunchClient.SetRemoteEnd(ip, int.Parse(port));
261+
262+
Completion.TrySetResult(holepunchClient);
263+
MiniLogger.Log(MiniLogger.LogLevel.Info, "HP Complete!:" + ip);
264+
265+
266+
}, null);
267+
MiniLogger.Log(MiniLogger.LogLevel.Info, "Success Selected IP:" + ip);
268+
269+
}
270+
else
271+
{
272+
holepunchClient.SwapAlgorith(new ConcurrentAesAlgorithm(message.Payload, message.Payload));
273+
// holepunchClient.SwapAlgorith(null);
274+
Completion.TrySetResult(holepunchClient);
275+
}
276+
148277
}
149278

150279
private MessageEnvelope GetEnvelope(string Header)

0 commit comments

Comments
 (0)