Skip to content

Commit 7cbdeb9

Browse files
the stress tests will never work properly with testing frameworks. we will bring back the test listener , but in the meantime i've fixed the problem with the messages
1 parent a3926cb commit 7cbdeb9

File tree

11 files changed

+82
-85
lines changed

11 files changed

+82
-85
lines changed

Infinity.Core/NetworkConnectionListener.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
namespace Infinity.Core
44
{
5-
public delegate bool HandshakeCheck(IPEndPoint _endPoint, MessageReader _reader, out byte[] _response);
5+
public delegate bool HandshakeCheck(IPEndPoint _endPoint, MessageReader _reader, out MessageWriter _response);
66

77
public abstract class NetworkConnectionListener : IDisposable
88
{

Infinity.Tests/Infinity.Udp.Tests/NoConnectionUdpConnection.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public async Task Test_Receive(MessageWriter msg)
3737
await HandleReceive(data, data.Length);
3838
}
3939

40-
public override async Task WriteBytesToConnection(MessageWriter _writer)
40+
public override async Task WriteBytesToConnection(MessageWriter _writer, bool _recycle_writer)
4141
{
4242
BytesSent.Add(MessageReader.Get(_writer.Buffer, 0, _writer.Length));
4343
}

Infinity.Tests/Infinity.Udp.Tests/StressTests.cs

Lines changed: 32 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -85,18 +85,18 @@ public async Task StressOpeningConnections()
8585
{
8686
Console.WriteLine("StressOpeningConnections");
8787

88-
int connections_to_test = 100;
88+
int connections_to_test = 10;
8989
int port = Util.GetFreePort();
9090
var ep = new IPEndPoint(IPAddress.Loopback, port);
91-
ConcurrentStack<UdpClientConnection> connections = new ConcurrentStack<UdpClientConnection>();
91+
List<UdpClientConnection> connections = new List<UdpClientConnection>();
9292
int con_count = 0;
93-
var allConnectedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
93+
ManualResetEvent mutex = new ManualResetEvent(false);
9494

9595
using (var listener = new UdpConnectionListener(ep))
9696
{
9797
listener.NewConnection += obj =>
9898
{
99-
Interlocked.Increment(ref con_count);
99+
con_count++;
100100

101101
obj.Connection.DataReceived += data_args =>
102102
{
@@ -109,17 +109,16 @@ public async Task StressOpeningConnections()
109109
};
110110

111111
obj.Recycle();
112-
112+
Console.WriteLine(con_count);
113113
if (con_count == connections_to_test)
114114
{
115-
allConnectedTcs.TrySetResult();
115+
mutex.Set();
116116
}
117117
};
118118

119119
listener.Start();
120120

121121
// Launch all client connections
122-
var tasks = new List<Task>();
123122
for (int i = 0; i < connections_to_test; i++)
124123
{
125124
var handshake = UdpMessageFactory.BuildHandshakeMessage();
@@ -129,16 +128,16 @@ public async Task StressOpeningConnections()
129128
connection.DataReceived += data_args => data_args.Recycle();
130129
connection.Disconnected += e => e.Recycle();
131130

132-
tasks.Add(connection.Connect(handshake));
133-
connections.Push(connection);
131+
await connection.Connect(handshake);
132+
connections.Add(connection);
134133

135134
// same problem as with stress test reliable messages
136-
await Task.Delay(1);
135+
await Task.Delay(100);
137136
}
138137

139138
// Wait for all clients to finish connecting
140-
var completed = await Task.WhenAny(allConnectedTcs.Task, Task.Delay(10000));
141-
Assert.True(allConnectedTcs.Task.IsCompleted, "Not all connections established in time.");
139+
mutex.WaitOne(5000);
140+
Assert.Equal(connections_to_test, con_count);
142141

143142
// Verify listener connection count
144143
Assert.Equal(connections_to_test, listener.ConnectionCount);
@@ -164,15 +163,19 @@ public async Task StressReliableMessages()
164163

165164
int count = 0;
166165

167-
int messages_to_try = 100;
166+
int messages_to_try = 10000;
168167

169168
var mutex = new ManualResetEvent(false);
170169

170+
UdpServerConnection server_connection = null;
171+
171172
using (UdpConnectionListener listener = new UdpConnectionListener(ep))
172173
using (UdpConnection connection = new UdpClientConnection(new TestLogger("Client"), ep))
173174
{
174175
listener.NewConnection += (evt) =>
175176
{
177+
server_connection = (UdpServerConnection)evt.Connection;
178+
176179
evt.Connection.Disconnected += delegate (DisconnectedEvent obj)
177180
{
178181
obj.Recycle();
@@ -185,17 +188,6 @@ public async Task StressReliableMessages()
185188
{
186189
mutex.Set();
187190
sw.Stop();
188-
Console.WriteLine("Readers: " + Core.Pools.ReaderPool.InUse.ToString());
189-
Console.WriteLine("Packets: " + Infinity.Udp.Pools.PacketPool.InUse.ToString());
190-
Console.WriteLine("Fragmented: " + Infinity.Udp.Pools.FragmentedMessagePool.InUse.ToString());
191-
Console.WriteLine("Writers: " + Core.Pools.WriterPool.InUse.ToString());
192-
193-
Console.WriteLine("DataReceived: " + Core.Pools.DataReceivedEventPool.InUse.ToString());
194-
Console.WriteLine("Disconnected: " + Core.Pools.DisconnectedEventPool.InUse.ToString());
195-
Console.WriteLine("NewConnection: " + Core.Pools.NewConnectionPool.InUse.ToString());
196-
197-
Console.WriteLine("Server packets: " + ((UdpConnection)(obj.Connection)).reliable_data_packets_sent.Count);
198-
Console.WriteLine("Client packets: " + ((UdpConnection)connection).reliable_data_packets_sent.Count);
199191
}
200192
obj.Recycle();
201193
};
@@ -220,16 +212,28 @@ public async Task StressReliableMessages()
220212
var message = UdpMessageFactory.BuildReliableMessage();
221213
message.Write(123);
222214

223-
_ = connection.Send(message);
215+
await connection.Send(message);
224216

225217
// if we dont have this delay something weird happens and the packets and readers are not recycled, need to figure stuff out
226-
await Task.Delay(1);
218+
//await Task.Delay(1);
227219
}
228220

229-
mutex.WaitOne(2000);
221+
mutex.WaitOne(10000);
230222
Assert.Equal(messages_to_try, count);
231-
await Task.Delay(1000);
223+
await Task.Delay(10000);
232224
Console.WriteLine($"StressReliableMessages took {sw.ElapsedMilliseconds}ms");
225+
226+
Console.WriteLine("Readers: " + Core.Pools.ReaderPool.InUse.ToString());
227+
Console.WriteLine("Packets: " + Infinity.Udp.Pools.PacketPool.InUse.ToString());
228+
Console.WriteLine("Fragmented: " + Infinity.Udp.Pools.FragmentedMessagePool.InUse.ToString());
229+
Console.WriteLine("Writers: " + Core.Pools.WriterPool.InUse.ToString());
230+
231+
Console.WriteLine("DataReceived: " + Core.Pools.DataReceivedEventPool.InUse.ToString());
232+
Console.WriteLine("Disconnected: " + Core.Pools.DisconnectedEventPool.InUse.ToString());
233+
Console.WriteLine("NewConnection: " + Core.Pools.NewConnectionPool.InUse.ToString());
234+
235+
Console.WriteLine("Server packets: " + server_connection.reliable_data_packets_sent.Count);
236+
Console.WriteLine("Client packets: " + ((UdpConnection)connection).reliable_data_packets_sent.Count);
233237
}
234238
}
235239

Infinity.Tests/Infinity.Udp.Tests/UdpConnectionTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ public async Task UdpHandshakeTest()
175175
// Use TaskCompletionSource for async signaling
176176
var tcs = new TaskCompletionSource();
177177

178-
listener.HandshakeConnection = (IPEndPoint endPoint, MessageReader input, out byte[] response) =>
178+
listener.HandshakeConnection = (IPEndPoint endPoint, MessageReader input, out MessageWriter response) =>
179179
{
180180
response = null;
181181

Infinity.Udp/Fragmented/UdpConnection.Fragmented.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ private async Task FragmentedSend(MessageWriter _writer)
4040
await Task.Delay(10);
4141
}
4242

43+
_writer.Recycle();
44+
4345
if (last_fragment_id_allocated >= byte.MaxValue)
4446
{
4547
Interlocked.Exchange(ref last_fragment_id_allocated, 0);

Infinity.Udp/Reliable/UdpConnection.Reliable.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,8 @@ internal async Task<int> ManageReliablePackets()
3535
protected async Task ReliableSend(MessageWriter _writer, Action _ack_callback = null)
3636
{
3737
AttachReliableID(_writer, 1, _ack_callback);
38-
await WriteBytesToConnection(_writer).ConfigureAwait(false);
3938
Statistics.LogReliableMessageSent(_writer.Length);
40-
_writer.Recycle();
39+
await WriteBytesToConnection(_writer).ConfigureAwait(false);
4140
}
4241

4342
private async Task ReliableMessageReceive(MessageReader _reader)

Infinity.Udp/Reliable/UdpPacket.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,8 @@ public async Task<int> Resend()
9595

9696
try
9797
{
98-
await connection.WriteBytesToConnection(writer).ConfigureAwait(false);
9998
connection.Statistics.LogMessageResent(writer.Length);
99+
await connection.WriteBytesToConnection(writer, false).ConfigureAwait(false);
100100

101101
return 1;
102102
}

Infinity.Udp/UdpClientConnection.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,6 @@ public sealed class UdpClientConnection : UdpConnection
1313

1414
private ManualResetEvent connect_wait_lock = new ManualResetEvent(false);
1515

16-
private CancellationTokenSource cancellation_token_source = new CancellationTokenSource();
17-
1816
private readonly Channel<MessageReader> _incoming = Channel.CreateUnbounded<MessageReader>();
1917

2018
public UdpClientConnection(ILogger _logger, IPEndPoint _remote_end_point, IPMode _ip_mode = IPMode.IPv4)
@@ -71,7 +69,7 @@ private Socket CreateSocket(IPMode _ip_mode)
7169
Dispose(true);
7270
}
7371

74-
public override async Task WriteBytesToConnection(MessageWriter _writer)
72+
public override async Task WriteBytesToConnection(MessageWriter _writer, bool _recycle_writer = true)
7573
{
7674
#if DEBUG
7775
if (TestLagMs > 0)
@@ -84,6 +82,11 @@ public override async Task WriteBytesToConnection(MessageWriter _writer)
8482
{
8583
await WriteBytesToConnectionReal(_writer.Buffer, _writer.Length).ConfigureAwait(false);
8684
}
85+
86+
if (_recycle_writer)
87+
{
88+
_writer.Recycle();
89+
}
8790
}
8891

8992
public override void WriteBytesToConnectionSync(MessageWriter _writer)
@@ -282,8 +285,6 @@ protected override void DisconnectRemote(string _reason, MessageReader _reader)
282285
InvokeDisconnected(_reason, _reader);
283286
}
284287

285-
writer.Recycle();
286-
287288
Dispose();
288289
}
289290

