Skip to content

Commit 173aeaf

Browse files
committed
feat: Added buffering of all message types
1 parent e81e767 commit 173aeaf

File tree

6 files changed

+233
-111
lines changed

6 files changed

+233
-111
lines changed

MLAPI-Editor/NetworkingManagerEditor.cs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ public class NetworkingManagerEditor : Editor
4242
private SerializedProperty networkIdRecycleDelayProperty;
4343
private SerializedProperty rpcHashSizeProperty;
4444
private SerializedProperty loadSceneTimeOutProperty;
45+
private SerializedProperty enableMessageBufferingProperty;
46+
private SerializedProperty messageBufferTimeoutProperty;
4547
private SerializedProperty enableEncryptionProperty;
4648
private SerializedProperty signKeyExchangeProperty;
4749
private SerializedProperty serverBase64PfxCertificateProperty;
@@ -120,6 +122,8 @@ private void Init()
120122
networkIdRecycleDelayProperty = networkConfigProperty.FindPropertyRelative("NetworkIdRecycleDelay");
121123
rpcHashSizeProperty = networkConfigProperty.FindPropertyRelative("RpcHashSize");
122124
loadSceneTimeOutProperty = networkConfigProperty.FindPropertyRelative("LoadSceneTimeOut");
125+
enableMessageBufferingProperty = networkConfigProperty.FindPropertyRelative("EnableMessageBuffering");
126+
messageBufferTimeoutProperty = networkConfigProperty.FindPropertyRelative("MessageBufferTimeout");
123127
enableEncryptionProperty = networkConfigProperty.FindPropertyRelative("EnableEncryption");
124128
signKeyExchangeProperty = networkConfigProperty.FindPropertyRelative("SignKeyExchange");
125129
serverBase64PfxCertificateProperty = networkConfigProperty.FindPropertyRelative("ServerBase64PfxCertificate");
@@ -158,6 +162,8 @@ private void CheckNullProperties()
158162
networkIdRecycleDelayProperty = networkConfigProperty.FindPropertyRelative("NetworkIdRecycleDelay");
159163
rpcHashSizeProperty = networkConfigProperty.FindPropertyRelative("RpcHashSize");
160164
loadSceneTimeOutProperty = networkConfigProperty.FindPropertyRelative("LoadSceneTimeOut");
165+
enableMessageBufferingProperty = networkConfigProperty.FindPropertyRelative("EnableMessageBuffering");
166+
messageBufferTimeoutProperty = networkConfigProperty.FindPropertyRelative("MessageBufferTimeout");
161167
enableEncryptionProperty = networkConfigProperty.FindPropertyRelative("EnableEncryption");
162168
signKeyExchangeProperty = networkConfigProperty.FindPropertyRelative("SignKeyExchange");
163169
serverBase64PfxCertificateProperty = networkConfigProperty.FindPropertyRelative("ServerBase64PfxCertificate");
@@ -324,6 +330,13 @@ public override void OnInspectorGUI()
324330
EditorGUILayout.PropertyField(networkIdRecycleDelayProperty);
325331
}
326332

333+
EditorGUILayout.PropertyField(enableMessageBufferingProperty);
334+
335+
using (new EditorGUI.DisabledScope(!networkingManager.NetworkConfig.EnableMessageBuffering))
336+
{
337+
EditorGUILayout.PropertyField(messageBufferTimeoutProperty);
338+
}
339+
327340
EditorGUILayout.LabelField("Bandwidth", EditorStyles.boldLabel);
328341
EditorGUILayout.PropertyField(rpcHashSizeProperty);
329342

MLAPI/Configuration/NetworkConfig.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,16 @@ public class NetworkConfig
159159
[Tooltip("The amount of seconds to wait for all clients to load a requested scene")]
160160
public int LoadSceneTimeOut = 120;
161161
/// <summary>
162+
/// Whether or not message buffering should be enabled. This will resolve most out of order messages during spawn.
163+
/// </summary>
164+
[Tooltip("Whether or not message buffering should be enabled. This will resolve most out of order messages during spawn")]
165+
public bool EnableMessageBuffering = true;
166+
/// <summary>
167+
/// The amount of time a message should be buffered for without being consumed. If it is not consumed within this time, it will be dropped.
168+
/// </summary>
169+
[Tooltip("The amount of time a message should be buffered for without being consumed. If it is not consumed within this time, it will be dropped")]
170+
public float MessageBufferTimeout = 20f;
171+
/// <summary>
162172
/// Whether or not to enable the ECDHE key exchange to allow for encryption and authentication of messages
163173
/// </summary>
164174
[Tooltip("Whether or not to enable the ECDHE key exchange to allow for encryption and authentication of messages")]

MLAPI/Core/NetworkingManager.cs

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
using static MLAPI.Messaging.CustomMessagingManager;
2626
using MLAPI.Exceptions;
2727
using MLAPI.Transports.Tasks;
28+
using MLAPI.Messaging.Buffering;
2829

