Skip to content

Commit 892088d

Browse files
GerardSmitlukebakken
authored andcommitted
Reduce memory usage by using the body directly in the pipe
1 parent fb94c1d commit 892088d

File tree

14 files changed

+370
-55
lines changed

14 files changed

+370
-55
lines changed

projects/Benchmarks/WireFormatting/MethodFraming.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ public class MethodFramingBasicAck
1919
public ushort Channel { get; set; }
2020

2121
[Benchmark]
22-
internal RentedMemory BasicAckWrite() => Framing.SerializeToFrames(ref _basicAck, Channel);
22+
internal RentedOutgoingMemory BasicAckWrite() => Framing.SerializeToFrames(ref _basicAck, Channel);
2323
}
2424

2525
[Config(typeof(Config))]
@@ -41,13 +41,13 @@ public class MethodFramingBasicPublish
4141
public int FrameMax { get; set; }
4242

4343
[Benchmark]
44-
internal RentedMemory BasicPublishWriteNonEmpty() => Framing.SerializeToFrames(ref _basicPublish, ref _properties, _body, Channel, FrameMax);
44+
internal RentedOutgoingMemory BasicPublishWriteNonEmpty() => Framing.SerializeToFrames(ref _basicPublish, ref _properties, _body, Channel, FrameMax);
4545

4646
[Benchmark]
47-
internal RentedMemory BasicPublishWrite() => Framing.SerializeToFrames(ref _basicPublish, ref _propertiesEmpty, _bodyEmpty, Channel, FrameMax);
47+
internal RentedOutgoingMemory BasicPublishWrite() => Framing.SerializeToFrames(ref _basicPublish, ref _propertiesEmpty, _bodyEmpty, Channel, FrameMax);
4848

4949
[Benchmark]
50-
internal RentedMemory BasicPublishMemoryWrite() => Framing.SerializeToFrames(ref _basicPublishMemory, ref _propertiesEmpty, _bodyEmpty, Channel, FrameMax);
50+
internal RentedOutgoingMemory BasicPublishMemoryWrite() => Framing.SerializeToFrames(ref _basicPublishMemory, ref _propertiesEmpty, _bodyEmpty, Channel, FrameMax);
5151
}
5252

5353
[Config(typeof(Config))]
@@ -60,6 +60,6 @@ public class MethodFramingChannelClose
6060
public ushort Channel { get; set; }
6161

6262
[Benchmark]
63-
internal RentedMemory ChannelCloseWrite() => Framing.SerializeToFrames(ref _channelClose, Channel);
63+
internal RentedOutgoingMemory ChannelCloseWrite() => Framing.SerializeToFrames(ref _channelClose, Channel);
6464
}
6565
}

projects/RabbitMQ.Client/RabbitMQ.Client.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767

