
Commit ca7d7b7

feat: Queue extra packets in the adapter (1.0.0) (#1514)
* Add batched send/receive queues

  The send queue is meant to be used to store individual messages and read them back in batched chunks (to be sent on the network). It's also meant to act as a sort of overflow storage when UTP internal queues are full. The receive queue is meant to store batched messages (batched by the send queue) and read them out one at a time.

* Allow tests to access internals
* Remove dummy tests
* Update UnityTransport to use new batched queues
* Update transport tests
* Add latest changes to CHANGELOG
* Minor stylistic improvements for consistency
* Fix whitespace issue
* Ignore test until fix for UTP is released
* Correctly handle ArraySegment not at beginning of array

  When handling incoming messages from the transport.

* Don't change how existing settings behave

  m_SendQueueBatchSize was effectively the maximum payload size and this is what it will stay as. The new maximum payload size setting has been removed. The size of the send queue is now controlled by m_MaxSendQueueSize.

* Improve "too many in-flight" error message
* Had left merge markers in CHANGELOG...
* Stylistic improvements
* Fix standards issues
* Enable test that was blocked on UTP 1.0.0-pre.10
1 parent 22a5c92 commit ca7d7b7

15 files changed: +702 -256 lines
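A note for readers (not part of the commit): the batching format used by the new queues in this diff is a simple length-prefixed framing, inferred from the code below and summarized in this comment sketch.

// Batch layout produced by BatchedSendQueue.FillWriter and unpacked by
// BatchedReceiveQueue.PopMessage (PerMessageOverhead == sizeof(int) == 4):
//
//   [int len0][len0 bytes][int len1][len1 bytes]...
//
// For example, batching a 3-byte and a 5-byte message occupies
// (4 + 3) + (4 + 5) = 16 bytes of a single UTP packet.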

com.unity.netcode.adapter.utp/CHANGELOG.md

Lines changed: 7 additions & 1 deletion
@@ -3,21 +3,27 @@ All notable changes to this package will be documented in this file. The format
 
 ## [Unreleased]
 
+### Added
+
+- Added new 'Max Send Queue Size' configuration field in the inspector. This controls the size of the send queue that is used to accumulate small sends together and also acts as an overflow queue when there are too many in-flight packets or when other internal queues are full.
+
 ### Changed
 
 - Removed 'Maximum Packet Size' configuration field in the inspector. This would cause confusion since the maximum packet size is in effect always the MTU (1400 bytes on most platforms).
 - Updated com.unity.transport to 1.0.0-pre.10.
+- The 'Send Queue Batch Size' configuration field now controls the size of the send queue, rather than the size of a single batch of messages. Consequently, it should be set much higher than it was previously.
 - All delivery methods now support fragmentation, meaning the 'Send Queue Batch Size' setting (which controls the maximum payload size) now applies to all delivery methods, not just reliable ones.
 
 
 ### Fixed
 
 - Fixed packet overflow errors when sending payloads too close to the MTU (was mostly visible when using Relay).
 - Don't throw an exception when the host disconnects (issue 1439 on GitHub).
+- Avoid "too many inflight packets" errors by queueing packets in a queue when the limit of inflight packets is reached in UTP. The size of this queue can be controlled with the 'Max Send Queue Size' configuration field.
 
 ## [1.0.0-pre.3] - 2021-10-22
 
-#### Added
+### Added
 
 - Exposed `m_HeartbeatTimeoutMS`, `m_ConnectTimeoutMS`, `m_MaxConnectAttempts`, and `m_DisconnectTimeoutMS` parameters. (#1314)
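To make the 'Max Send Queue Size' and overflow behavior described above concrete, here is a minimal sketch (not part of the diff) using the BatchedSendQueue type added by this commit; the class name SendQueueOverflowSketch, the capacity value, and the payload buffer are illustrative assumptions.

using System;
using Unity.Netcode.UTP.Utilities;

internal static class SendQueueOverflowSketch
{
    public static void Example()
    {
        // The capacity stands in for the 'Max Send Queue Size' setting.
        var sendQueue = new BatchedSendQueue(64 * 1024);
        var payload = new byte[1024]; // placeholder message bytes

        // While UTP reports too many in-flight packets, messages simply keep
        // accumulating in this queue instead of causing an immediate error.
        if (!sendQueue.PushMessage(new ArraySegment<byte>(payload)))
        {
            // PushMessage only fails once the queue itself is full, at which
            // point the caller has to surface an error (or use a larger
            // 'Max Send Queue Size').
            throw new InvalidOperationException("Send queue is full.");
        }

        sendQueue.Dispose();
    }
}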

com.unity.netcode.adapter.utp/Runtime/AssemblyInfo.cs

Lines changed: 4 additions & 0 deletions
@@ -0,0 +1,4 @@
using System.Runtime.CompilerServices;

[assembly: InternalsVisibleTo("Unity.Netcode.Adapter.UTP.EditorTests")]
[assembly: InternalsVisibleTo("Unity.Netcode.Adapter.UTP.RuntimeTests")]
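For illustration only (this test is not part of the commit), the InternalsVisibleTo attributes above are what let a test like the following, assumed to live in the Unity.Netcode.Adapter.UTP.RuntimeTests assembly, reference the internal BatchedSendQueue added later in this diff; the test class and values are hypothetical.

using System;
using NUnit.Framework;
using Unity.Netcode.UTP.Utilities;

public class BatchedSendQueueSketchTests
{
    [Test]
    public void PushMessage_FailsWhenMessageDoesNotFit()
    {
        // 'BatchedSendQueue' is internal, so this only compiles because of the
        // InternalsVisibleTo("Unity.Netcode.Adapter.UTP.RuntimeTests") attribute
        // above (assuming this file is part of that test assembly).
        var queue = new BatchedSendQueue(capacity: 8);

        // A 16-byte message needs 4 + 16 = 20 bytes (length prefix included),
        // which cannot fit in an 8-byte queue, so the push is rejected.
        Assert.IsFalse(queue.PushMessage(new ArraySegment<byte>(new byte[16])));

        queue.Dispose();
    }
}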

com.unity.netcode.adapter.utp/Tests/Runtime/DummyTestScript.cs.meta renamed to com.unity.netcode.adapter.utp/Runtime/AssemblyInfo.cs.meta

Lines changed: 1 addition & 1 deletion

com.unity.netcode.adapter.utp/Runtime/BatchedReceiveQueue.cs

Lines changed: 56 additions & 0 deletions
@@ -0,0 +1,56 @@
using System;
using Unity.Networking.Transport;

namespace Unity.Netcode.UTP.Utilities
{
    /// <summary>Queue for batched messages received through UTP.</summary>
    /// <remarks>This is meant as a companion to <see cref="BatchedSendQueue"/>.</remarks>
    internal class BatchedReceiveQueue
    {
        private byte[] m_Data;
        private DataStreamReader m_Reader;
        private int m_ReaderOffset;

        public bool IsEmpty => m_ReaderOffset >= m_Reader.Length;

        /// <summary>
        /// Construct a new receive queue from a <see cref="DataStreamReader"/> returned by
        /// <see cref="NetworkDriver"/> when popping a data event.
        /// </summary>
        /// <param name="reader">The <see cref="DataStreamReader"/> to construct from.</param>
        public BatchedReceiveQueue(DataStreamReader reader)
        {
            m_Data = new byte[reader.Length];
            unsafe
            {
                fixed (byte* dataPtr = m_Data)
                {
                    reader.ReadBytes(dataPtr, reader.Length);
                }
            }

            m_Reader = reader;
            m_ReaderOffset = 0;
        }

        /// <summary>Pop the next message in the queue.</summary>
        /// <returns>The message, or the default value if no more messages.</returns>
        public ArraySegment<byte> PopMessage()
        {
            if (IsEmpty)
            {
                return default;
            }

            m_Reader.SeekSet(m_ReaderOffset);

            var messageLength = m_Reader.ReadInt();
            m_ReaderOffset += sizeof(int);

            var data = new ArraySegment<byte>(m_Data, m_ReaderOffset, messageLength);
            m_ReaderOffset += messageLength;

            return data;
        }
    }
}
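A minimal usage sketch (not part of the diff) for the receive side described in the XML docs above; the ReadBatch helper and the handle callback are hypothetical, and the reader is assumed to be the DataStreamReader popped from a NetworkDriver data event.

using System;
using Unity.Networking.Transport;
using Unity.Netcode.UTP.Utilities;

internal static class BatchedReceiveQueueSketch
{
    public static void ReadBatch(DataStreamReader reader, Action<ArraySegment<byte>> handle)
    {
        // Copies the whole batch out of the reader, then unpacks it message by message.
        var queue = new BatchedReceiveQueue(reader);

        while (!queue.IsEmpty)
        {
            // Each PopMessage returns one of the length-prefixed messages
            // that a BatchedSendQueue packed into the batch.
            handle(queue.PopMessage());
        }
    }
}

Note that the constructor copies the batch into a managed array, so the segments returned by PopMessage stay valid even after UTP presumably reuses the underlying native buffer.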

com.unity.netcode.adapter.utp/Runtime/BatchedReceiveQueue.cs.meta

Lines changed: 11 additions & 0 deletions

com.unity.netcode.adapter.utp/Runtime/BatchedSendQueue.cs

Lines changed: 198 additions & 0 deletions
@@ -0,0 +1,198 @@
using System;
using Unity.Collections;
using Unity.Collections.LowLevel.Unsafe;
using Unity.Networking.Transport;

namespace Unity.Netcode.UTP.Utilities
{
    /// <summary>Queue for batched messages meant to be sent through UTP.</summary>
    /// <remarks>
    /// Messages should be pushed on the queue with <see cref="PushMessage"/>. To send batched
    /// messages, call <see cref="FillWriter"> with the <see cref="DataStreamWriter"/> obtained from
    /// <see cref="NetworkDriver.BeginSend"/>. This will fill the writer with as many messages as
    /// possible. If the send is successful, call <see cref="Consume"/> to remove the data from the
    /// queue.
    ///
    /// This is meant as a companion to <see cref="BatchedReceiveQueue"/>, which should be used to
    /// read messages sent with this queue.
    /// </remarks>
    internal struct BatchedSendQueue : IDisposable
    {
        private NativeArray<byte> m_Data;
        private NativeArray<int> m_HeadTailIndices;

        /// <summary>Overhead that is added to each message in the queue.</summary>
        public const int PerMessageOverhead = sizeof(int);

        // Indices into m_HeadTailIndicies.
        private const int k_HeadInternalIndex = 0;
        private const int k_TailInternalIndex = 1;

        /// <summary>Index of the first byte of the oldest data in the queue.</summary>
        private int HeadIndex
        {
            get { return m_HeadTailIndices[k_HeadInternalIndex]; }
            set { m_HeadTailIndices[k_HeadInternalIndex] = value; }
        }

        /// <summary>Index one past the last byte of the most recent data in the queue.</summary>
        private int TailIndex
        {
            get { return m_HeadTailIndices[k_TailInternalIndex]; }
            set { m_HeadTailIndices[k_TailInternalIndex] = value; }
        }

        public int Length => TailIndex - HeadIndex;

        public bool IsEmpty => HeadIndex == TailIndex;

        public bool IsCreated => m_Data.IsCreated;

        /// <summary>Construct a new empty send queue.</summary>
        /// <param name="capacity">Maximum capacity of the send queue.</param>
        public BatchedSendQueue(int capacity)
        {
            m_Data = new NativeArray<byte>(capacity, Allocator.Persistent);
            m_HeadTailIndices = new NativeArray<int>(2, Allocator.Persistent);

            HeadIndex = 0;
            TailIndex = 0;
        }

        public void Dispose()
        {
            if (IsCreated)
            {
                m_Data.Dispose();
                m_HeadTailIndices.Dispose();
            }
        }

        /// <summary>Append data at the tail of the queue. No safety checks.</summary>
        private void AppendDataAtTail(ArraySegment<byte> data)
        {
            unsafe
            {
                var writer = new DataStreamWriter((byte*)m_Data.GetUnsafePtr() + TailIndex, m_Data.Length - TailIndex);

                writer.WriteInt(data.Count);

                fixed (byte* dataPtr = data.Array)
                {
                    writer.WriteBytes(dataPtr + data.Offset, data.Count);
                }
            }

            TailIndex += sizeof(int) + data.Count;
        }

        /// <summary>Append a new message to the queue.</summary>
        /// <param name="message">Message to append to the queue.</param>
        /// <returns>
        /// Whether the message was appended successfully. The only way it can fail is if there's
        /// no more room in the queue. On failure, nothing is written to the queue.
        /// </returns>
        public bool PushMessage(ArraySegment<byte> message)
        {
            if (!IsCreated)
            {
                return false;
            }

            // Check if there's enough room after the current tail index.
            if (m_Data.Length - TailIndex >= sizeof(int) + message.Count)
            {
                AppendDataAtTail(message);
                return true;
            }

            // Check if there would be enough room if we moved data at the beginning of m_Data.
            if (m_Data.Length - TailIndex + HeadIndex >= sizeof(int) + message.Count)
            {
                // Move the data back at the beginning of m_Data.
                unsafe
                {
                    UnsafeUtility.MemMove(m_Data.GetUnsafePtr(), (byte*)m_Data.GetUnsafePtr() + HeadIndex, Length);
                }

                TailIndex = Length;
                HeadIndex = 0;

                AppendDataAtTail(message);
                return true;
            }

            return false;
        }

        /// <summary>
        /// Fill as much of a <see cref="DataStreamWriter"/> as possible with data from the head of
        /// the queue. Only full messages (and their length) are written to the writer.
        /// </summary>
        /// <remarks>
        /// This does NOT actually consume anything from the queue. That is, calling this method
        /// does not reduce the length of the queue. Callers are expected to call
        /// <see cref="Consume"/> with the value returned by this method afterwards if the data can
        /// be safely removed from the queue (e.g. if it was sent successfully).
        /// </remarks>
        /// <param name="writer">The <see cref="DataStreamWriter"/> to write to.</param>
        /// <returns>How many bytes were written to the writer.</returns>
        public int FillWriter(ref DataStreamWriter writer)
        {
            if (!IsCreated || Length == 0)
            {
                return 0;
            }

            unsafe
            {
                var reader = new DataStreamReader((byte*)m_Data.GetUnsafePtr() + HeadIndex, Length);

                var writerAvailable = writer.Capacity;
                var readerOffset = 0;

                while (readerOffset < Length)
                {
                    reader.SeekSet(readerOffset);
                    var messageLength = reader.ReadInt();

                    if (writerAvailable < sizeof(int) + messageLength)
                    {
                        break;
                    }
                    else
                    {
                        writer.WriteInt(messageLength);

                        var messageOffset = HeadIndex + reader.GetBytesRead();
                        writer.WriteBytes((byte*)m_Data.GetUnsafePtr() + messageOffset, messageLength);

                        writerAvailable -= sizeof(int) + messageLength;
                        readerOffset += sizeof(int) + messageLength;
                    }
                }

                return writer.Capacity - writerAvailable;
            }
        }

        /// <summary>Consume a number of bytes from the head of the queue.</summary>
        /// <remarks>
        /// This should only be called with a size that matches the last value returned by
        /// <see cref="FillWriter"/>. Anything else will result in a corrupted queue.
        /// </remarks>
        /// <param name="size">Number of bytes to consume from the queue.</param>
        public void Consume(int size)
        {
            if (size >= Length)
            {
                HeadIndex = 0;
                TailIndex = 0;
            }
            else
            {
                HeadIndex += size;
            }
        }
    }
}
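A minimal send-side sketch (not part of the diff) following the remarks on FillWriter and Consume above; the Flush helper is hypothetical, driver, pipeline and connection are assumed Unity.Networking.Transport objects, and error handling is reduced to the essentials.

using Unity.Networking.Transport;
using Unity.Netcode.UTP.Utilities;

internal static class BatchedSendQueueSketch
{
    public static void Flush(NetworkDriver driver, NetworkPipeline pipeline,
        NetworkConnection connection, ref BatchedSendQueue queue)
    {
        while (!queue.IsEmpty)
        {
            // Acquire a writer for one packet; if UTP can't give us one
            // (e.g. too many in-flight packets), the data just stays queued.
            if (driver.BeginSend(pipeline, connection, out var writer) != 0)
            {
                return;
            }

            // Fill the writer with as many whole messages as fit...
            var written = queue.FillWriter(ref writer);
            if (written == 0)
            {
                driver.AbortSend(writer);
                return;
            }

            // ...and only consume them from the queue if the send succeeded.
            if (driver.EndSend(writer) < 0)
            {
                return;
            }

            queue.Consume(written);
        }
    }
}

Consuming only after EndSend succeeds is what lets the queue double as overflow storage: anything that could not be sent in this update simply stays queued for a later one.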

com.unity.netcode.adapter.utp/Runtime/BatchedSendQueue.cs.meta

Lines changed: 11 additions & 0 deletions
