Skip to content

Commit bb7f9dd

Browse files
committed
state manager and udp single byte reservation
1 parent b95d688 commit bb7f9dd

29 files changed

+3215
-1174
lines changed

Benchmarks/RelayBenchmark/Program.cs

Lines changed: 33 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ internal class Program
1717

1818
static void Main(string[] args)
1919
{
20-
MiniLogger.AllLog += (l) => Console.WriteLine(l);
20+
//ThreadPool.SetMinThreads(2000, 2000);
21+
MiniLogger.AllLog += Console.WriteLine;
2122
RelayTest();
2223

2324
Console.ReadLine();
@@ -57,25 +58,28 @@ private static void SerializerTest()
5758
sw.Stop();
5859
Console.WriteLine(sw.ElapsedMilliseconds);
5960
}
60-
private static void RelayTest()
61+
static MessageEnvelope testMessage => new MessageEnvelope()
62+
{
63+
Header = "Test",
64+
Payload = new byte[32]
65+
};
66+
private static async void RelayTest()
6167
{
6268
string ip = "127.0.0.1";
63-
MessageEnvelope testMessage = new MessageEnvelope()
64-
{
65-
Header = "Test",
66-
// Payload = new byte[32]
67-
};
69+
6870

6971
var cert = new X509Certificate2("client.pfx", "greenpass");
7072
var scert = new X509Certificate2("server.pfx", "greenpass");
7173

72-
// var server = new SecureProtoRelayServer(20011, scert);
74+
var server = new SecureProtoRelayServer(20011, scert);
75+
76+
// Task.Run(async () => { while (true) { await Task.Delay(10000); server.GetTcpStatistics(out var generalStats, out _); Console.WriteLine(generalStats.ToString()); } });
77+
Thread.Sleep(1000);
7378
var clients = new ConcurrentBag<RelayClient>();
74-
//for (int i = 0; i < 200; i++)
7579
int numclients = 20;
76-
Task[] pending = new Task[numclients];
77-
// Parallel.For(0, numclients, (i) =>
78-
for (int i = 0; i < numclients; i++)
80+
var pending = new Task[numclients];
81+
Parallel.For(0, numclients, (i) =>
82+
//for (int i = 0; i < numclients; i++)
7983

8084
{
8185
var client = new RelayClient(cert);
@@ -84,17 +88,18 @@ private static void RelayTest()
8488
//client.OnPeerRegistered+=(id)=> client.RequestHolePunchAsync(id, 10000, false);
8589
try
8690
{
87-
pending[i]= client.ConnectAsync(ip, 20011);
88-
91+
pending[i] = client.ConnectAsync(ip, 20011);
92+
// client.Connect(ip, 20011);
8993
clients.Add(client);
90-
client.StartPingService();
94+
client.StartPingService();
9195
}
9296
catch { }
9397

9498
//Thread.Sleep(1000);
9599
}
96-
//);
100+
);
97101
Task.WaitAll(pending);
102+
Console.WriteLine("All Connected");
98103
Thread.Sleep(5000);
99104
int cc = 0;
100105
List<Task<bool>> pndg = new List<Task<bool>>();
@@ -105,20 +110,21 @@ private static void RelayTest()
105110
// Console.WriteLine("--- -- - | "+client.sessionId+" count: " + client.Peers.Count);
106111
foreach (var peer in client.Peers)
107112
{
108-
if (client.sessionId>peer.Key)
113+
if (client.sessionId.CompareTo(peer.Key)>0)
109114
{
110115
if (peer.Key == Guid.Empty)
111116
throw new Exception();
112117

113118
var a = client.RequestHolePunchAsync(peer.Key, 10000, false);
114119
pndg.Add(a);
115-
120+
//client.TestHP(peer.Key, 10000, false);
116121
// Console.WriteLine(peer.Key+" cnt=> "+ ++cc);
117122
}
118123

119124
}
120125
}
121126
Task.WaitAll(pndg.ToArray());
127+
Console.WriteLine("all good");
122128
int kk = 0;
123129
foreach (var item in pndg)
124130
{
@@ -136,6 +142,7 @@ private static void RelayTest()
136142

137143
Task.Run(async () =>
138144
{
145+
return;
139146
while(true)
140147
{
141148
await Task.Delay(3000);
@@ -146,15 +153,20 @@ private static void RelayTest()
146153
Thread.Sleep(5000);
147154
Parallel.ForEach(clients, (client) =>
148155
{
149-
for (int i = 0; i < 10; i++)
156+
var testMessage = new MessageEnvelope()
157+
{
158+
Header = "Test",
159+
// Payload = new byte[32]
160+
};
161+
for (int i = 0; i < 1; i++)
150162
{
151163
//return;
152164
foreach (var peer in client.Peers.Keys)
153165
{
154166
//await client.SendRequestAndWaitResponse(peer, testMessage,1000);
155167
//client.SendAsyncMessage(peer, testMessage);
156168

157-
// client.SendUdpMesssage(peer, testMessage);
169+
client.SendUdpMesssage(peer, testMessage);
158170
}
159171
}
160172

@@ -172,7 +184,7 @@ void ClientMsgReceived(RelayClient client, MessageEnvelope reply)
172184

173185
void ClientUdpReceived(RelayClient client, MessageEnvelope reply)
174186
{
175-
Interlocked.Increment(ref totMsgCl);
187+
// Interlocked.Increment(ref totMsgCl);
176188
client.SendUdpMesssage(reply.From, reply);
177189

178190
}

NetworkLibrary/Components/PooledMemoryStream.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,6 @@ public override void Write(byte[] buffer, int offset, int count)
187187
length = position;
188188
}
189189

190-
191190
[MethodImpl(MethodImplOptions.AggressiveInlining)]
192191
private void ExpandInternalBuffer(int size)
193192
{

NetworkLibrary/Generic/GenericBuffer.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
using System;
44
using System.Collections.Generic;
55
using System.Text;
6-
6+
[assembly: CLSCompliant(true)]
77
namespace NetworkLibrary.Generic
88
{
99
public class GenericBuffer<S> : MessageBuffer where S: ISerializer,new()

NetworkLibrary/MessageProtocol/Fast/GenericMessageQueue.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,6 @@ public bool TryEnqueueMessage(MessageEnvelope envelope)
9595
writeStream.Position32 = lastPos;
9696
currentIndexedMemory += msgLen + 4;
9797
return true;
98-
9998
}
10099
}
101100
return false;

NetworkLibrary/MessageProtocol/Fast/Network/SecureMessageServer.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ public void SendAsyncMessage(in Guid clientId, MessageEnvelope message)
8181
{
8282
if (Sessions.TryGetValue(clientId, out IAsyncSession session))
8383
((SecureMessageSession<S>)session).SendAsync(message);
84+
8485
}
8586

8687
[MethodImpl(MethodImplOptions.AggressiveInlining)]

NetworkLibrary/MessageProtocol/Fast/Network/SecureMessageSession.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,6 @@ private void SendAsyncInternal(MessageEnvelope message)
8383
mq.TryEnqueueMessage(message);
8484
mq.TryFlushQueue(ref sendBuffer, 0, out int amountWritten);
8585
WriteOnSessionStream(amountWritten);
86-
8786
}
8887

8988
[MethodImpl(MethodImplOptions.AggressiveInlining)]

NetworkLibrary/TCP/SSL/SslClient.cs

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
using System.Net.Sockets;
77
using System.Security.Cryptography.X509Certificates;
88
using System.Threading.Tasks;
9+
using System.Xml.Serialization;
910

1011
namespace NetworkLibrary.TCP.SSL.Base
1112
{
@@ -51,17 +52,39 @@ public override void Connect(string ip, int port)
5152

5253
}
5354

54-
public override async Task<bool> ConnectAsyncAwaitable(string ip, int port)
55+
public override Task<bool> ConnectAsyncAwaitable(string ip, int port)
5556
{
5657
try
5758
{
5859
IsConnecting = true;
5960
var clientSocket = GetSocket();
6061

61-
await clientSocket.ConnectAsync(new IPEndPoint(IPAddress.Parse(ip), port)).ConfigureAwait(false);
62+
// this shit is terrible..
63+
// await clientSocket.ConnectAsync(new IPEndPoint(IPAddress.Parse(ip), port)).ConfigureAwait(false);
6264

63-
Connected(ip, clientSocket);
64-
return true;
65+
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
66+
67+
var earg = new SocketAsyncEventArgs();
68+
earg.RemoteEndPoint = new IPEndPoint(IPAddress.Parse(ip), port);
69+
earg.Completed += (ignored, arg) => { HandleResult(arg); };
70+
71+
if (!clientSocket.ConnectAsync(earg))
72+
{
73+
HandleResult(earg);
74+
}
75+
76+
void HandleResult(SocketAsyncEventArgs arg)
77+
{
78+
if(arg.SocketError == SocketError.Success)
79+
{
80+
Connected(ip, clientSocket);
81+
tcs.SetResult(true);
82+
}
83+
else tcs.SetResult(false);
84+
}
85+
return tcs.Task;
86+
87+
6588
}
6689
catch { throw; }
6790
finally
@@ -103,11 +126,12 @@ private void Connected(string domainName, Socket clientSocket)
103126
new X509CertificateCollection(new[] { certificate }), System.Security.Authentication.SslProtocols.Tls12, true);
104127
this.clientSocket = clientSocket;
105128
var Id = Guid.NewGuid();
129+
106130
clientSession = CreateSession(Id, new ValueTuple<SslStream, IPEndPoint>(sslStream, (IPEndPoint)clientSocket.RemoteEndPoint));
107131
clientSession.OnSessionClosed += (id) => OnDisconnected?.Invoke();
108-
109132
clientSession.OnBytesRecieved += HandleBytesReceived;
110133
clientSession.StartSession();
134+
111135
statisticsPublisher = new TcpClientStatisticsPublisher(clientSession, Id);
112136
IsConnecting = false;
113137
IsConnected = true;
@@ -120,7 +144,6 @@ private void Connected(string domainName, Socket clientSocket)
120144
protected virtual bool ValidateCeriticate(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)
121145
{
122146
return RemoteCertificateValidationCallback.Invoke(sender, certificate, chain, sslPolicyErrors);
123-
124147
}
125148

126149
private bool DefaultValidationCallbackHandler(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)

NetworkLibrary/TCP/SSL/SslServer.cs

Lines changed: 1 addition & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -108,48 +108,6 @@ private void Accepted(object sender, SocketAsyncEventArgs acceptedArg)
108108

109109
acceptedArg.Dispose();
110110
}
111-
//private void Accepted_(IAsyncResult ar)
112-
//{
113-
// if (Stopping)
114-
// return;
115-
// Socket clientsocket = null;
116-
// try
117-
// {
118-
// clientsocket = serverSocket.EndAccept(ar);
119-
120-
// }
121-
// catch (ObjectDisposedException) { return; }
122-
123-
// if (ar.CompletedSynchronously)
124-
// {
125-
// ThreadPool.UnsafeQueueUserWorkItem(s => serverSocket.BeginAccept(Accepted, null), null);
126-
// }
127-
// else
128-
// {
129-
// serverSocket.BeginAccept(Accepted, null);
130-
// }
131-
// if (!ValidateConnection(clientsocket))
132-
// {
133-
// return;
134-
// }
135-
136-
// var sslStream = new SslStream(new NetworkStream(clientsocket, true), false, ValidateCeriticate);
137-
// try
138-
// {
139-
// sslStream.BeginAuthenticateAsServer(certificate,
140-
// true,
141-
// System.Security.Authentication.SslProtocols.Tls12,
142-
// false,
143-
// EndAuthenticate,
144-
// new ValueTuple<SslStream, IPEndPoint>(sslStream, (IPEndPoint)clientsocket.RemoteEndPoint));
145-
// }
146-
// catch (Exception ex)
147-
// when (ex is AuthenticationException || ex is ObjectDisposedException)
148-
// {
149-
// MiniLogger.Log(MiniLogger.LogLevel.Error, "Athentication as server failed: " + ex.Message);
150-
// }
151-
152-
//}
153111
protected virtual bool ValidateConnection(Socket clientsocket)
154112
{
155113
return OnClientRequestedConnection.Invoke(clientsocket);
@@ -187,8 +145,8 @@ private void EndAuthenticate(IAsyncResult ar)
187145
var ses = CreateSession(sessionId, (ValueTuple<SslStream, IPEndPoint>)ar.AsyncState);
188146
ses.OnBytesRecieved += HandleBytesReceived;
189147
ses.OnSessionClosed += HandeDeadSession;
190-
ses.StartSession();
191148
Sessions.TryAdd(sessionId, ses);
149+
ses.StartSession();
192150

193151
OnClientAccepted?.Invoke(sessionId);
194152
}

NetworkLibrary/UDP/AsyncUdpServer.cs

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using System.Collections.Concurrent;
44
using System.Net;
55
using System.Net.Sockets;
6+
using System.Runtime.CompilerServices;
67
using System.Threading;
78

89
namespace NetworkLibrary.UDP
@@ -34,7 +35,7 @@ public int SocketSendBufferSize
3435
socketSendBufferSize = value;
3536
}
3637
}
37-
38+
public EndPoint LocalEndpoint => ServerSocket.LocalEndPoint;
3839
private int receiveBufferSize = 1280000000;
3940
private int socketSendBufferSize = 1280000000;
4041
protected Socket ServerSocket;
@@ -92,15 +93,16 @@ private void StartReceiveSentinel()
9293
Receive(e);
9394
}
9495

