Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,10 @@ public void ProcessMessageQueue(MonoBehaviour behaviour)
onConnect?.Invoke();
break;
case EventType.Data:
onData?.Invoke(next.data.ToSegment());
next.data.Release();
using (next.data) // auto release
{
onData?.Invoke(next.data.ToSegment());
}
break;
case EventType.Disconnected:
onDisconnect?.Invoke();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public override void Connect(Uri serverAddress)
tcpConfig.ApplyTo(client);

// create connection object here so dispose correctly disconnects on failed connect
conn = new Connection(client, AfterConnectionDisposed);
conn = new Connection(client, AfterConnectionDisposed, onSendQueueFull: null, maxSendQueueSize: int.MaxValue);

Thread receiveThread = new Thread(() => ConnectAndReceiveLoop(serverAddress));
receiveThread.IsBackground = true;
Expand Down Expand Up @@ -133,17 +133,15 @@ public override void Send(ReadOnlySpan<byte> span)
ArrayBuffer buffer = bufferPool.Take(span.Length);
buffer.CopyFrom(span);

conn.sendQueue.Enqueue(buffer);
conn.sendPending.Set();
conn.QueueSend(buffer);
}
#else
public override void Send(ArraySegment<byte> segment)
{
ArrayBuffer buffer = bufferPool.Take(segment.Count);
buffer.CopyFrom(segment);

conn.sendQueue.Enqueue(buffer);
conn.sendPending.Set();
conn.QueueSend(buffer);
}
#endif
}
Expand Down
46 changes: 43 additions & 3 deletions Assets/Mirror/Transports/SimpleWeb/SimpleWeb/Common/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,56 @@ internal sealed class Connection : IDisposable
public Thread receiveThread;
public Thread sendThread;

public ManualResetEventSlim sendPending = new ManualResetEventSlim(false);
public ConcurrentQueue<ArrayBuffer> sendQueue = new ConcurrentQueue<ArrayBuffer>();
ManualResetEventSlim sendPending = new ManualResetEventSlim(false);
ConcurrentQueue<ArrayBuffer> sendQueue = new ConcurrentQueue<ArrayBuffer>();

public Action<Connection> onDispose;
volatile internal bool hasDisposed;

public Connection(TcpClient client, Action<Connection> onDispose)
Action<Connection> onSendQueueFull;
readonly int maxSendQueueSize;

public Connection(TcpClient client, Action<Connection> onDispose, Action<Connection> onSendQueueFull, int maxSendQueueSize)
{
this.client = client ?? throw new ArgumentNullException(nameof(client));
this.onDispose = onDispose;
this.onSendQueueFull = onSendQueueFull;
this.maxSendQueueSize = maxSendQueueSize;
}

public void QueueSend(ArrayBuffer buffer)
{
bool queueFull = false;
lock (disposedLock)
{
if (hasDisposed)
{
Log.Warn($"[SWT-Connection]: Message sent to id={connId} after it was disposed");
buffer.Release();
}
else if (sendQueue.Count >= maxSendQueueSize)
{
queueFull = true;
buffer.Release();
}
else
{
sendQueue.Enqueue(buffer);
sendPending.Set();
}
}

if (queueFull)
{
Log.Warn($"[SWT-Connection]: Send queue was over {maxSendQueueSize} for {ToString()}, kicking connection.");
onSendQueueFull?.Invoke(this);
Dispose();
}
}

public (ManualResetEventSlim sendPending, ConcurrentQueue<ArrayBuffer> sendQueue) GetSendQueue()
{
return (sendPending, sendQueue);
}

/// <summary>
Expand Down
19 changes: 15 additions & 4 deletions Assets/Mirror/Transports/SimpleWeb/SimpleWeb/Common/ReceiveLoop.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public static void Loop(Config config)

Profiler.BeginThreadProfiling("SimpleWeb", $"ReceiveLoop {conn.connId}");

Queue<ArrayBuffer> fragments = new Queue<ArrayBuffer>(); // create once to avoid allocation each time
byte[] readBuffer = new byte[Constants.HeaderSize + (expectMask ? Constants.MaskSize : 0) + maxMessageSize];
try
{
Expand All @@ -60,7 +61,7 @@ public static void Loop(Config config)
TcpClient client = conn.client;

while (client.Connected)
ReadOneMessage(config, readBuffer);
ReadOneMessage(config, readBuffer, fragments);

Log.Verbose("[SWT-ReceiveLoop]: {0} Not Connected", conn);
}
Expand Down Expand Up @@ -100,10 +101,22 @@ public static void Loop(Config config)
finally
{
Profiler.EndThreadProfiling();

// release any unprocessed fragments in case ReadOneMessage throws mid loop
#if UNITY_2022_3_OR_NEWER
while (fragments.TryDequeue(out ArrayBuffer buffer))
buffer.Release();
#else
while (fragments.Count > 0)
{
ArrayBuffer buffer = fragments.Dequeue();
buffer.Release();
}
#endif
}
}

