Skip to content

Commit d569f5e

Browse files
authored
fix(SimpleWebTransport): Guard Queue.TryDequeue behind UNITY_2022_3_OR_NEWER (#4105)
1 parent 162c052 commit d569f5e

File tree

8 files changed

+102
-31
lines changed

8 files changed

+102
-31
lines changed

Assets/Mirror/Transports/SimpleWeb/SimpleWeb/Client/SimpleWebClient.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,10 @@ public void ProcessMessageQueue(MonoBehaviour behaviour)
8484
onConnect?.Invoke();
8585
break;
8686
case EventType.Data:
87-
onData?.Invoke(next.data.ToSegment());
88-
next.data.Release();
87+
using (next.data) // auto release
88+
{
89+
onData?.Invoke(next.data.ToSegment());
90+
}
8991
break;
9092
case EventType.Disconnected:
9193
onDisconnect?.Invoke();

Assets/Mirror/Transports/SimpleWeb/SimpleWeb/Client/StandAlone/WebSocketClientStandAlone.cs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public override void Connect(Uri serverAddress)
3131
tcpConfig.ApplyTo(client);
3232

3333
// create connection object here so dispose correctly disconnects on failed connect
34-
conn = new Connection(client, AfterConnectionDisposed);
34+
conn = new Connection(client, AfterConnectionDisposed, onSendQueueFull: null, maxSendQueueSize: int.MaxValue);
3535

3636
Thread receiveThread = new Thread(() => ConnectAndReceiveLoop(serverAddress));
3737
receiveThread.IsBackground = true;
@@ -133,17 +133,15 @@ public override void Send(ReadOnlySpan<byte> span)
133133
ArrayBuffer buffer = bufferPool.Take(span.Length);
134134
buffer.CopyFrom(span);
135135

136-
conn.sendQueue.Enqueue(buffer);
137-
conn.sendPending.Set();
136+
conn.QueueSend(buffer);
138137
}
139138
#else
140139
public override void Send(ArraySegment<byte> segment)
141140
{
142141
ArrayBuffer buffer = bufferPool.Take(segment.Count);
143142
buffer.CopyFrom(segment);
144143

145-
conn.sendQueue.Enqueue(buffer);
146-
conn.sendPending.Set();
144+
conn.QueueSend(buffer);
147145
}
148146
#endif
149147
}

Assets/Mirror/Transports/SimpleWeb/SimpleWeb/Common/Connection.cs

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,16 +31,56 @@ internal sealed class Connection : IDisposable
3131
public Thread receiveThread;
3232
public Thread sendThread;
3333

34-
public ManualResetEventSlim sendPending = new ManualResetEventSlim(false);
35-
public ConcurrentQueue<ArrayBuffer> sendQueue = new ConcurrentQueue<ArrayBuffer>();
34+
ManualResetEventSlim sendPending = new ManualResetEventSlim(false);
35+
ConcurrentQueue<ArrayBuffer> sendQueue = new ConcurrentQueue<ArrayBuffer>();
3636

3737
public Action<Connection> onDispose;
3838
volatile internal bool hasDisposed;
3939

40-
public Connection(TcpClient client, Action<Connection> onDispose)
40+
Action<Connection> onSendQueueFull;
41+
readonly int maxSendQueueSize;
42+
43+
public Connection(TcpClient client, Action<Connection> onDispose, Action<Connection> onSendQueueFull, int maxSendQueueSize)
4144
{
4245
this.client = client ?? throw new ArgumentNullException(nameof(client));
4346
this.onDispose = onDispose;
47+
this.onSendQueueFull = onSendQueueFull;
48+
this.maxSendQueueSize = maxSendQueueSize;
49+
}
50+
51+
public void QueueSend(ArrayBuffer buffer)
52+
{
53+
bool queueFull = false;
54+
lock (disposedLock)
55+
{
56+
if (hasDisposed)
57+
{
58+
Log.Warn($"[SWT-Connection]: Message sent to id={connId} after it was disposed");
59+
buffer.Release();
60+
}
61+
else if (sendQueue.Count >= maxSendQueueSize)
62+
{
63+
queueFull = true;
64+
buffer.Release();
65+
}
66+
else
67+
{
68+
sendQueue.Enqueue(buffer);
69+
sendPending.Set();
70+
}
71+
}
72+
73+
if (queueFull)
74+
{
75+
Log.Warn($"[SWT-Connection]: Send queue was over {maxSendQueueSize} for {ToString()}, kicking connection.");
76+
onSendQueueFull?.Invoke(this);
77+
Dispose();
78+
}
79+
}
80+
81+
public (ManualResetEventSlim sendPending, ConcurrentQueue<ArrayBuffer> sendQueue) GetSendQueue()
82+
{
83+
return (sendPending, sendQueue);
4484
}
4585

