Skip to content

Commit 27aec03

Browse files
committed
Refactor on logic decoupling, cleanup
1 parent 0322033 commit 27aec03

21 files changed

+763
-356
lines changed

ConsoleTest/Program.cs

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using CustomNetworkLib;
22
using CustomNetworkLib.SocketEventArgsTests;
3+
using CustomNetworkLib.Utils;
34
using NetworkSystem;
45
using System;
56
using System.Collections.Concurrent;
@@ -213,27 +214,28 @@ static void ClientBytesRecieved(byte[] bytes, int offset, int count)
213214
private static void TcpTest()
214215
{
215216
var msg1 = new byte[32];
217+
int clAmount = 10;
216218

217-
server = new ByteMessageTcpServer(2008);
219+
server = new ByteMessageTcpServer(2008,clAmount*2);
218220
List<ByteMessageTcpClient> clients = new List<ByteMessageTcpClient>();
219-
server.MaxIndexedMemoryPerClient = 1280000;
221+
server.MaxIndexedMemoryPerClient = 1280000000;
220222
server.DropOnBackPressure = false;
221223
server.StartServer();
222-
int clAmount = 1000;
223-
BufferManager.InitContigiousSendBuffers(clAmount*2, 12800);
224-
BufferManager.InitContigiousReceiveBuffers(clAmount*2, 12800);
224+
//BufferManager.InitContigiousSendBuffers(clAmount*2, 128000);
225+
//BufferManager.InitContigiousReceiveBuffers(clAmount*2, 128000);
225226

226-
int dep=0;
227+
int dep =0;
227228
for (int i = 0; i < clAmount; i++)
228229
{
229230

230231
var client = new ByteMessageTcpClient();
232+
client.BufferManager = server.BufferManager;
231233
client.MaxIndexedMemory = server.MaxIndexedMemoryPerClient;
232234
client.DropOnCongestion=false;
233-
client.OnConnected += async () =>
234-
{
235-
await Task.Delay(10000); Console.WriteLine("------------------- --------------"); client.Disconnect();
236-
};
235+
//client.OnConnected += async () =>
236+
//{
237+
// await Task.Delay(10000); Console.WriteLine("------------------- --------------"); client.Disconnect();
238+
//};
237239
client.OnBytesRecieved += (byte[] arg2, int offset, int count) => clientMsgRec2(client, arg2, offset, count);
238240

239241
client.ConnectAsync("127.0.0.1", 2008);
@@ -257,7 +259,7 @@ private static void TcpTest()
257259
}
258260

259261

260-
const int numMsg = 100;
262+
const int numMsg = 1000000;
261263

262264
var t1 = new Thread(() =>
263265
{
@@ -267,7 +269,7 @@ private static void TcpTest()
267269
foreach (var client in clients)
268270
{
269271
msg = new byte[32];
270-
BufferManager.WriteInt32AsBytes(ref msg, 0, i);
272+
PrefixWriter.WriteInt32AsBytes(ref msg, 0, i);
271273

272274
client.SendAsync(msg);
273275

CustomNetworkLib/TCP/Base/AsyncTcpClient.cs

Lines changed: 56 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
using System;
1+
using CustomNetworkLib.Utils;
2+
using System;
23
using System.Collections.Concurrent;
34
using System.Collections.Generic;
45
using System.Diagnostics;
@@ -17,22 +18,37 @@ public class AsyncTpcClient
1718
public Action OnConnected;
1819
public Action OnConnectFailed;
1920
public Action OnDisconnected;
20-
21+
22+
private BufferProvider bufferManager;
23+
public BufferProvider BufferManager
24+
{
25+
get => bufferManager;
26+
set
27+
{
28+
if (IsConnecting)
29+
throw new InvalidOperationException("Setting buffer manager is not supported after conection is initiated.");
30+
bufferManager = value;
31+
}
32+
}
2133

2234
private Socket clientSocket;
2335
private TaskCompletionSource<bool> connectedCompletionSource;
2436

2537
protected bool connected = false;
2638
protected IAsyncSession session;
27-
28-
39+
public bool IsConnecting { get; private set; }
40+
public bool IsConnected { get; private set; }
2941

42+
// Config
3043
public int SocketSendBufferSize = 128000;
3144
public int SocketRecieveBufferSize = 128000;
32-
45+
public int MaxIndexedMemory = 128000;
46+
public bool DropOnCongestion = false;
3347

3448
public AsyncTpcClient()
35-
{}
49+
{
50+
51+
}
3652

3753
public async Task<bool> ConnectAsyncAwaitable(string IP, int port)
3854
{
@@ -43,6 +59,9 @@ public async Task<bool> ConnectAsyncAwaitable(string IP, int port)
4359

4460
public void ConnectAsync(string IP, int port)
4561
{
62+
MiniLogger.Log(MiniLogger.LogLevel.Info, "Client Connecting.. ");
63+
IsConnecting = true;
64+
4665
clientSocket = new Socket(SocketType.Stream, ProtocolType.Tcp);
4766

4867
clientSocket.ReceiveBufferSize = SocketRecieveBufferSize;
@@ -62,7 +81,7 @@ private void Connected(object sender, SocketAsyncEventArgs e)
6281
{
6382
if (e.SocketError != SocketError.Success)
6483
{
65-
HandleError(e, "While connecting an error occured: ");
84+
HandleError(e, "While Connecting an Error Eccured: ");
6685
OnConnectFailed?.Invoke();
6786
connectedCompletionSource?.SetException(new SocketException((int)e.SocketError));
6887
}
@@ -80,45 +99,59 @@ private void Connected(object sender, SocketAsyncEventArgs e)
8099
public virtual void SendAsync(byte[] buffer)
81100
{
82101
if (connected)
83-
session?.SendAsync(buffer);
84-
return;
85-
102+
session?.SendAsync(buffer);
103+
}
104+
public void Disconnect()
105+
{
106+
session.EndSession();
86107
}
87108

88109
private void HandleError(SocketAsyncEventArgs e, string context)
89110
{
90-
Console.WriteLine("An error Occured while " + context + " associated port: "
91-
+ ((IPEndPoint)e.AcceptSocket.RemoteEndPoint).Port + " Error: " + Enum.GetName(typeof(SocketError), e.SocketError));
92-
111+
string msg = "An error Occured While " + context +
112+
" associated port: "
113+
+ ((IPEndPoint)e.AcceptSocket.RemoteEndPoint).Port +
114+
"IP: "
115+
+((IPEndPoint)e.AcceptSocket.RemoteEndPoint).Address.ToString() +
116+
" Error: " + Enum.GetName(typeof(SocketError), e.SocketError);
93117

94-
95-
}
118+
MiniLogger.Log(MiniLogger.LogLevel.Error, msg);
96119

97-
public void Disconnect()
98-
{
99-
session.EndSession();
100120
}
101121

102122
private void ClientDisconnected(object sender, SocketAsyncEventArgs e)
103123
{
104124
e.AcceptSocket.Close();
105125
e.AcceptSocket.Dispose();
106126
e.Dispose();
127+
IsConnected = false;
107128
}
108129

109-
110130
protected virtual void HandleConnected(SocketAsyncEventArgs e)
111131
{
112-
CreateSession(e,Guid.NewGuid());
113-
session.OnBytesRecieved += (byte[] bytes,int offset, int count) => HandleBytesRecieved(bytes, offset, count);
132+
if (bufferManager == null)
133+
{
134+
bufferManager = new BufferProvider(1, SocketSendBufferSize, 1, SocketRecieveBufferSize);
135+
}
136+
137+
CreateSession(e,Guid.NewGuid(),bufferManager);
138+
session.OnBytesRecieved += (Guid sessionId, byte[] bytes,int offset, int count) => HandleBytesRecieved(bytes, offset, count);
114139
session.OnSessionClosed += (Guid sessionId) => OnDisconnected?.Invoke();
115140

116141
session.StartSession();
117142
OnConnected?.Invoke();
143+
MiniLogger.Log(MiniLogger.LogLevel.Info, "Client Connected.");
144+
145+
IsConnected = true;
118146
}
119-
protected virtual void CreateSession(SocketAsyncEventArgs e, Guid sessionId )
147+
protected virtual void CreateSession(SocketAsyncEventArgs e, Guid sessionId,BufferProvider bufferManager )
120148
{
121-
session = new TcpSession(e,sessionId);
149+
var ses = new TcpSession(e,sessionId,bufferManager);
150+
ses.socketSendBufferSize = SocketSendBufferSize;
151+
ses.socketRecieveBufferSize = SocketRecieveBufferSize;
152+
ses.maxIndexedMemory = MaxIndexedMemory;
153+
ses.dropOnCongestion = DropOnCongestion;
154+
session = ses;
122155
}
123156

124157
protected virtual void HandleBytesRecieved(byte[] bytes,int offset,int count)

CustomNetworkLib/TCP/Base/AsyncTcpServer.cs

Lines changed: 45 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
namespace CustomNetworkLib
44
{
5+
using CustomNetworkLib.Utils;
56
using System;
67
using System.Collections.Concurrent;
78
using System.Collections.Generic;
@@ -22,26 +23,36 @@ public class AsyncTcpServer
2223
public ClientAccepted OnClientAccepted;
2324
public BytesRecieved OnBytesRecieved;
2425

25-
public int MaxIndexedMemoryPerClient = 128000;
26+
public BufferProvider BufferManager { get; private set; }
27+
28+
public int MaxClients { get; private set; } = 10000;
29+
public int ClientSendBufsize = 128000;
30+
public int ClientReceiveBufsize = 128000;
31+
public int MaxIndexedMemoryPerClient = 1280000;
2632
public int SoocketReceiveBufferSize = 2080000000;
27-
public bool DropOnBackPressure=false;
33+
public bool DropOnBackPressure = false;
2834
public bool NaggleNoDelay = false;
2935

3036
protected Socket ServerSocket;
3137
protected int ServerPort { get; }
3238
public ConcurrentDictionary<Guid, IAsyncSession> Sessions { get; } = new ConcurrentDictionary<Guid, IAsyncSession>();
33-
public AsyncTcpServer(int port = 20008)
39+
public bool Stopping { get; private set; }
40+
41+
public AsyncTcpServer(int port = 20008, int maxClients = 100)
3442
{
3543
ServerPort = port;
44+
MaxClients = maxClients;
3645
}
3746

3847
public void StartServer()
3948
{
49+
BufferManager = new BufferProvider(MaxClients, ClientSendBufsize, MaxClients, ClientReceiveBufsize);
50+
4051
ServerSocket = new Socket(SocketType.Stream, ProtocolType.Tcp);
4152
ServerSocket.NoDelay = NaggleNoDelay;
4253
ServerSocket.ReceiveBufferSize = SoocketReceiveBufferSize;
4354
ServerSocket.Bind(new IPEndPoint(IPAddress.Any, ServerPort));
44-
ServerSocket.Listen(10000);
55+
ServerSocket.Listen(MaxClients);
4556

4657

4758
SocketAsyncEventArgs e = new SocketAsyncEventArgs();
@@ -55,51 +66,60 @@ public void StartServer()
5566

5667
private void Accepted(object sender, SocketAsyncEventArgs acceptedArg)
5768
{
58-
if (acceptedArg.SocketError != SocketError.Success)
59-
{
60-
HandleError(acceptedArg.SocketError, "While Accepting Client an error occured");
69+
if (Stopping)
6170
return;
62-
}
6371
SocketAsyncEventArgs nextClient = new SocketAsyncEventArgs();
6472
nextClient.Completed += Accepted;
6573

6674
if (!ServerSocket.AcceptAsync(nextClient))
6775
{
68-
ThreadPool.QueueUserWorkItem((s) => Accepted(null, nextClient));
76+
ThreadPool.UnsafeQueueUserWorkItem((s) => Accepted(null, nextClient), null);
6977
}
7078

7179
if (acceptedArg.SocketError != SocketError.Success)
7280
{
73-
Console.WriteLine(Enum.GetName(typeof(SocketError), acceptedArg.SocketError));
81+
HandleError(acceptedArg.SocketError, "While Accepting Client an Error Occured :");
7482
return;
7583
}
7684

77-
int port = ((IPEndPoint)acceptedArg.AcceptSocket.RemoteEndPoint).Port;
78-
Guid clientGuid = Guid.NewGuid();
85+
if (!HandleConnectionRequest(acceptedArg.AcceptSocket))
86+
{
87+
return;
88+
}
7989

80-
var session = CreateSession(acceptedArg, clientGuid);
81-
Sessions.TryAdd(clientGuid, session);
90+
Guid clientGuid = Guid.NewGuid();
91+
var session = CreateSession(acceptedArg, clientGuid, BufferManager);
8292

83-
session.OnBytesRecieved += (byte[] bytes,int offset, int count) => { HandleBytesRecieved(clientGuid, bytes,offset,count); };
93+
session.OnBytesRecieved += (Guid sessionId, byte[] bytes,int offset, int count) => { HandleBytesRecieved(sessionId, bytes,offset,count); };
8494
session.OnSessionClosed += (Guid sessionId) => { HandleDeadSession(sessionId); };
8595

8696
session.StartSession();
87-
Console.WriteLine("Accepted with port: " + ((IPEndPoint)acceptedArg.AcceptSocket.RemoteEndPoint).Port);
97+
Sessions.TryAdd(clientGuid, session);
98+
99+
string msg = "Accepted with port: " + ((IPEndPoint)acceptedArg.AcceptSocket.RemoteEndPoint).Port+
100+
" Ip: "+ ((IPEndPoint)acceptedArg.AcceptSocket.RemoteEndPoint).Address.ToString();
101+
102+
MiniLogger.Log(MiniLogger.LogLevel.Info, msg);
88103

89104
HandleClientAccepted(clientGuid,acceptedArg);
90105
}
91106

107+
protected virtual bool HandleConnectionRequest(Socket acceptSocket)
108+
{
109+
return true;
110+
}
92111

93-
protected virtual IAsyncSession CreateSession(SocketAsyncEventArgs e, Guid sessionId)
112+
protected virtual IAsyncSession CreateSession(SocketAsyncEventArgs e, Guid sessionId, BufferProvider bufferManager)
94113
{
95-
var session = new TcpSession(e, sessionId);
96-
session.MaxIndexedMemory = MaxIndexedMemoryPerClient;
97-
session.DropOnCongestion = DropOnBackPressure;
114+
var session = new TcpSession(e, sessionId, bufferManager);
115+
session.socketSendBufferSize = ClientSendBufsize;
116+
session.socketRecieveBufferSize = ClientReceiveBufsize;
117+
session.maxIndexedMemory = MaxIndexedMemoryPerClient;
118+
session.dropOnCongestion = DropOnBackPressure;
98119
return session;
99120
}
100121

101122

102-
//#region Send
103123
public void SendBytesToAllClients(byte[] bytes)
104124
{
105125
Parallel.ForEach(Sessions, session =>
@@ -129,12 +149,13 @@ protected virtual void HandleDeadSession(Guid sessionId)
129149
Sessions.TryRemove(sessionId, out _);
130150
}
131151

132-
protected virtual void HandleError(SocketError error, object context)
152+
protected virtual void HandleError(SocketError error, string context)
133153
{
134-
154+
MiniLogger.Log(MiniLogger.LogLevel.Error, context + Enum.GetName(typeof(SocketError), error));
135155
}
136-
public void StopServer()
156+
public virtual void StopServer()
137157
{
158+
Stopping = true;
138159
ServerSocket.Close();
139160
ServerSocket.Dispose();
140161

CustomNetworkLib/TCP/Base/Interface/IAsyncSession.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ namespace CustomNetworkLib
66
{
77
public interface IAsyncSession: IDisposable
88
{
9-
event Action<byte[],int,int> OnBytesRecieved;
9+
event Action<Guid,byte[],int,int> OnBytesRecieved;
1010
event Action<Guid> OnSessionClosed;
1111
void SendAsync(byte[] buffer);
1212

0 commit comments

Comments
 (0)