static void ReadOneMessage(Config config, byte[] buffer)
static void ReadOneMessage(Config config, byte[] buffer, Queue<ArrayBuffer> fragments)
{
(Connection conn, int maxMessageSize, bool expectMask, ConcurrentQueue<Message> queue, BufferPool bufferPool) = config;
Stream stream = conn.stream;
Expand All @@ -127,8 +140,6 @@ static void ReadOneMessage(Config config, byte[] buffer)
}
else
{
// todo cache this to avoid allocations
Queue<ArrayBuffer> fragments = new Queue<ArrayBuffer>();
fragments.Enqueue(CopyMessageToBuffer(bufferPool, expectMask, buffer, msgOffset, header.payloadLength));
int totalSize = header.payloadLength;

Expand Down
17 changes: 9 additions & 8 deletions Assets/Mirror/Transports/SimpleWeb/SimpleWeb/Common/SendLoop.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Net.Sockets;
using System.Security.Cryptography;
Expand Down Expand Up @@ -53,26 +54,28 @@ public static void Loop(Config config)
if (client == null)
return;

(ManualResetEventSlim sendPending, ConcurrentQueue<ArrayBuffer> sendQueue) = conn.GetSendQueue();

while (client.Connected)
{
// wait for message
conn.sendPending.Wait();
sendPending.Wait();
// wait for 1ms for mirror to send other messages
if (SendLoopConfig.sleepBeforeSend)
Thread.Sleep(1);

conn.sendPending.Reset();
sendPending.Reset();

if (SendLoopConfig.batchSend)
{
int offset = 0;
while (conn.sendQueue.TryDequeue(out ArrayBuffer msg))
while (sendQueue.TryDequeue(out ArrayBuffer msg))
{
using ArrayBuffer _ = msg; // auto release
// check if connected before sending message
if (!client.Connected)
{
Log.Verbose("[SWT-SendLoop]: SendLoop {0} not connected", conn);
msg.Release();
return;
}

Expand All @@ -86,7 +89,6 @@ public static void Loop(Config config)
}

offset = SendMessage(writeBuffer, offset, msg, setMask, maskHelper);
msg.Release();
}

// after no message in queue, send remaining messages
Expand All @@ -96,19 +98,18 @@ public static void Loop(Config config)
}
else
{
while (conn.sendQueue.TryDequeue(out ArrayBuffer msg))
while (sendQueue.TryDequeue(out ArrayBuffer msg))
{
using ArrayBuffer _ = msg; // auto release
// check if connected before sending message
if (!client.Connected)
{
Log.Verbose("[SWT-SendLoop]: SendLoop {0} not connected", conn);
msg.Release();
return;
}

int length = SendMessage(writeBuffer, 0, msg, setMask, maskHelper);
stream.Write(writeBuffer, 0, length);
msg.Release();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ public class SimpleWebServer

public bool Active { get; private set; }

public SimpleWebServer(int maxMessagesPerTick, TcpConfig tcpConfig, int maxMessageSize, int handshakeMaxSize, SslConfig sslConfig)
public SimpleWebServer(int maxMessagesPerTick, TcpConfig tcpConfig, int maxMessageSize, int handshakeMaxSize, SslConfig sslConfig, int maxSendQueueSize = 10000)
{
this.maxMessagesPerTick = maxMessagesPerTick;
// use max because bufferpool is used for both messages and handshake
int max = Math.Max(maxMessageSize, handshakeMaxSize);
bufferPool = new BufferPool(5, 20, max);
server = new WebSocketServer(tcpConfig, maxMessageSize, handshakeMaxSize, sslConfig, bufferPool);
server = new WebSocketServer(tcpConfig, maxMessageSize, handshakeMaxSize, sslConfig, bufferPool, maxSendQueueSize);
}

public void Start(ushort port)
Expand All @@ -40,6 +40,9 @@ public void Stop()

public void SendAll(List<int> connectionIds, ArraySegment<byte> source)
{
if (connectionIds.Count == 0)
return;

ArrayBuffer buffer = bufferPool.Take(source.Count);
buffer.CopyFrom(source);
buffer.SetReleasesRequired(connectionIds.Count);
Expand Down Expand Up @@ -89,8 +92,10 @@ public void ProcessMessageQueue(MonoBehaviour behaviour)
onConnect?.Invoke(next.connId, GetClientAddress(next.connId));
break;
case EventType.Data:
onData?.Invoke(next.connId, next.data.ToSegment());
next.data.Release();
using (next.data)
{
onData?.Invoke(next.connId, next.data.ToSegment());
}
break;
case EventType.Disconnected:
onDisconnect?.Invoke(next.connId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,20 @@ public class WebSocketServer
readonly ServerSslHelper sslHelper;
readonly BufferPool bufferPool;
readonly ConcurrentDictionary<int, Connection> connections = new ConcurrentDictionary<int, Connection>();
readonly int maxSendQueueSize;

int _idCounter = 0;

public WebSocketServer(TcpConfig tcpConfig, int maxMessageSize, int handshakeMaxSize, SslConfig sslConfig, BufferPool bufferPool)
public WebSocketServer(TcpConfig tcpConfig, int maxMessageSize, int handshakeMaxSize, SslConfig sslConfig, BufferPool bufferPool, int maxSendQueueSize = 10000)
{
if (maxSendQueueSize <= 0)
throw new ArgumentException("maxSendQueueSize must be greater than 0", nameof(maxSendQueueSize));
this.tcpConfig = tcpConfig;
this.maxMessageSize = maxMessageSize;
sslHelper = new ServerSslHelper(sslConfig);
this.bufferPool = bufferPool;
handShake = new ServerHandshake(this.bufferPool, handshakeMaxSize);
this.maxSendQueueSize = maxSendQueueSize;
}

public void Listen(int port)
Expand Down Expand Up @@ -77,7 +81,7 @@ void acceptLoop()
// TODO keep track of connections before they are in connections dictionary
// this might not be a problem as HandshakeAndReceiveLoop checks for stop
// and returns/disposes before sending message to queue
Connection conn = new Connection(client, AfterConnectionDisposed);
Connection conn = new Connection(client, AfterConnectionDisposed, SendQueueFull, maxSendQueueSize);
Log.Verbose("[SWT-WebSocketServer]: A client connected from {0}", conn);

// handshake needs its own thread as it needs to wait for message from client
Expand Down Expand Up @@ -168,6 +172,11 @@ void HandshakeAndReceiveLoop(Connection conn)
}
}

void SendQueueFull(Connection conn)
{
receiveQueue.Enqueue(new Message(conn.connId, new Exception("Send Queue Full")));
}

void AfterConnectionDisposed(Connection conn)
{
if (conn.connId != Connection.IdNotSet)
Expand All @@ -187,11 +196,13 @@ public void Send(int id, ArrayBuffer buffer)
{
if (connections.TryGetValue(id, out Connection conn))
{
conn.sendQueue.Enqueue(buffer);
conn.sendPending.Set();
conn.QueueSend(buffer);
}
else
{
Log.Warn("[SWT-WebSocketServer]: Send: cannot send message to {0} because it was not found in dictionary. Maybe it disconnected.", id);
buffer.Release();
}
}

public void CloseConnection(int id)
Expand Down
5 changes: 4 additions & 1 deletion Assets/Mirror/Transports/SimpleWeb/SimpleWebTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ public class SimpleWebTransport : Transport, PortTransport
[Tooltip("Caps the number of messages the client will process per tick. Allows LateUpdate to finish to let the reset of unity continue in case more messages arrive before they are processed")]
public int clientMaxMsgsPerTick = 1000;

[Tooltip("Maximum number of messages that can be in the send queue before the connection is closed. This prevents slow connections from using too much memory on the server.")]
public int maxSendQueueSize = 1000;

[Tooltip("Send would stall forever if the network is cut off during a send, so we need a timeout (in milliseconds)")]
public int sendTimeout = 5000;

Expand Down Expand Up @@ -293,7 +296,7 @@ public override void ServerStart()
Log.Warn("[SWT-ServerStart]: Server Already Started");

SslConfig config = SslConfigLoader.Load(sslEnabled, sslCertJson, sslProtocols);
server = new SimpleWebServer(serverMaxMsgsPerTick, TcpConfig, maxMessageSize, maxHandshakeSize, config);
server = new SimpleWebServer(serverMaxMsgsPerTick, TcpConfig, maxMessageSize, maxHandshakeSize, config, maxSendQueueSize);

server.onConnect += OnServerConnectedWithAddress.Invoke;
server.onDisconnect += OnServerDisconnected.Invoke;
Expand Down
Loading