96+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
9597
private void Receive(SocketAsyncEventArgs e)
9698
{
97-
9899
if (!ServerSocket.ReceiveFromAsync(e))
99100
{
100101
ThreadPool.UnsafeQueueUserWorkItem((cb) => Received(null, e), null);
101102
}
102103
}
103104

105+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
104106
private void Received(object sender, SocketAsyncEventArgs e)
105107
{
106108
if (e.SocketError != SocketError.Success)
@@ -135,6 +137,7 @@ private void HandleClientRegistered(SocketAsyncEventArgs acceptedArg)
135137

136138
}
137139

140+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
138141
void HandleBytesReceived(IPEndPoint clientRemoteEndpoint, byte[] buffer, int offset, int count)
139142
{
140143
if (Statistics.TryGetValue(clientRemoteEndpoint, out var stats))
@@ -160,12 +163,19 @@ public void SendBytesToClient(IPEndPoint clientEndpoint, byte[] bytes, int offse
160163
try
161164
{
162165
ServerSocket.SendTo(bytes, offset, count, SocketFlags.None, clientEndpoint);
163-
Statistics[clientEndpoint].TotalBytesSent += count;
164-
Statistics[clientEndpoint].TotalDatagramSent += 1;
166+
if (Statistics.TryGetValue(clientEndpoint, out var value))
167+
{
168+
value.TotalBytesSent += count;
169+
value.TotalDatagramSent += 1;
170+
}
171+
165172
}
166173
catch
167174
{
168-
Statistics[clientEndpoint].TotalMessageDropped += 1;
175+
if(Statistics.TryGetValue(clientEndpoint, out var value))
176+
{
177+
value.TotalMessageDropped += 1;
178+
}
169179

170180
}
171181
}

0 commit comments

Comments
 (0)