6868
<ItemGroup>
6969
<PackageReference Include="Microsoft.CodeAnalysis.PublicApiAnalyzers" Version="3.3.4" PrivateAssets="all" />
70+
<PackageReference Include="Microsoft.Extensions.ObjectPool" Version="8.0.0" />
7071
<PackageReference Include="Microsoft.NETFramework.ReferenceAssemblies" Version="1.0.3" PrivateAssets="all" />
7172
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="8.0.0" PrivateAssets="all" />
7273
<PackageReference Include="MinVer" Version="4.3.0" PrivateAssets="all" />
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
using System;
2+
using System.Buffers;
3+
4+
namespace RabbitMQ.Client;
5+
6+
internal ref struct ChunkedSequence<T>
7+
{
8+
private ReadOnlyChunk _first;
9+
private ReadOnlyChunk _current;
10+
11+
private bool _changed;
12+
private ReadOnlySequence<T>? _sequence;
13+
14+
public ChunkedSequence()
15+
{
16+
_first = _current = null;
17+
_sequence = null;
18+
_changed = false;
19+
}
20+
21+
public void Append(ReadOnlySequence<T> sequence)
22+
{
23+
SequencePosition pos = sequence.Start;
24+
while (sequence.TryGet(ref pos, out ReadOnlyMemory<T> mem))
25+
{
26+
Append(mem);
27+
}
28+
}
29+
30+
public void Append(ReadOnlyMemory<T> memory)
31+
{
32+
if (_current == null)
33+
{
34+
_first = _current = new ReadOnlyChunk(memory);
35+
}
36+
else
37+
{
38+
_current = _current.Append(memory);
39+
}
40+
41+
_changed = true;
42+
}
43+
44+
internal ReadOnlySequence<T> GetSequence()
45+
{
46+
if (_changed)
47+
{
48+
_sequence = new ReadOnlySequence<T>(_first, 0, _current, _current.Memory.Length);
49+
}
50+
else
51+
{
52+
_sequence ??= new ReadOnlySequence<T>();
53+
}
54+
55+
return _sequence.Value;
56+
}
57+
58+
private sealed class ReadOnlyChunk : ReadOnlySequenceSegment<T>
59+
{
60+
public ReadOnlyChunk(ReadOnlyMemory<T> memory)
61+
{
62+
Memory = memory;
63+
}
64+
65+
public ReadOnlyChunk Append(ReadOnlyMemory<T> memory)
66+
{
67+
ReadOnlyChunk nextChunk = new(memory)
68+
{
69+
RunningIndex = RunningIndex + Memory.Length
70+
};
71+
72+
Next = nextChunk;
73+
return nextChunk;
74+
}
75+
}
76+
}
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
#nullable enable
2+
3+
using System;
4+
using System.Buffers;
5+
using System.IO.Pipelines;
6+
using System.Threading.Tasks;
7+
using Microsoft.Extensions.ObjectPool;
8+
9+
namespace RabbitMQ.Client
10+
{
11+
internal class RentedOutgoingMemory : IDisposable, IResettable
12+
{
13+
private static readonly ObjectPool<RentedOutgoingMemory> s_pool = ObjectPool.Create<RentedOutgoingMemory>();
14+
15+
private bool _disposedValue;
16+
private TaskCompletionSource<bool>? _sendCompletionSource;
17+
18+
internal ReadOnlySequence<byte> Data;
19+
internal byte[]? RentedArray;
20+
21+
internal int Size => (int) Data.Length;
22+
23+
/// <summary>
24+
/// Mark the data as sent.
25+
/// </summary>
26+
public void DidSend()
27+
{
28+
if (_sendCompletionSource is null)
29+
{
30+
Dispose();
31+
}
32+
else
33+
{
34+
_sendCompletionSource.SetResult(true);
35+
}
36+
}
37+
38+
/// <summary>
39+
/// Wait for the data to be sent.
40+
/// </summary>
41+
/// <returns>A <see cref="ValueTask"/> that completes when the data is sent.</returns>
42+
public ValueTask WaitForDataSendAsync()
43+
{
44+
return _sendCompletionSource is null ? default : WaitForFinishCore();
45+
46+
async ValueTask WaitForFinishCore()
47+
{
48+
await _sendCompletionSource.Task.ConfigureAwait(false);
49+
Dispose();
50+
}
51+
}
52+
53+
public void WriteTo(PipeWriter pipeWriter)
54+
{
55+
foreach (ReadOnlyMemory<byte> memory in Data)
56+
{
57+
pipeWriter.Write(memory.Span);
58+
}
59+
}
60+
61+
private void Dispose(bool disposing)
62+
{
63+
if (_disposedValue)
64+
{
65+
return;
66+
}
67+
68+
if (disposing && RentedArray != null)
69+
{
70+
ClientArrayPool.Return(RentedArray);
71+
}
72+
73+
_disposedValue = true;
74+
}
75+
76+
public void Dispose()
77+
{
78+
Dispose(disposing: true);
79+
GC.SuppressFinalize(this);
80+
}
81+
82+
bool IResettable.TryReset()
83+
{
84+
if (!_disposedValue)
85+
{
86+
return false;
87+
}
88+
89+
_disposedValue = false;
90+
RentedArray = default;
91+
_sendCompletionSource = default;
92+
Data = default;
93+
return true;
94+
}
95+
96+
public static RentedOutgoingMemory Create(ReadOnlySequence<byte> mem, byte[] buffer, bool waitSend = false)
97+
{
98+
var rented = s_pool.Get();
99+
100+
rented.Data = mem;
101+
rented.RentedArray = buffer;
102+
103+
if (waitSend)
104+
{
105+
rented._sendCompletionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
106+
}
107+
108+
return rented;
109+
}
110+
111+
public static RentedOutgoingMemory Create(ReadOnlyMemory<byte> mem, byte[] buffer, bool waitSend = false)
112+
{
113+
return Create(new ReadOnlySequence<byte>(mem), buffer, waitSend);
114+
}
115+
}
116+
}

