Skip to content

Commit e3b56a4

Browse files
feat: Lift ~44KB limit for reliable payloads in the adapter (1.0.0) (#1608)
1 parent 3b9b812 commit e3b56a4

File tree

9 files changed

+375
-66
lines changed

9 files changed

+375
-66
lines changed

com.unity.netcode.adapter.utp/CHANGELOG.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,12 @@ All notable changes to this package will be documented in this file. The format
88

99
### Changed
1010

11-
- Rename the 'Send Queue Batch Size' property to 'Max Payload Size' to better reflect its usage. (#1585)
11+
- Rename the 'Send Queue Batch Size' property to 'Max Payload Size' to better reflect its usage. (#1584)
1212

1313
### Fixed
1414

15+
- Lifted the limit of ~44KB for reliable payloads. Before the fix, attempting to send a payload larger than that with reliable delivery would silently fail. Note that it is still not recommended to send such large reliable payloads, since their delivery could take a few network round-trips. (#1596)
16+
1517
## [1.0.0-pre.4] - 2022-01-04
1618

1719
### Added

com.unity.netcode.adapter.utp/Runtime/BatchedReceiveQueue.cs

Lines changed: 53 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,10 @@ namespace Unity.Netcode.UTP.Utilities
88
internal class BatchedReceiveQueue
99
{
1010
private byte[] m_Data;
11-
private DataStreamReader m_Reader;
12-
private int m_ReaderOffset;
11+
private int m_Offset;
12+
private int m_Length;
1313

14-
public bool IsEmpty => m_ReaderOffset >= m_Reader.Length;
14+
public bool IsEmpty => m_Length <= 0;
1515

1616
/// <summary>
1717
/// Construct a new receive queue from a <see cref="DataStreamReader"/> returned by
@@ -29,26 +29,66 @@ public BatchedReceiveQueue(DataStreamReader reader)
2929
}
3030
}
3131

32-
m_Reader = reader;
33-
m_ReaderOffset = 0;
32+
m_Offset = 0;
33+
m_Length = reader.Length;
3434
}
3535

36-
/// <summary>Pop the next message in the queue.</summary>
37-
/// <returns>The message, or the default value if no more messages.</returns>
36+
/// <summary>
37+
/// Push the entire data from a <see cref="DataStreamReader"/> (as returned by popping an
38+
/// event from a <see cref="NetworkDriver">) to the queue.
39+
/// </summary>
40+
/// <param name="reader">The <see cref="DataStreamReader"/> to push the data of.</param>
41+
public void PushReader(DataStreamReader reader)
42+
{
43+
// Resize the array and copy the existing data to the beginning if there's not enough
44+
// room to copy the reader's data at the end of the existing data.
45+
var available = m_Data.Length - (m_Offset + m_Length);
46+
if (available < reader.Length)
47+
{
48+
if (m_Length > 0)
49+
{
50+
Array.Copy(m_Data, m_Offset, m_Data, 0, m_Length);
51+
}
52+
53+
m_Offset = 0;
54+
55+
while (m_Data.Length - m_Length < reader.Length)
56+
{
57+
Array.Resize(ref m_Data, m_Data.Length * 2);
58+
}
59+
}
60+
61+
unsafe
62+
{
63+
fixed (byte* dataPtr = m_Data)
64+
{
65+
reader.ReadBytes(dataPtr + m_Offset + m_Length, reader.Length);
66+
}
67+
}
68+
69+
m_Length += reader.Length;
70+
}
71+
72+
/// <summary>Pop the next full message in the queue.</summary>
73+
/// <returns>The message, or the default value if no more full messages.</returns>
3874
public ArraySegment<byte> PopMessage()
3975
{
40-
if (IsEmpty)
76+
if (m_Length < sizeof(int))
4177
{
4278
return default;
4379
}
4480

45-
m_Reader.SeekSet(m_ReaderOffset);
81+
var messageLength = BitConverter.ToInt32(m_Data, m_Offset);
82+
83+
if (m_Length - sizeof(int) < messageLength)
84+
{
85+
return default;
86+
}
4687

47-
var messageLength = m_Reader.ReadInt();
48-
m_ReaderOffset += sizeof(int);
88+
var data = new ArraySegment<byte>(m_Data, m_Offset + sizeof(int), messageLength);
4989

50-
var data = new ArraySegment<byte>(m_Data, m_ReaderOffset, messageLength);
51-
m_ReaderOffset += messageLength;
90+
m_Offset += sizeof(int) + messageLength;
91+
m_Length -= sizeof(int) + messageLength;
5292

5393
return data;
5494
}

com.unity.netcode.adapter.utp/Runtime/BatchedSendQueue.cs

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,10 +134,13 @@ public bool PushMessage(ArraySegment<byte> message)
134134
/// does not reduce the length of the queue. Callers are expected to call
135135
/// <see cref="Consume"/> with the value returned by this method afterwards if the data can
136136
/// be safely removed from the queue (e.g. if it was sent successfully).
137+
///
138+
/// This method should not be used together with <see cref="FillWriterWithBytes"> since this
139+
/// could lead to a corrupted queue.
137140
/// </remarks>
138141
/// <param name="writer">The <see cref="DataStreamWriter"/> to write to.</param>
139142
/// <returns>How many bytes were written to the writer.</returns>
140-
public int FillWriter(ref DataStreamWriter writer)
143+
public int FillWriterWithMessages(ref DataStreamWriter writer)
141144
{
142145
if (!IsCreated || Length == 0)
143146
{
@@ -176,6 +179,38 @@ public int FillWriter(ref DataStreamWriter writer)
176179
}
177180
}
178181

182+
/// <summary>
183+
/// Fill the given <see cref="DataStreamWriter"/> with as many bytes from the queue as
184+
/// possible, disregarding message boundaries.
185+
/// </summary>
186+
///<remarks>
187+
/// This does NOT actually consume anything from the queue. That is, calling this method
188+
/// does not reduce the length of the queue. Callers are expected to call
189+
/// <see cref="Consume"/> with the value returned by this method afterwards if the data can
190+
/// be safely removed from the queue (e.g. if it was sent successfully).
191+
///
192+
/// This method should not be used together with <see cref="FillWriterWithMessages"/> since
193+
/// this could lead to reading messages from a corrupted queue.
194+
/// </remarks>
195+
/// <param name="writer">The <see cref="DataStreamWriter"/> to write to.</param>
196+
/// <returns>How many bytes were written to the writer.</returns>
197+
public int FillWriterWithBytes(ref DataStreamWriter writer)
198+
{
199+
if (!IsCreated || Length == 0)
200+
{
201+
return 0;
202+
}
203+
204+
var copyLength = Math.Min(writer.Capacity, Length);
205+
206+
unsafe
207+
{
208+
writer.WriteBytes((byte*)m_Data.GetUnsafePtr() + HeadIndex, copyLength);
209+
}
210+
211+
return copyLength;
212+
}
213+
179214
/// <summary>Consume a number of bytes from the head of the queue.</summary>
180215
/// <remarks>
181216
/// This should only be called with a size that matches the last value returned by

com.unity.netcode.adapter.utp/Runtime/UnityTransport.cs

Lines changed: 63 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ void CreateDriver(
2323
out NetworkDriver driver,
2424
out NetworkPipeline unreliableFragmentedPipeline,
2525
out NetworkPipeline unreliableSequencedFragmentedPipeline,
26-
out NetworkPipeline reliableSequencedFragmentedPipeline);
26+
out NetworkPipeline reliableSequencedPipeline);
2727
}
2828