@@ -340,7 +341,6 @@ protected override void Dispose(bool _disposing)
340341
// send disconnect packet to server
341342
var writer = UdpMessageFactory.BuildDisconnectMessage();
342343
SendDisconnect(writer);
343-
writer.Recycle();
344344

345345
// Fire client-side Disconnected event
346346
InvokeDisconnected("Disposed", null);

Infinity.Udp/UdpConnection.cs

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
using System.Threading.Tasks;
1+
using System.Threading.Channels;
22
using Infinity.Core;
33
using Infinity.Core.Exceptions;
44

@@ -20,13 +20,18 @@ public UdpConnectionConfiguration Configuration
2020

2121
internal UdpConnectionConfiguration configuration;
2222

23+
private readonly Channel<MessageWriter> _outgoing = Channel.CreateUnbounded<MessageWriter>();
24+
internal CancellationTokenSource cancellation_token_source = new CancellationTokenSource();
25+
2326
public UdpConnection(ILogger _logger) : base()
2427
{
2528
configuration = new UdpConnectionConfiguration();
2629
logger = _logger;
30+
31+
Util.FireAndForget(SendInternal(), logger);
2732
}
2833

29-
public abstract Task WriteBytesToConnection(MessageWriter _writer);
34+
public abstract Task WriteBytesToConnection(MessageWriter _writer, bool _recycle_writer = true);
3035
public abstract void WriteBytesToConnectionSync(MessageWriter _writer);
3136