projects/RabbitMQ.Client/client/api/IChannel.cs

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
//---------------------------------------------------------------------------
3131

3232
using System;
33+
using System.Buffers;
3334
using System.Collections.Generic;
3435
using System.Threading;
3536
using System.Threading.Tasks;
@@ -258,7 +259,7 @@ void BasicPublish<TProperties>(CachedString exchange, CachedString routingKey, i
258259
/// Routing key must be shorter than 255 bytes.
259260
/// </para>
260261
/// </remarks>
261-
ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body = default, bool mandatory = false)
262+
ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body = default, bool mandatory = false, bool copyBody = true)
262263
where TProperties : IReadOnlyBasicProperties, IAmqpHeader;
263264

264265
/// <summary>
@@ -269,7 +270,29 @@ ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, in
269270
/// Routing key must be shorter than 255 bytes.
270271
/// </para>
271272
/// </remarks>
272-
ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body = default, bool mandatory = false)
273+
ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, in TProperties basicProperties, ReadOnlySequence<byte> body = default, bool mandatory = false, bool copyBody = false)
274+
where TProperties : IReadOnlyBasicProperties, IAmqpHeader;
275+
276+
/// <summary>
277+
/// Asynchronously publishes a message.
278+
/// </summary>
279+
/// <remarks>
280+
/// <para>
281+
/// Routing key must be shorter than 255 bytes.
282+
/// </para>
283+
/// </remarks>
284+
ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body = default, bool mandatory = false, bool copyBody = true)
285+
where TProperties : IReadOnlyBasicProperties, IAmqpHeader;
286+
287+
/// <summary>
288+
/// Asynchronously publishes a message.
289+
/// </summary>
290+
/// <remarks>
291+
/// <para>
292+
/// Routing key must be shorter than 255 bytes.
293+
/// </para>
294+
/// </remarks>
295+
ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlySequence<byte> body = default, bool mandatory = false, bool copyBody = true)
273296
where TProperties : IReadOnlyBasicProperties, IAmqpHeader;
274297

275298
#nullable disable

projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
//---------------------------------------------------------------------------
3131

3232
using System;
33+
using System.Buffers;
3334
using System.Collections.Generic;
3435
using System.Runtime.CompilerServices;
3536
using System.Threading;
@@ -327,13 +328,21 @@ public void BasicPublish<TProperties>(CachedString exchange, CachedString routin
327328
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
328329
=> InnerChannel.BasicPublish(exchange, routingKey, in basicProperties, body, mandatory);
329330

330-
public ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory)
331+
public ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory, bool copyBody = true)
331332
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
332-
=> InnerChannel.BasicPublishAsync(exchange, routingKey, in basicProperties, body, mandatory);
333+
=> InnerChannel.BasicPublishAsync(exchange, routingKey, in basicProperties, body, mandatory, copyBody);
333334