2929
public static class ErrorUtilities
@@ -160,7 +160,7 @@ public static implicit operator ConnectionAddressData(NetworkEndPoint d) =>
160160

161161
private NetworkPipeline m_UnreliableFragmentedPipeline;
162162
private NetworkPipeline m_UnreliableSequencedFragmentedPipeline;
163-
private NetworkPipeline m_ReliableSequencedFragmentedPipeline;
163+
private NetworkPipeline m_ReliableSequencedPipeline;
164164

165165
public override ulong ServerClientId => m_ServerClientId;
166166

@@ -210,14 +210,19 @@ public SimulatorUtility.Parameters ClientSimulatorParameters
210210
/// </summary>
211211
private readonly Dictionary<SendTarget, BatchedSendQueue> m_SendQueue = new Dictionary<SendTarget, BatchedSendQueue>();
212212

213+
// Since reliable messages may be spread out over multiple transport payloads, it's possible
214+
// to receive only parts of a message in an update. We thus keep the reliable receive queues
215+
// around to avoid losing partial messages.
216+
private readonly Dictionary<ulong, BatchedReceiveQueue> m_ReliableReceiveQueues = new Dictionary<ulong, BatchedReceiveQueue>();
217+
213218
private void InitDriver()
214219
{
215220
DriverConstructor.CreateDriver(
216221
this,
217222
out m_Driver,
218223
out m_UnreliableFragmentedPipeline,
219224
out m_UnreliableSequencedFragmentedPipeline,
220-
out m_ReliableSequencedFragmentedPipeline);
225+
out m_ReliableSequencedPipeline);
221226
}
222227

