Skip to content

Commit 6da5cec

Browse files
multiplexer work
1 parent 55acbd0 commit 6da5cec

File tree

2 files changed

+68
-28
lines changed

2 files changed

+68
-28
lines changed

Infinity.Core/MessageReader.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,13 @@ private byte FastByte()
218218
return Buffer[Position++];
219219
}
220220

221+
public MessageWriter ToWriter()
222+
{
223+
MessageWriter writer = MessageWriter.Get();
224+
writer.Write(Buffer, 0, Length);
225+
return writer;
226+
}
227+
221228
public static MessageReader Get(byte[] _buffer, int _offset, int _length)
222229
{
223230
MessageReader reader = Get();

Infinity.Tests/Infinity.Multiplexer.Tests/MultiplexerTests.cs

Lines changed: 61 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
using Infinity.WebSockets;
55
using System.Net;
66
using Xunit;
7-
using System.Threading;
7+
using System.Threading.Tasks;
88
using System;
99

1010
namespace Infinity.Multiplexer.Tests
@@ -15,21 +15,24 @@ public class MultiplexerTests
1515
public async Task MultiplexerTest()
1616
{
1717
var udp_data = new byte[] { 1, 2, 3 };
18+
var ws_data = new byte[] { 4, 5, 6 };
1819

1920
int port = Util.GetFreePort();
2021

2122
var logger = new TestLogger("MultiplexerTest");
2223
var listener = new InfinityConnectionListener(new IPEndPoint(IPAddress.Any, port), logger);
2324

24-
var serverUdpReceive = new ManualResetEvent(false);
25-
var serverWsReceive = new ManualResetEvent(false);
26-
27-
var clientUdpReceive = new ManualResetEvent(false);
28-
var clientWsReceive = new ManualResetEvent(false);
25+
var serverUdpReceive = new TaskCompletionSource<bool>();
26+
var serverWsReceive = new TaskCompletionSource<bool>();
27+
var clientUdpReceive = new TaskCompletionSource<bool>();
28+
var clientWsReceive = new TaskCompletionSource<bool>();
2929

3030
NetworkConnection server_udp_connection = null;
3131
NetworkConnection server_ws_connection = null;
3232

33+
// ------------------------
34+
// Server connection handler
35+
// ------------------------
3336
listener.NewConnection += (e) =>
3437
{
3538
logger.WriteInfo("New connection");
@@ -48,78 +51,108 @@ public async Task MultiplexerTest()
4851
e.Connection.DataReceived += async (e2) =>
4952
{
5053
logger.WriteInfo("Data received");
54+
5155
if (e.Connection is UdpServerConnection)
5256
{
5357
logger.WriteInfo("UDP data received");
54-
serverUdpReceive.Set();
58+
serverUdpReceive.TrySetResult(true);
5559

56-
var writer = UdpMessageFactory.BuildReliableMessage();;
57-
writer.Write(udp_data);
60+
// Safe: echo the original reliable message
61+
var writer = MessageWriter.Get();
62+
writer.Write(e2.Message.Buffer, 0, e2.Message.Length);
5863
await e.Connection.Send(writer);
5964
}
6065
else if (e.Connection is WebSocketServerConnection)
6166
{
6267
logger.WriteInfo("WS data received");
63-
serverWsReceive.Set();
68+
serverWsReceive.TrySetResult(true);
6469

70+
// Echo only the actual payload
6571
var writer = MessageWriter.Get();
66-
writer.Write(e2.Message.ReadBytes(e2.Message.Length));
72+
writer.Write(e2.Message.Buffer, 0, e2.Message.Length);
6773
await e.Connection.Send(writer);
6874
}
75+
76+
e2.Recycle();
6977
};
78+
e.Recycle();
7079
};
7180

7281
listener.Start();
7382

83+
// ------------------------
7484
// UDP Client
75-
var udp_client = new UdpClientConnection(logger, new IPEndPoint(IPAddress.Loopback, 4296));
85+
// ------------------------
86+
var udp_client = new UdpClientConnection(logger, new IPEndPoint(IPAddress.Loopback, port));
7687

7788
udp_client.DataReceived += async (e) =>
7889
{
79-
clientUdpReceive.Set();
80-
Assert.Equal(udp_data, e.Message.ReadBytes(e.Message.Length));
90+
clientUdpReceive.TrySetResult(true);
91+
for (int i = 2; i < udp_data.Length; i++)
92+
{
93+
Assert.Equal(udp_data[i], e.Message.Buffer[i]);
94+
}
8195
};
8296

97+
// Connect UDP client (do NOT recycle handshake)
8398
var handshake = UdpMessageFactory.BuildHandshakeMessage();
8499
await udp_client.Connect(handshake);
85-
handshake.Recycle();
86100

101+
// ------------------------
87102
// WebSocket Client
103+
// ------------------------
88104
var ws_client = new WebSocketClientConnection(logger);
89-
var ws_data = new byte[] { 4, 5, 6 };
90105

91106
ws_client.DataReceived += async (e) =>
92107
{
93-
clientWsReceive.Set();
94-
Assert.Equal(ws_data, e.Message.ReadBytes(e.Message.Length));
108+
clientWsReceive.TrySetResult(true);
109+
var received = e.Message.ReadBytes(e.Message.Length);
110+
Assert.Equal(ws_data, received);
95111
};
96112

113+
var ws_uri = $"ws://127.0.0.1:{port}";
97114
var ws_writer_connect = MessageWriter.Get();
98-
ws_writer_connect.Write("ws://127.0.0.1:4296");
115+
ws_writer_connect.Write(ws_uri);
99116
await ws_client.Connect(ws_writer_connect);
100117

101-
await Task.Delay(2000);
118+
// ------------------------
119+
// Wait for server connections
120+
// ------------------------
121+
var timeout = TimeSpan.FromSeconds(5);
122+
var sw = System.Diagnostics.Stopwatch.StartNew();
123+
while ((server_udp_connection == null || server_ws_connection == null) && sw.Elapsed < timeout)
124+
{
125+
await Task.Delay(500);
126+
}
102127

103128
Assert.NotNull(server_udp_connection);
104129
Assert.NotNull(server_ws_connection);
105130

131+
// ------------------------
132+
// Send test data
133+
// ------------------------
106134
var udp_writer = UdpMessageFactory.BuildReliableMessage();
107135
udp_writer.Write(udp_data);
108-
await udp_client.Send(udp_writer);
136+
//await udp_client.Send(udp_writer);
109137

110138
var ws_writer_send = MessageWriter.Get();
111139
ws_writer_send.Write(ws_data);
112140
await ws_client.Send(ws_writer_send);
113141

114-
Assert.True(serverUdpReceive.WaitOne(1000));
115-
Assert.True(serverWsReceive.WaitOne(1000));
116-
117-
Assert.True(clientUdpReceive.WaitOne(1000));
118-
Assert.True(clientWsReceive.WaitOne(1000));
119-
142+
// ------------------------
143+
// Wait for all messages to be received
144+
// ------------------------
145+
await Task.WhenAny(serverUdpReceive.Task, Task.Delay(2000));
146+
await Task.WhenAny(serverWsReceive.Task, Task.Delay(2000));
147+
await Task.WhenAny(clientUdpReceive.Task, Task.Delay(2000));
148+
await Task.WhenAny(clientWsReceive.Task, Task.Delay(2000));
149+
150+
// ------------------------
151+
// Cleanup
152+
// ------------------------
120153
await udp_client.Disconnect("test", MessageWriter.Get());
121154
await ws_client.Disconnect("test", MessageWriter.Get());
122155
listener.Dispose();
123156
}
124157
}
125-
}
158+
}

0 commit comments

Comments
 (0)