Skip to content

Commit b35de23

Browse files
authored
WebSocket: Better ReadAsync behavior and buffer management (#277)
1 parent 508b104 commit b35de23

File tree

2 files changed

+435
-15
lines changed

2 files changed

+435
-15
lines changed

Source/HiveMQtt/Client/Transport/WebSocketTransport.cs

Lines changed: 64 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -295,15 +295,59 @@ public override async Task<TransportReadResult> ReadAsync(CancellationToken canc
295295
return new TransportReadResult(true);
296296
}
297297

298-
var buffer = new ArraySegment<byte>(new byte[8192]);
298+
// Use ArrayPool to reuse buffers and reduce GC pressure
299+
const int bufferSize = 8192;
300+
var rentedBuffer = ArrayPool<byte>.Shared.Rent(bufferSize);
301+
var buffer = new ArraySegment<byte>(rentedBuffer, 0, bufferSize);
299302
WebSocketReceiveResult result;
300303

301304
try
302305
{
306+
// First read to determine if message is fragmented
307+
result = await this.Socket.ReceiveAsync(buffer, cancellationToken).ConfigureAwait(false);
308+
309+
// Check if we received a close message
310+
if (result.MessageType is WebSocketMessageType.Close)
311+
{
312+
Logger.Debug($"WebSocket received close message: {result.CloseStatus}");
313+
return new TransportReadResult(true);
314+
}
315+
316+
// Check if connection was closed during read
317+
if (result.CloseStatus.HasValue)
318+
{
319+
Logger.Debug($"WebSocket connection closed during read: {result.CloseStatus}");
320+
return new TransportReadResult(true);
321+
}
322+
323+
// Optimize for single-read messages (no fragmentation)
324+
if (result.EndOfMessage && result.MessageType is WebSocketMessageType.Binary)
325+
{
326+
// Single read message - copy directly from rented buffer
327+
if (result.Count > 0)
328+
{
329+
var resultArray = new byte[result.Count];
330+
Buffer.BlockCopy(rentedBuffer, 0, resultArray, 0, result.Count);
331+
Logger.Trace($"Received {result.Count} bytes (single read)");
332+
return new TransportReadResult(new ReadOnlySequence<byte>(resultArray));
333+
}
334+
else
335+
{
336+
return new TransportReadResult(true);
337+
}
338+
}
339+
340+
// Message is fragmented - use MemoryStream to accumulate
303341
using (var ms = new MemoryStream())
304342
{
305-
// Read until the end of the message
306-
do
343+
// Write the first chunk
344+
if (result.Count > 0)
345+
{
346+
await ms.WriteAsync(rentedBuffer.AsMemory(0, result.Count), cancellationToken).ConfigureAwait(false);
347+
}
348+
349+
// Continue reading if message is not complete
350+
while (!result.EndOfMessage && result.MessageType is not WebSocketMessageType.Close)
307351
{
308352
result = await this.Socket.ReceiveAsync(buffer, cancellationToken).ConfigureAwait(false);
309353

@@ -314,28 +358,28 @@ public override async Task<TransportReadResult> ReadAsync(CancellationToken canc
314358
return new TransportReadResult(true);
315359
}
316360

361+
// Check if connection was closed during read
362+
if (result.CloseStatus.HasValue)
363+
{
364+
Logger.Debug($"WebSocket connection closed during read: {result.CloseStatus}");
365+
return new TransportReadResult(true);
366+
}
367+
317368
// Only write if we got actual data
318369
if (result.Count > 0)
319370
{
320-
await ms.WriteAsync(buffer.AsMemory(buffer.Offset, result.Count), cancellationToken).ConfigureAwait(false);
371+
await ms.WriteAsync(rentedBuffer.AsMemory(0, result.Count), cancellationToken).ConfigureAwait(false);
321372
}
322373
}
323-
while (!result.EndOfMessage && result.MessageType is not WebSocketMessageType.Close);
324374

325-
Logger.Trace($"Received {ms.Length} bytes");
326-
327-
// Check if connection was closed during read
328-
if (result.CloseStatus.HasValue)
329-
{
330-
Logger.Debug($"WebSocket connection closed during read: {result.CloseStatus}");
331-
return new TransportReadResult(true);
332-
}
375+
Logger.Trace($"Received {ms.Length} bytes (fragmented)");
333376

334377
// Return data only for binary messages (MQTT uses binary)
335378
if (result.MessageType is WebSocketMessageType.Binary)
336379
{
337-
// Prepare the result and return
338-
return new TransportReadResult(new ReadOnlySequence<byte>(ms.ToArray()));
380+
// Copy from MemoryStream to final array
381+
var resultArray = ms.ToArray();
382+
return new TransportReadResult(new ReadOnlySequence<byte>(resultArray));
339383
}
340384
else
341385
{
@@ -363,6 +407,11 @@ public override async Task<TransportReadResult> ReadAsync(CancellationToken canc
363407
Logger.Debug(ex, "WebSocket operation is invalid - socket may be closed");
364408
return new TransportReadResult(true);
365409
}
410+
finally
411+
{
412+
// Always return the rented buffer to the pool
413+
ArrayPool<byte>.Shared.Return(rentedBuffer);
414+
}
366415
}
367416

368417
/// <summary>

0 commit comments

Comments
 (0)