223228
private void DisposeDriver()
@@ -241,7 +246,7 @@ private NetworkPipeline SelectSendPipeline(NetworkDelivery delivery)
241246
case NetworkDelivery.Reliable:
242247
case NetworkDelivery.ReliableSequenced:
243248
case NetworkDelivery.ReliableFragmentedSequenced:
244-
return m_ReliableSequencedFragmentedPipeline;
249+
return m_ReliableSequencedPipeline;
245250

246251
default:
247252
Debug.LogError($"Unknown {nameof(NetworkDelivery)} value: {delivery}");
@@ -340,6 +345,11 @@ private static RelayConnectionData ConvertConnectionData(byte[] connectionData)
340345
}
341346
}
342347

348+
internal void SetMaxPayloadSize(int maxPayloadSize)
349+
{
350+
m_MaxPayloadSize = maxPayloadSize;
351+
}
352+
343353
private void SetProtocol(ProtocolType inProtocol)
344354
{
345355
m_ProtocolType = inProtocol;
@@ -439,7 +449,14 @@ private void SendBatchedMessages(SendTarget sendTarget, BatchedSendQueue queue)
439449
return;
440450
}
441451

442-
var written = queue.FillWriter(ref writer);
452+
// We don't attempt to send entire payloads over the reliable pipeline. Instead we
453+
// fragment it manually. This is safe and easy to do since the reliable pipeline
454+
// basically implements a stream, so as long as we separate the different messages
455+
// in the stream (the send queue does that automatically) we are sure they'll be
456+
// reassembled properly at the other end. This allows us to lift the limit of ~44KB
457+
// on reliable payloads (because of the reliable window size).
458+
var written = pipeline == m_ReliableSequencedPipeline
459+
? queue.FillWriterWithBytes(ref writer) : queue.FillWriterWithMessages(ref writer);
443460

444461
result = m_Driver.EndSend(writer);
445462
if (result == written)
@@ -481,9 +498,42 @@ private bool AcceptConnection()
481498

482499
}
483500