4686
/// <summary>

Assets/Mirror/Transports/SimpleWeb/SimpleWeb/Common/ReceiveLoop.cs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public static void Loop(Config config)
5252

5353
Profiler.BeginThreadProfiling("SimpleWeb", $"ReceiveLoop {conn.connId}");
5454

55+
Queue<ArrayBuffer> fragments = new Queue<ArrayBuffer>(); // create once to avoid allocation each time
5556
byte[] readBuffer = new byte[Constants.HeaderSize + (expectMask ? Constants.MaskSize : 0) + maxMessageSize];
5657
try
5758
{
@@ -60,7 +61,7 @@ public static void Loop(Config config)
6061
TcpClient client = conn.client;
6162

6263
while (client.Connected)
63-
ReadOneMessage(config, readBuffer);
64+
ReadOneMessage(config, readBuffer, fragments);
6465

6566
Log.Verbose("[SWT-ReceiveLoop]: {0} Not Connected", conn);
6667
}
@@ -100,10 +101,22 @@ public static void Loop(Config config)
100101
finally
101102
{
102103
Profiler.EndThreadProfiling();
104+
105+
// release any unprocessed fragments in case ReadOneMessage throws mid loop
106+
#if UNITY_2022_3_OR_NEWER
107+
while (fragments.TryDequeue(out ArrayBuffer buffer))
108+
buffer.Release();
109+
#else
110+
while (fragments.Count > 0)
111+
{
112+
ArrayBuffer buffer = fragments.Dequeue();
113+
buffer.Release();
114+
}
115+
#endif
103116
}
104117
}
105118

106-
static void ReadOneMessage(Config config, byte[] buffer)
119+
static void ReadOneMessage(Config config, byte[] buffer, Queue<ArrayBuffer> fragments)
107120
{
108121
(Connection conn, int maxMessageSize, bool expectMask, ConcurrentQueue<Message> queue, BufferPool bufferPool) = config;
109122
Stream stream = conn.stream;
@@ -127,8 +140,6 @@ static void ReadOneMessage(Config config, byte[] buffer)
127140
}
128141
else
129142
{
130-
// todo cache this to avoid allocations
131-
Queue<ArrayBuffer> fragments = new Queue<ArrayBuffer>();
132143
fragments.Enqueue(CopyMessageToBuffer(bufferPool, expectMask, buffer, msgOffset, header.payloadLength));
133144
int totalSize = header.payloadLength;
134145

Assets/Mirror/Transports/SimpleWeb/SimpleWeb/Common/SendLoop.cs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Collections.Concurrent;
23
using System.IO;
34
using System.Net.Sockets;
45
using System.Security.Cryptography;
@@ -53,26 +54,28 @@ public static void Loop(Config config)
5354
if (client == null)
5455
return;
5556

57+
(ManualResetEventSlim sendPending, ConcurrentQueue<ArrayBuffer> sendQueue) = conn.GetSendQueue();
58+
5659
while (client.Connected)
5760
{
5861
// wait for message
59-
conn.sendPending.Wait();
62+
sendPending.Wait();
6063
// wait for 1ms for mirror to send other messages
6164
if (SendLoopConfig.sleepBeforeSend)
6265
Thread.Sleep(1);
6366

64-
conn.sendPending.Reset();
67+
sendPending.Reset();
6568

6669
if (SendLoopConfig.batchSend)
6770
{
6871
int offset = 0;
69-
while (conn.sendQueue.TryDequeue(out ArrayBuffer msg))
72+
while (sendQueue.TryDequeue(out ArrayBuffer msg))
7073
{
74+
using ArrayBuffer _ = msg; // auto release
7175
// check if connected before sending message
7276
if (!client.Connected)
7377
{
7478
Log.Verbose("[SWT-SendLoop]: SendLoop {0} not connected", conn);
75-
msg.Release();
7679
return;
7780
}
7881

@@ -86,7 +89,6 @@ public static void Loop(Config config)
8689
}
8790

8891
offset = SendMessage(writeBuffer, offset, msg, setMask, maskHelper);
89-
msg.Release();
9092
}
9193

9294
// after no message in queue, send remaining messages
@@ -96,19 +98,18 @@ public static void Loop(Config config)
9698
}
9799
else
98100
{
99-
while (conn.sendQueue.TryDequeue(out ArrayBuffer msg))
101+
while (sendQueue.TryDequeue(out ArrayBuffer msg))
100102
{
103+
using ArrayBuffer _ = msg; // auto release
101104
// check if connected before sending message
102105
if (!client.Connected)
103106
{
104107
Log.Verbose("[SWT-SendLoop]: SendLoop {0} not connected", conn);
105-
msg.Release();
106108
return;
107109
}
108110

109111
int length = SendMessage(writeBuffer, 0, msg, setMask, maskHelper);
110112
stream.Write(writeBuffer, 0, length);
111-
msg.Release();
112113
}
113114
}
114115
}

