Skip to content

Commit 686d0ce

Browse files
committed
Refactoring & disposal reliability patch
1 parent a91b38d commit 686d0ce

29 files changed

+218
-768
lines changed

Examples/Examples.csproj

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
<Project Sdk="Microsoft.NET.Sdk">
1+
<Project Sdk="Microsoft.NET.Sdk">
22

33
<PropertyGroup>
44
<OutputType>Exe</OutputType>
@@ -9,15 +9,7 @@
99

1010
<ItemGroup>
1111
<PackageReference Include="protobuf-net" Version="3.1.26" />
12-
</ItemGroup>
13-
14-
<ItemGroup>
15-
<Reference Include="NetworkLibrary">
16-
<HintPath>..\Protobuff\bin\Debug\netstandard2.0\NetworkLibrary.dll</HintPath>
17-
</Reference>
18-
<Reference Include="Protobuff">
19-
<HintPath>..\Protobuff\bin\Debug\netstandard2.0\Protobuff.dll</HintPath>
20-
</Reference>
12+
<PackageReference Include="Protobuf.Network.Library" Version="1.0.0" />
2113
</ItemGroup>
2214

2315
<ItemGroup>

NetworkLibrary/Components/BufferPool.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
namespace NetworkLibrary
1212
{
1313
/*
14-
* This behaves like ArrayPool<>. The butckets are TLS due to concurrent bag. ( ThreadLocal<ThreadLocalList> m_locals )
14+
* This behaves like ArrayPool<>. The butckets are TLS thx to concurrent bag. ( ThreadLocal<ThreadLocalList> m_locals )
1515
*/
1616
public class BufferPool
1717
{
@@ -48,7 +48,6 @@ public class BufferPool
4848

4949
};
5050

51-
5251
static BufferPool()
5352
{
5453
Init();

NetworkLibrary/Components/MessageBuffer/MessageBuffer.cs

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ public class MessageBuffer:IMessageQueue
1818
protected readonly object loki = new object();
1919
protected bool writeLengthPrefix;
2020
protected int currentIndexedMemory;
21-
private bool disposedValue;
21+
protected bool disposedValue;
2222

2323
public MessageBuffer(int maxIndexedMemory, bool writeLengthPrefix = true)
2424
{
@@ -33,10 +33,11 @@ public bool IsEmpty()
3333

3434
public bool TryEnqueueMessage(byte[] bytes)
3535
{
36-
if (Volatile.Read(ref currentIndexedMemory) < MaxIndexedMemory)
36+
lock (loki)
3737
{
38-
lock (loki)
38+
if (Volatile.Read(ref currentIndexedMemory) < MaxIndexedMemory&& !disposedValue)
3939
{
40+
4041
TotalMessageDispatched++;
4142

4243
if (writeLengthPrefix)
@@ -59,10 +60,11 @@ public bool TryEnqueueMessage(byte[] bytes)
5960
}
6061
public bool TryEnqueueMessage(byte[] bytes, int offset, int count)
6162
{
62-
if (Volatile.Read(ref currentIndexedMemory) < MaxIndexedMemory)
63+
lock (loki)
6364
{
64-
lock (loki)
65+
if (Volatile.Read(ref currentIndexedMemory) < MaxIndexedMemory && !disposedValue)
6566
{
67+
6668
TotalMessageDispatched++;
6769

6870
if (writeLengthPrefix)
@@ -84,14 +86,14 @@ public bool TryEnqueueMessage(byte[] bytes, int offset, int count)
8486
}
8587
public bool TryFlushQueue(ref byte[] buffer, int offset, out int amountWritten)
8688
{
87-
if (IsEmpty())
88-
{
89-
amountWritten = 0;
90-
return false;
91-
}
92-
9389
lock (loki)
9490
{
91+
if (IsEmpty())
92+
{
93+
amountWritten = 0;
94+
return false;
95+
}
96+
9597
var temp = writeStream;
9698
writeStream = flushStream;
9799
flushStream = temp;
@@ -110,21 +112,23 @@ public bool TryFlushQueue(ref byte[] buffer, int offset, out int amountWritten)
110112

111113
protected virtual void Dispose(bool disposing)
112114
{
113-
if (!disposedValue)
115+
lock (loki)
114116
{
115-
disposedValue = true;
116-
if (disposing)
117+
if (!disposedValue)
117118
{
118-
writeStream.Flush();
119-
flushStream.Flush();
120-
writeStream.Dispose();
121-
flushStream.Dispose();
119+
Volatile.Write(ref disposedValue, true);
120+
if (disposing)
121+
{
122+
writeStream.Flush();
123+
flushStream.Flush();
124+
writeStream.Dispose();
125+
flushStream.Dispose();
126+
}
122127
}
123128
}
129+
124130
}
125131

126-
127-
128132
public void Dispose()
129133
{
130134
Dispose(disposing: true);

NetworkLibrary/Components/MessageProcessor/Unmanaged/UnsafeDelimitedMessageWriter.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ public unsafe bool ProcessMessage(byte[] message)
3434
{
3535
if (IsHoldingMessage)
3636
throw new InvalidOperationException("You can not process new message before heldover message is fully flushed");
37+
if (bufferInternal == null)
38+
return false;
3739
else if (bufferInternal.Length - offset >= 36)
3840
{
3941
fixed (byte* b = &bufferInternal[offset])

NetworkLibrary/Components/MessageProcessor/Unmanaged/UnsafePlainMessageWriter.cs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,6 @@ public unsafe bool ProcessMessage(byte[] message)
3636

3737
if (bufferInternal.Length - offset >= message.Length)
3838
{
39-
// System.Buffer.BlockCopy(message, 0, bufferInternal, offset, message.Length);
40-
41-
4239
//System.Buffer.BlockCopy(message, 0, bufferInternal, offset, message.Length);
4340
fixed (byte* destination = &bufferInternal[offset])
4441
{

NetworkLibrary/Components/PooledMemoryStream.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ public PooledMemoryStream(int minCapacity)
2121
{
2222
bufferInternal = BufferPool.RentBuffer(minCapacity);
2323
}
24-
24+
// write only memory?? :)
2525
public override bool CanRead => false;
2626

2727
public override bool CanSeek => false;

NetworkLibrary/Components/Statistics/TcpClientStatisticsPublisher.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ private void GetSessionStats()
2626

2727
}
2828

29-
3029
internal void GetStatistics(out TcpStatistics generalStats)
3130
{
3231
GetSessionStats();

NetworkLibrary/NetworkLibrary.csproj

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,17 @@
33
<PropertyGroup>
44
<TargetFramework>netstandard2.0</TargetFramework>
55
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
6+
<PackageId>Standard.Network.Library</PackageId>
7+
<GeneratePackageOnBuild>True</GeneratePackageOnBuild>
8+
<Title>Standard Network Library</Title>
9+
<Description>High performance extensible network library optimised for high trafic messages
10+
</Description>
11+
<PackageProjectUrl>https://github.com/ReferenceType/StandardNetworkLibrary</PackageProjectUrl>
12+
<RepositoryUrl>https://github.com/ReferenceType/StandardNetworkLibrary</RepositoryUrl>
13+
<Authors>RefrenceType</Authors>
14+
<PackageLicenseFile>Licence.txt</PackageLicenseFile>
15+
<Copyright>Apache 2.0</Copyright>
16+
<Version>1.0.2</Version>
617
</PropertyGroup>
718

819
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
@@ -26,6 +37,13 @@
2637
</AssemblyAttribute>
2738
</ItemGroup>
2839

40+
<ItemGroup>
41+
<None Include="..\..\Licence.txt">
42+
<Pack>True</Pack>
43+
<PackagePath>\</PackagePath>
44+
</None>
45+
</ItemGroup>
46+
2947

3048
<ItemGroup>
3149
<Resource Include="TCP\SSL\Certificates\client.pfx">

NetworkLibrary/TCP/Base/AsyncTcpClient.cs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,12 @@ namespace NetworkLibrary.TCP.Base
1616
{
1717
public class AsyncTpcClient : TcpClientBase,IDisposable
1818
{
19-
#region Fields & Props
20-
21-
private Socket clientSocket;
22-
private TaskCompletionSource<bool> connectedCompletionSource;
2319
protected bool connected = false;
2420
internal IAsyncSession session;
25-
private TcpClientStatisticsPublisher statisticsPublisher;
2621

27-
#endregion Fields & Props
22+
private TcpClientStatisticsPublisher statisticsPublisher;
23+
private Socket clientSocket;
24+
private TaskCompletionSource<bool> connectedCompletionSource;
2825

2926
public AsyncTpcClient() {}
3027

@@ -123,6 +120,7 @@ public override void SendAsync(byte[] buffer)
123120
if (connected)
124121
session?.SendAsync(buffer);
125122
}
123+
126124
public override void SendAsync(byte[] buffer, int offset, int count)
127125
{
128126
if (connected)
@@ -147,7 +145,7 @@ private void HandleError(SocketAsyncEventArgs e, string context)
147145

148146
public override void Disconnect()
149147
{
150-
// session will fire OnDisconnected;
148+
// session will fire OnDisconnected event of its own;
151149
session.EndSession();
152150
IsConnected = false;
153151
}
@@ -165,7 +163,6 @@ public void Dispose()
165163
catch { }
166164
}
167165

168-
//bufferManager?.Dispose();
169166
}
170167

171168
public override void GetStatistics(out TcpStatistics generalStats)

NetworkLibrary/TCP/Base/AsyncTcpServer.cs

Lines changed: 13 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -15,24 +15,17 @@ namespace NetworkLibrary.TCP.Base
1515
{
1616
public class AsyncTcpServer:TcpServerBase
1717
{
18-
19-
#region Fields & Props
20-
2118
public ClientAccepted OnClientAccepted;
2219
public BytesRecieved OnBytesReceived;
2320
public ClientDisconnected OnClientDisconnected;
2421
public ClientConnectionRequest OnClientAccepting = (socket) => true;
22+
public int SessionCount => Sessions.Count;
23+
public bool Stopping { get; private set; }
2524

26-
//public BufferProvider BufferManager { get; private set; }
2725
protected Socket ServerSocket;
28-
private TcpServerStatisticsPublisher statisticsPublisher;
29-
3026
protected ConcurrentDictionary<Guid, IAsyncSession> Sessions { get; } = new ConcurrentDictionary<Guid, IAsyncSession>();
3127

32-
public int SessionCount=>Sessions.Count;
33-
public bool Stopping { get; private set; }
34-
35-
#endregion
28+
private TcpServerStatisticsPublisher statisticsPublisher;
3629

3730
public AsyncTcpServer(int port = 20008)
3831
{
@@ -42,25 +35,20 @@ public AsyncTcpServer(int port = 20008)
4235
#region Start
4336
public override void StartServer()
4437
{
45-
//BufferManager = new BufferProvider(MaxClients, ClientSendBufsize, MaxClients, ClientReceiveBufsize);
46-
4738
ServerSocket = new Socket(SocketType.Stream, ProtocolType.Tcp);
4839
ServerSocket.NoDelay = NaggleNoDelay;
4940
ServerSocket.ReceiveBufferSize = ServerSockerReceiveBufferSize;
5041
ServerSocket.Bind(new IPEndPoint(IPAddress.Any, ServerPort));
5142
ServerSocket.Listen(10000);
5243

53-
5444
SocketAsyncEventArgs e = new SocketAsyncEventArgs();
5545
e.Completed += Accepted;
5646
if (!ServerSocket.AcceptAsync(e))
5747
{
5848
Accepted(null, e);
5949
}
60-
statisticsPublisher = new TcpServerStatisticsPublisher(Sessions);
61-
62-
6350

51+
statisticsPublisher = new TcpServerStatisticsPublisher(Sessions);
6452
}
6553

6654

@@ -71,6 +59,7 @@ private void Accepted(object sender, SocketAsyncEventArgs acceptedArg)
7159
{
7260
if (Stopping)
7361
return;
62+
7463
SocketAsyncEventArgs nextClient = new SocketAsyncEventArgs();
7564
nextClient.Completed += Accepted;
7665

@@ -85,29 +74,30 @@ private void Accepted(object sender, SocketAsyncEventArgs acceptedArg)
8574
return;
8675
}
8776

88-
if (!HandleConnectionRequest(acceptedArg))
77+
if (!IsConnectionAllowed(acceptedArg))
8978
{
79+
acceptedArg.Dispose();
9080
return;
9181
}
9282

9383
Guid clientGuid = Guid.NewGuid();
9484
var session = CreateSession(acceptedArg, clientGuid);
9585

96-
session.OnBytesRecieved += (sessionId, bytes, offset, count) => { HandleBytesRecieved(sessionId, bytes, offset, count); };
97-
session.OnSessionClosed += (sessionId) => { HandleDeadSession(sessionId); };
86+
session.OnBytesRecieved += HandleBytesRecieved;
87+
session.OnSessionClosed += HandleDeadSession;
9888

9989
Sessions.TryAdd(clientGuid, session);
10090

10191
string msg = "Accepted with port: " + ((IPEndPoint)acceptedArg.AcceptSocket.RemoteEndPoint).Port +
102-
" Ip: " + ((IPEndPoint)acceptedArg.AcceptSocket.RemoteEndPoint).Address.ToString();
92+
" Ip: " + ((IPEndPoint)acceptedArg.AcceptSocket.RemoteEndPoint).Address.ToString();
10393

10494
MiniLogger.Log(MiniLogger.LogLevel.Info, msg);
10595
session.StartSession();
10696

10797
HandleClientAccepted(clientGuid, acceptedArg);
10898
}
10999

110-
protected virtual bool HandleConnectionRequest(SocketAsyncEventArgs acceptArgs)
100+
protected virtual bool IsConnectionAllowed(SocketAsyncEventArgs acceptArgs)
111101
{
112102
return OnClientAccepting.Invoke(acceptArgs.AcceptSocket);
113103
}
@@ -152,14 +142,14 @@ public override void SendBytesToClient(in Guid id, byte[] bytes)
152142
if(Sessions.TryGetValue(id,out var session))
153143
session.SendAsync(bytes);
154144
}
145+
155146
public void SendBytesToClient(in Guid id, byte[] bytes, int offset, int count)
156147
{
157148
if (Sessions.TryGetValue(id, out var session))
158149
session.SendAsync(bytes,offset,count);
159150
}
160151

161-
162-
protected virtual void HandleBytesRecieved(in Guid guid, byte[] bytes, int offset, int count)
152+
protected virtual void HandleBytesRecieved(Guid guid, byte[] bytes, int offset, int count)
163153
{
164154
OnBytesReceived?.Invoke(in guid, bytes, offset, count);
165155
}

0 commit comments

Comments
 (0)