501+
private void ReceiveMessages(ulong clientId, NetworkPipeline pipeline, DataStreamReader dataReader)
502+
{
503+
BatchedReceiveQueue queue;
504+
if (pipeline == m_ReliableSequencedPipeline)
505+
{
506+
if (m_ReliableReceiveQueues.TryGetValue(clientId, out queue))
507+
{
508+
queue.PushReader(dataReader);
509+
}
510+
else
511+
{
512+
queue = new BatchedReceiveQueue(dataReader);
513+
m_ReliableReceiveQueues[clientId] = queue;
514+
}
515+
}
516+
else
517+
{
518+
queue = new BatchedReceiveQueue(dataReader);
519+
}
520+
521+
while (!queue.IsEmpty)
522+
{
523+
var message = queue.PopMessage();
524+
if (message == default)
525+
{
526+
// Only happens if there's only a partial message in the queue (rare).
527+
break;
528+
}
529+
530+
InvokeOnTransportEvent(NetcodeNetworkEvent.Data, clientId, message, Time.realtimeSinceStartup);
531+
}
532+
}
533+
484534
private bool ProcessEvent()
485535
{
486-
var eventType = m_Driver.PopEvent(out var networkConnection, out var reader);
536+
var eventType = m_Driver.PopEvent(out var networkConnection, out var reader, out var pipeline);
487537

488538
switch (eventType)
489539
{
@@ -510,6 +560,8 @@ private bool ProcessEvent()
510560
}
511561
}
512562

563+
m_ReliableReceiveQueues.Remove(ParseClientId(networkConnection));
564+
513565
InvokeOnTransportEvent(NetcodeNetworkEvent.Disconnect,
514566
ParseClientId(networkConnection),
515567
default(ArraySegment<byte>),
@@ -520,17 +572,7 @@ private bool ProcessEvent()
520572
}
521573
case TransportNetworkEvent.Type.Data:
522574
{
523-
var queue = new BatchedReceiveQueue(reader);
524-
525-
while (!queue.IsEmpty)
526-
{
527-
InvokeOnTransportEvent(NetcodeNetworkEvent.Data,
528-
ParseClientId(networkConnection),
529-
queue.PopMessage(),
530-
Time.realtimeSinceStartup
531-
);
532-
}
533-
575+
ReceiveMessages(ParseClientId(networkConnection), pipeline, reader);
534576
return true;
535577
}
536578
}
@@ -744,7 +786,7 @@ public override void Shutdown()
744786
public void CreateDriver(UnityTransport transport, out NetworkDriver driver,
745787
out NetworkPipeline unreliableFragmentedPipeline,
746788
out NetworkPipeline unreliableSequencedFragmentedPipeline,
747-
out NetworkPipeline reliableSequencedFragmentedPipeline)
789+
out NetworkPipeline reliableSequencedPipeline)
748790
{
749791
var maxFrameTimeMS = 0;
750792

@@ -775,8 +817,7 @@ public void CreateDriver(UnityTransport transport, out NetworkDriver driver,
775817
typeof(UnreliableSequencedPipelineStage),
776818
typeof(SimulatorPipelineStage),
777819
typeof(SimulatorPipelineStageInSend));
778-
reliableSequencedFragmentedPipeline = driver.CreatePipeline(
779-
typeof(FragmentationPipelineStage),
820+
reliableSequencedPipeline = driver.CreatePipeline(
780821
typeof(ReliableSequencedPipelineStage),
781822
typeof(SimulatorPipelineStage),
782823
typeof(SimulatorPipelineStageInSend));
@@ -788,8 +829,8 @@ public void CreateDriver(UnityTransport transport, out NetworkDriver driver,
788829
typeof(FragmentationPipelineStage));
789830
unreliableSequencedFragmentedPipeline = driver.CreatePipeline(
790831
typeof(FragmentationPipelineStage), typeof(UnreliableSequencedPipelineStage));
791-
reliableSequencedFragmentedPipeline = driver.CreatePipeline(
792-
typeof(FragmentationPipelineStage), typeof(ReliableSequencedPipelineStage)
832+
reliableSequencedPipeline = driver.CreatePipeline(
833+
typeof(ReliableSequencedPipelineStage)
793834
);
794835
}
795836
}

0 commit comments

Comments
 (0)