Assets/Mirror/Transports/SimpleWeb/SimpleWeb/Server/SimpleWebServer.cs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,13 @@ public class SimpleWebServer
1717

1818
public bool Active { get; private set; }
1919

20-
public SimpleWebServer(int maxMessagesPerTick, TcpConfig tcpConfig, int maxMessageSize, int handshakeMaxSize, SslConfig sslConfig)
20+
public SimpleWebServer(int maxMessagesPerTick, TcpConfig tcpConfig, int maxMessageSize, int handshakeMaxSize, SslConfig sslConfig, int maxSendQueueSize = 10000)
2121
{
2222
this.maxMessagesPerTick = maxMessagesPerTick;
2323
// use max because bufferpool is used for both messages and handshake
2424
int max = Math.Max(maxMessageSize, handshakeMaxSize);
2525
bufferPool = new BufferPool(5, 20, max);
26-
server = new WebSocketServer(tcpConfig, maxMessageSize, handshakeMaxSize, sslConfig, bufferPool);
26+
server = new WebSocketServer(tcpConfig, maxMessageSize, handshakeMaxSize, sslConfig, bufferPool, maxSendQueueSize);
2727
}
2828

2929
public void Start(ushort port)
@@ -40,6 +40,9 @@ public void Stop()
4040

4141
public void SendAll(List<int> connectionIds, ArraySegment<byte> source)
4242
{
43+
if (connectionIds.Count == 0)
44+
return;
45+
4346
ArrayBuffer buffer = bufferPool.Take(source.Count);
4447
buffer.CopyFrom(source);
4548
buffer.SetReleasesRequired(connectionIds.Count);
@@ -89,8 +92,10 @@ public void ProcessMessageQueue(MonoBehaviour behaviour)
8992
onConnect?.Invoke(next.connId, GetClientAddress(next.connId));
9093
break;
9194
case EventType.Data:
92-
onData?.Invoke(next.connId, next.data.ToSegment());
93-
next.data.Release();
95+
using (next.data)
96+
{
97+
onData?.Invoke(next.connId, next.data.ToSegment());
98+
}
9499
break;
95100
case EventType.Disconnected:
96101
onDisconnect?.Invoke(next.connId);

Assets/Mirror/Transports/SimpleWeb/SimpleWeb/Server/WebSocketServer.cs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,20 @@ public class WebSocketServer
2020
readonly ServerSslHelper sslHelper;
2121
readonly BufferPool bufferPool;
2222
readonly ConcurrentDictionary<int, Connection> connections = new ConcurrentDictionary<int, Connection>();
23+
readonly int maxSendQueueSize;
2324

2425
int _idCounter = 0;
2526

26-
public WebSocketServer(TcpConfig tcpConfig, int maxMessageSize, int handshakeMaxSize, SslConfig sslConfig, BufferPool bufferPool)
27+
public WebSocketServer(TcpConfig tcpConfig, int maxMessageSize, int handshakeMaxSize, SslConfig sslConfig, BufferPool bufferPool, int maxSendQueueSize = 10000)
2728
{
29+
if (maxSendQueueSize <= 0)
30+
throw new ArgumentException("maxSendQueueSize must be greater than 0", nameof(maxSendQueueSize));
2831
this.tcpConfig = tcpConfig;
2932
this.maxMessageSize = maxMessageSize;
3033
sslHelper = new ServerSslHelper(sslConfig);
3134
this.bufferPool = bufferPool;
3235
handShake = new ServerHandshake(this.bufferPool, handshakeMaxSize);
36+
this.maxSendQueueSize = maxSendQueueSize;
3337
}
3438

3539
public void Listen(int port)
@@ -77,7 +81,7 @@ void acceptLoop()
7781
// TODO keep track of connections before they are in connections dictionary
7882
// this might not be a problem as HandshakeAndReceiveLoop checks for stop
7983
// and returns/disposes before sending message to queue
80-
Connection conn = new Connection(client, AfterConnectionDisposed);
84+
Connection conn = new Connection(client, AfterConnectionDisposed, SendQueueFull, maxSendQueueSize);
8185
Log.Verbose("[SWT-WebSocketServer]: A client connected from {0}", conn);
8286

8387
// handshake needs its own thread as it needs to wait for message from client
@@ -168,6 +172,11 @@ void HandshakeAndReceiveLoop(Connection conn)
168172
}
169173
}
170174