3237
protected abstract Task ShareConfiguration();
@@ -46,11 +51,17 @@ public SendErrors SendSync(MessageWriter _writer)
4651

4752
public override async Task<SendErrors> Send(MessageWriter _writer)
4853
{
49-
try
54+
await _outgoing.Writer.WriteAsync(_writer, cancellation_token_source.Token);
55+
return SendErrors.None;
56+
}
57+
58+
public async Task SendInternal()
59+
{
60+
await foreach (var _writer in _outgoing.Reader.ReadAllAsync(cancellation_token_source.Token).ConfigureAwait(false))
5061
{
5162
if (state != ConnectionState.Connected)
5263
{
53-
return SendErrors.Disconnected;
64+
return;
5465
}
5566

5667
InvokeBeforeSend(_writer);
@@ -65,7 +76,6 @@ public override async Task<SendErrors> Send(MessageWriter _writer)
6576
}
6677

6778
await ReliableSend(_writer).ConfigureAwait(false);
68-
_writer.Recycle();
6979

7080
break;
7181
}
@@ -78,7 +88,6 @@ public override async Task<SendErrors> Send(MessageWriter _writer)
7888

7989
await OrderedSend(_writer).ConfigureAwait(false);
8090
Statistics.LogReliableMessageSent(_writer.Length);
81-
_writer.Recycle();
8291

8392
break;
8493
}
@@ -92,7 +101,6 @@ public override async Task<SendErrors> Send(MessageWriter _writer)
92101
}
93102

94103
await FragmentedSend(_writer).ConfigureAwait(false);
95-
_writer.Recycle();
96104

97105
Statistics.LogFragmentedMessageSent(_writer.Length);
98106
}
@@ -105,21 +113,13 @@ public override async Task<SendErrors> Send(MessageWriter _writer)
105113
}
106114
default: // applies to disconnect and unreliable
107115
{
108-
await WriteBytesToConnection(_writer).ConfigureAwait(false);
109-
_writer.Recycle();
110-
111116
Statistics.LogUnreliableMessageSent(_writer.Length);
117+
await WriteBytesToConnection(_writer).ConfigureAwait(false);
112118

113119
break;
114120
}
115121
}
116122
}
117-
finally
118-
{
119-
_writer.Recycle();
120-
}
121-
122-
return SendErrors.None;
123123
}
124124

125125
protected internal virtual async Task HandleReceive(MessageReader _reader, int _bytes_received)

0 commit comments

Comments
 (0)