334-
public ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory)
335+
public ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, in TProperties basicProperties, ReadOnlySequence<byte> body, bool mandatory, bool copyBody = true)
335336
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
336-
=> InnerChannel.BasicPublishAsync(exchange, routingKey, in basicProperties, body, mandatory);
337+
=> InnerChannel.BasicPublishAsync(exchange, routingKey, in basicProperties, body, mandatory, copyBody);
338+
339+
public ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory, bool copyBody = true)
340+
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
341+
=> InnerChannel.BasicPublishAsync(exchange, routingKey, in basicProperties, body, mandatory, copyBody);
342+
343+
public ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlySequence<byte> body, bool mandatory, bool copyBody = true)
344+
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
345+
=> InnerChannel.BasicPublishAsync(exchange, routingKey, in basicProperties, body, mandatory, copyBody);
337346

338347
public void BasicQos(uint prefetchSize, ushort prefetchCount, bool global)
339348
{

projects/RabbitMQ.Client/client/impl/ChannelBase.cs

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
//---------------------------------------------------------------------------
3131

3232
using System;
33+
using System.Buffers;
3334
using System.Collections.Generic;
3435
using System.Diagnostics;
3536
using System.IO;
@@ -507,7 +508,7 @@ protected void ChannelSend<TMethod, THeader>(in TMethod method, in THeader heade
507508
}
508509

509510
[MethodImpl(MethodImplOptions.AggressiveInlining)]
510-
protected ValueTask ModelSendAsync<TMethod, THeader>(in TMethod method, in THeader header, ReadOnlyMemory<byte> body)
511+
protected ValueTask ModelSendAsync<TMethod, THeader>(in TMethod method, in THeader header, ReadOnlySequence<byte> body, bool copyBody)
511512
where TMethod : struct, IOutgoingAmqpMethod
512513
where THeader : IAmqpHeader
513514
{
@@ -516,7 +517,7 @@ protected ValueTask ModelSendAsync<TMethod, THeader>(in TMethod method, in THead
516517
_flowControlBlock.Wait();
517518
}
518519

519-
return Session.TransmitAsync(in method, in header, body);
520+
return Session.TransmitAsync(in method, in header, body, copyBody);
520521
}
521522

522523
internal void OnCallbackException(CallbackExceptionEventArgs args)
@@ -1258,7 +1259,7 @@ public void BasicPublish<TProperties>(CachedString exchange, CachedString routin
12581259
ChannelSend(in cmd, in basicProperties, body);
12591260
}
12601261

1261-
public ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory)
1262+
public ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, in TProperties basicProperties, ReadOnlySequence<byte> body, bool mandatory, bool copyBody = true)
12621263
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
12631264
{
12641265
if (NextPublishSeqNo > 0)
@@ -1270,10 +1271,16 @@ public ValueTask BasicPublishAsync<TProperties>(string exchange, string routingK
12701271
}
12711272

12721273
var cmd = new BasicPublish(exchange, routingKey, mandatory, default);
1273-
return ModelSendAsync(in cmd, in basicProperties, body);
1274+
return ModelSendAsync(in cmd, in basicProperties, body, copyBody);
12741275
}
12751276

1276-
public ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory)
1277+
public ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory, bool copyBody = true)
1278+
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
1279+
{
1280+
return BasicPublishAsync(exchange, routingKey, in basicProperties, new ReadOnlySequence<byte>(body), mandatory, copyBody);
1281+
}
1282+
1283+
public ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlySequence<byte> body, bool mandatory, bool copyBody = true)
12771284
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
12781285
{
12791286
if (NextPublishSeqNo > 0)
@@ -1285,7 +1292,13 @@ public ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedStr
12851292
}
12861293

12871294
var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default);
1288-
return ModelSendAsync(in cmd, in basicProperties, body);
1295+
return ModelSendAsync(in cmd, in basicProperties, body, copyBody);
1296+
}
1297+
1298+
public ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory, bool copyBody = true)
1299+
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
1300+
{
1301+
return BasicPublishAsync(exchange, routingKey, in basicProperties, new ReadOnlySequence<byte>(body), mandatory, copyBody);
12891302
}
12901303

12911304
public void UpdateSecret(string newSecret, string reason)

0 commit comments

Comments
 (0)