2930
namespace MLAPI
3031
{
@@ -333,7 +334,6 @@ private void Init(bool server)
333334
NetworkSceneManager.sceneIndexToString.Clear();
334335
NetworkSceneManager.sceneNameToIndex.Clear();
335336
NetworkSceneManager.sceneSwitchProgresses.Clear();
336-
NetworkSceneManager.sceneBufferedNetworkIds.Clear();
337337

338338
if (NetworkConfig.NetworkTransport == null)
339339
{
@@ -658,6 +658,11 @@ private void Update()
658658
NetworkedObject.NetworkedBehaviourUpdate();
659659
}
660660

661+
if (!IsServer && NetworkConfig.EnableMessageBuffering)
662+
{
663+
BufferManager.CleanBuffer();
664+
}
665+
661666
if (IsServer)
662667
{
663668
lastEventTickTime = NetworkTime;
@@ -905,22 +910,17 @@ internal void HandleIncomingData(ulong clientId, string channelName, ArraySegmen
905910

906911
void bufferCallback(ulong networkId)
907912
{
908-
// Alloc some memory
909-
PooledBitStream pooledData = PooledBitStream.Get();
910-
911-
// Copy the data
912-
pooledData.Write(data.Array, data.Offset, data.Count);
913+
if (!NetworkConfig.EnableMessageBuffering)
914+
{
915+
throw new InvalidOperationException("Cannot buffer with buffering disabled.");
916+
}
913917

914-
NetworkSceneManager.BufferedMessage message = new NetworkSceneManager.BufferedMessage()
918+
if (IsServer)
915919
{
916-
channelName = channelName,
917-
pool = pooledData,
918-
payload = new ArraySegment<byte>(pooledData.GetBuffer(), 0, data.Count),
919-
receiveTime = receiveTime,
920-
sender = clientId
921-
};
922-
923-
NetworkSceneManager.sceneBufferedNetworkIds[networkId].Enqueue(message);
920+
throw new InvalidOperationException("Cannot buffer on server.");
921+
}
922+
923+
BufferManager.BufferMessageForNetworkId(networkId, clientId, channelName, receiveTime, data);
924924
}
925925

926926

@@ -962,10 +962,10 @@ void bufferCallback(ulong networkId)
962962
InternalMessageHandler.HandleNetworkedVarUpdate(clientId, messageStream, bufferCallback);
963963
break;
964964
case MLAPIConstants.MLAPI_SERVER_RPC:
965-
if (IsServer) InternalMessageHandler.HandleServerRPC(clientId, messageStream, bufferCallback);
965+
if (IsServer) InternalMessageHandler.HandleServerRPC(clientId, messageStream);
966966
break;
967967
case MLAPIConstants.MLAPI_SERVER_RPC_REQUEST:
968-
if (IsServer) InternalMessageHandler.HandleServerRPCRequest(clientId, messageStream, channelName, security, bufferCallback);
968+
if (IsServer) InternalMessageHandler.HandleServerRPCRequest(clientId, messageStream, channelName, security);
969969
break;
970970
case MLAPIConstants.MLAPI_SERVER_RPC_RESPONSE:
971971
if (IsClient) InternalMessageHandler.HandleServerRPCResponse(clientId, messageStream);
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using MLAPI.Serialization.Pooled;
4+
using UnityEngine;
5+
6+
namespace MLAPI.Messaging.Buffering
7+
{
8+
internal static class BufferManager
9+
{
10+
private static readonly Dictionary<ulong, Queue<BufferedMessage>> bufferQueues = new Dictionary<ulong, Queue<BufferedMessage>>();
11+
12+
internal struct BufferedMessage
13+
{
14+
internal ulong sender;
15+
internal string channelName;
16+
internal PooledBitStream payload;
17+
internal float receiveTime;
18+
internal float bufferTime;
19+
}
20+
21+
internal static Queue<BufferedMessage> ConsumeBuffersForNetworkId(ulong networkId)
22+
{
23+
if (bufferQueues.ContainsKey(networkId))
24+
{
25+
Queue<BufferedMessage> message = bufferQueues[networkId];
26+
27+
bufferQueues.Remove(networkId);
28+
29+
return message;
30+
}
31+
else
32+
{
33+
return null;
34+
}
35+
}
36+
37+
internal static void RecycleConsumedBufferedMessage(BufferedMessage message)
38+
{
39+
message.payload.Dispose();
40+
}
41+
42+
internal static void BufferMessageForNetworkId(ulong networkId, ulong sender, string channelName, float receiveTime, ArraySegment<byte> payload)
43+
{
44+
if (!bufferQueues.ContainsKey(networkId))
45+
{
46+
bufferQueues.Add(networkId, new Queue<BufferedMessage>());
47+
}
48+
49+
Queue<BufferedMessage> queue = bufferQueues[networkId];
50+
51+
PooledBitStream payloadStream = PooledBitStream.Get();
52+
53+
payloadStream.Write(payload.Array, payload.Offset, payload.Count);
54+
payloadStream.Position = 0;
55+
56+
queue.Enqueue(new BufferedMessage()
57+
{
58+
bufferTime = Time.realtimeSinceStartup,
59+
channelName = channelName,
60+
payload = payloadStream,
61+
receiveTime = receiveTime,
62+
sender = sender
63+
});
64+
}
65+
66+
private static readonly List<ulong> _keysToDestroy = new List<ulong>();
67+
internal static void CleanBuffer()
68+
{
69+
foreach (KeyValuePair<ulong, Queue<BufferedMessage>> pair in bufferQueues)
70+
{
71+
while (pair.Value.Count > 0 && Time.realtimeSinceStartup - pair.Value.Peek().bufferTime >= NetworkingManager.Singleton.NetworkConfig.MessageBufferTimeout)
72+
{
73+
BufferedMessage message = pair.Value.Dequeue();
74+
75+
RecycleConsumedBufferedMessage(message);
76+
}
77+
78+
if (pair.Value.Count == 0)
79+
{
80+
_keysToDestroy.Add(pair.Key);
81+
}
82+
}
83+
84+
for (int i = 0; i < _keysToDestroy.Count; i++)
85+
{
86+
bufferQueues.Remove(_keysToDestroy[i]);
87+
}
88+
89+
_keysToDestroy.Clear();
90+
}
91+
}
92+
}

0 commit comments

Comments
 (0)