175+
void SendQueueFull(Connection conn)
176+
{
177+
receiveQueue.Enqueue(new Message(conn.connId, new Exception("Send Queue Full")));
178+
}
179+
171180
void AfterConnectionDisposed(Connection conn)
172181
{
173182
if (conn.connId != Connection.IdNotSet)
@@ -187,11 +196,13 @@ public void Send(int id, ArrayBuffer buffer)
187196
{
188197
if (connections.TryGetValue(id, out Connection conn))
189198
{
190-
conn.sendQueue.Enqueue(buffer);
191-
conn.sendPending.Set();
199+
conn.QueueSend(buffer);
192200
}
193201
else
202+
{
194203
Log.Warn("[SWT-WebSocketServer]: Send: cannot send message to {0} because it was not found in dictionary. Maybe it disconnected.", id);
204+
buffer.Release();
205+
}
195206
}
196207

197208
public void CloseConnection(int id)

Assets/Mirror/Transports/SimpleWeb/SimpleWebTransport.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ public class SimpleWebTransport : Transport, PortTransport
3131
[Tooltip("Caps the number of messages the client will process per tick. Allows LateUpdate to finish to let the reset of unity continue in case more messages arrive before they are processed")]
3232
public int clientMaxMsgsPerTick = 1000;
3333

34+
[Tooltip("Maximum number of messages that can be in the send queue before the connection is closed. This prevents slow connections from using too much memory on the server.")]
35+
public int maxSendQueueSize = 1000;
36+
3437
[Tooltip("Send would stall forever if the network is cut off during a send, so we need a timeout (in milliseconds)")]
3538
public int sendTimeout = 5000;
3639

@@ -293,7 +296,7 @@ public override void ServerStart()
293296
Log.Warn("[SWT-ServerStart]: Server Already Started");
294297

295298
SslConfig config = SslConfigLoader.Load(sslEnabled, sslCertJson, sslProtocols);
296-
server = new SimpleWebServer(serverMaxMsgsPerTick, TcpConfig, maxMessageSize, maxHandshakeSize, config);
299+
server = new SimpleWebServer(serverMaxMsgsPerTick, TcpConfig, maxMessageSize, maxHandshakeSize, config, maxSendQueueSize);
297300

298301
server.onConnect += OnServerConnectedWithAddress.Invoke;
299302
server.onDisconnect += OnServerDisconnected.Invoke;

0 commit comments

Comments
 (0)