Skip to content

Commit 8a59fc6

Browse files
GerardSmitlukebakken
authored andcommitted
Added tests for non-copying body
1 parent ae7ca17 commit 8a59fc6

12 files changed

+290
-11
lines changed

projects/RabbitMQ.Client/client/RentedOutgoingMemory.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ internal class RentedOutgoingMemory : IDisposable, IResettable
1818

1919
internal int Size => (int) Data.Length;
2020

21+
public int RentedArraySize => _rentedArray?.Length ?? 0;
22+
2123
internal ReadOnlySequence<byte> Data { get; private set; }
2224

2325
/// <summary>
@@ -100,7 +102,7 @@ bool IResettable.TryReset()
100102
return true;
101103
}
102104

103-
public static RentedOutgoingMemory GetAndInitialize(ReadOnlySequence<byte> mem, byte[] buffer, bool waitSend = false)
105+
public static RentedOutgoingMemory GetAndInitialize(ReadOnlySequence<byte> mem, byte[]? buffer = null, bool waitSend = false)
104106
{
105107
var rented = s_pool.Get();
106108

@@ -115,7 +117,7 @@ public static RentedOutgoingMemory GetAndInitialize(ReadOnlySequence<byte> mem,
115117
return rented;
116118
}
117119

118-
public static RentedOutgoingMemory GetAndInitialize(ReadOnlyMemory<byte> mem, byte[] buffer, bool waitSend = false)
120+
public static RentedOutgoingMemory GetAndInitialize(ReadOnlyMemory<byte> mem, byte[]? buffer = null, bool waitSend = false)
119121
{
120122
return GetAndInitialize(new ReadOnlySequence<byte>(mem), buffer, waitSend);
121123
}

projects/RabbitMQ.Client/client/api/ConnectionFactory.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -763,7 +763,12 @@ public async ValueTask<IConnection> CreateConnectionAsync(IEndpointResolver endp
763763
}
764764
}
765765

766-
private ConnectionConfig CreateConfig(string clientProvidedName)
766+
internal ConnectionConfig CreateConfig()
767+
{
768+
return CreateConfig(ClientProvidedName);
769+
}
770+
771+
internal ConnectionConfig CreateConfig(string clientProvidedName)
767772
{
768773
return new ConnectionConfig(
769774
VirtualHost,

projects/RabbitMQ.Client/client/api/IChannelExtensions.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -140,14 +140,14 @@ public static ValueTask BasicPublishAsync<T>(this IChannel channel, PublicationA
140140
public static void BasicPublish(this IChannel channel, string exchange, string routingKey, ReadOnlyMemory<byte> body = default, bool mandatory = false)
141141
=> channel.BasicPublish(exchange, routingKey, in EmptyBasicProperty.Empty, body, mandatory);
142142

143-
public static ValueTask BasicPublishAsync(this IChannel channel, string exchange, string routingKey, ReadOnlyMemory<byte> body = default, bool mandatory = false)
144-
=> channel.BasicPublishAsync(exchange, routingKey, in EmptyBasicProperty.Empty, body, mandatory);
143+
public static ValueTask BasicPublishAsync(this IChannel channel, string exchange, string routingKey, ReadOnlyMemory<byte> body = default, bool mandatory = false, bool? copyBody = null)
144+
=> channel.BasicPublishAsync(exchange, routingKey, in EmptyBasicProperty.Empty, body, mandatory, copyBody);
145145

146-
public static void BasicPublish(this IChannel channel, CachedString exchange, CachedString routingKey, ReadOnlyMemory<byte> body = default, bool mandatory = false)
146+
public static void BasicPublish(this IChannel channel, CachedString exchange, CachedString routingKey, ReadOnlyMemory<byte> body = default, bool mandatory = false, bool? copyBody = null)
147147
=> channel.BasicPublish(exchange, routingKey, in EmptyBasicProperty.Empty, body, mandatory);
148148

149-
public static ValueTask BasicPublishAsync(this IChannel channel, CachedString exchange, CachedString routingKey, ReadOnlyMemory<byte> body = default, bool mandatory = false)
150-
=> channel.BasicPublishAsync(exchange, routingKey, in EmptyBasicProperty.Empty, body, mandatory);
149+
public static ValueTask BasicPublishAsync(this IChannel channel, CachedString exchange, CachedString routingKey, ReadOnlyMemory<byte> body = default, bool mandatory = false, bool? copyBody = null)
150+
=> channel.BasicPublishAsync(exchange, routingKey, in EmptyBasicProperty.Empty, body, mandatory, copyBody);
151151
#nullable disable
152152

153153
/// <summary>

projects/RabbitMQ.Client/client/api/IConnection.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,11 @@ public interface IConnection : INetworkConnection, IDisposable
125125
/// </summary>
126126
IEnumerable<ShutdownReportEntry> ShutdownReport { get; }
127127

128+
/// <summary>
129+
/// The threshold for when to copy the body to a temporary array.
130+
/// </summary>
131+
int CopyBodyToMemoryThreshold { get; }
132+
128133
/// <summary>
129134
/// Application-specific connection name, will be displayed in the management UI
130135
/// if RabbitMQ server supports it. This value doesn't have to be unique and cannot

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ internal sealed partial class AutorecoveringConnection : IConnection
5050
private Connection _innerConnection;
5151
private bool _disposed;
5252

53-
private Connection InnerConnection
53+
internal Connection InnerConnection
5454
{
5555
get
5656
{
@@ -181,6 +181,8 @@ public event EventHandler<RecoveringConsumerEventArgs> RecoveringConsumer
181181

182182
public IEnumerable<ShutdownReportEntry> ShutdownReport => InnerConnection.ShutdownReport;
183183

184+
public int CopyBodyToMemoryThreshold => InnerConnection.CopyBodyToMemoryThreshold;
185+
184186
public IProtocol Protocol => Endpoint.Protocol;
185187

186188
public RecoveryAwareChannel CreateNonRecoveringChannel()

projects/RabbitMQ.Client/client/impl/Connection.cs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ internal sealed partial class Connection : IConnection
5858
private ShutdownEventArgs? _closeReason;
5959
public ShutdownEventArgs? CloseReason => Volatile.Read(ref _closeReason);
6060

61+
internal bool TrackRentedBytes = false;
62+
internal uint RentedBytes;
63+
6164
internal Connection(ConnectionConfig config, IFrameHandler frameHandler)
6265
{
6366
_config = config;
@@ -552,9 +555,23 @@ internal void Write(RentedOutgoingMemory frames)
552555

553556
internal ValueTask WriteAsync(RentedOutgoingMemory frames)
554557
{
558+
TrackRented(frames.RentedArraySize);
559+
555560
return _frameHandler.WriteAsync(frames);
556561
}
557562

563+
private void TrackRented(int size)
564+
{
565+
if (TrackRentedBytes && size > 0)
566+
{
567+
#if NET
568+
Interlocked.Add(ref RentedBytes, (uint)size);
569+
#else
570+
Interlocked.Add(ref Unsafe.As<uint, int>(ref RentedBytes), size);
571+
#endif
572+
}
573+
}
574+
558575
public void Dispose()
559576
{
560577
if (_disposed)

projects/RabbitMQ.Client/client/impl/SessionBase.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ public ValueTask TransmitAsync<TMethod, THeader>(in TMethod cmd, in THeader head
180180
ThrowAlreadyClosedException();
181181
}
182182

183-
copyBody ??= body.Length >= Connection.CopyBodyToMemoryThreshold;
183+
copyBody ??= body.Length > Connection.CopyBodyToMemoryThreshold;
184184

185185
return Connection.WriteAsync(Framing.SerializeToFrames(ref Unsafe.AsRef(cmd), ref Unsafe.AsRef(header), body, ChannelNumber, Connection.MaxPayloadSize, copyBody.Value));
186186
}

projects/Test/AsyncIntegration/TestBasicPublishAsync.cs

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,5 +67,76 @@ public async Task TestQueuePurgeAsync()
6767
Assert.True(await publishSyncSource.Task);
6868
Assert.Equal((uint)messageCount, await _channel.QueuePurgeAsync(q));
6969
}
70+
71+
[Fact]
72+
public async Task TestNonCopyingBody()
73+
{
74+
const int size = 1024;
75+
76+
QueueDeclareOk q = await _channel.QueueDeclareAsync(string.Empty, false, false, true, false, null);
77+
byte[] body = GetRandomBody(size);
78+
79+
uint rentedBytes;
80+
81+
using (var result = await TrackRentedBytes())
82+
{
83+
await _channel.BasicPublishAsync(string.Empty, q, body, copyBody: false);
84+
rentedBytes = result.RentedBytes;
85+
}
86+
87+
Assert.Equal((uint)1, await _channel.QueuePurgeAsync(q));
88+
89+
// It is expected that the rented bytes is smaller than the size of the body
90+
// since we're not copying the body. Only the frame headers are rented.
91+
Assert.True(rentedBytes < size);
92+
}
93+
94+
[Fact]
95+
public async Task TestCopyingBody()
96+
{
97+
const int size = 1024;
98+
99+
QueueDeclareOk q = await _channel.QueueDeclareAsync(string.Empty, false, false, true, false, null);
100+
byte[] body = GetRandomBody(size);
101+
102+
uint rentedBytes;
103+
104+
using (var result = await TrackRentedBytes())
105+
{
106+
await _channel.BasicPublishAsync(string.Empty, q, body, copyBody: true);
107+
rentedBytes = result.RentedBytes;
108+
}
109+
110+
Assert.Equal((uint)1, await _channel.QueuePurgeAsync(q));
111+
112+
// It is expected that the rented bytes is larger than the size of the body
113+
// since the body is copied with the frame headers.
114+
Assert.True(rentedBytes >= size);
115+
}
116+
117+
[Fact]
118+
public async Task TestDefaultCopyingBody()
119+
{
120+
Assert.Equal(int.MaxValue, _conn.CopyBodyToMemoryThreshold);
121+
122+
const int size = 1024;
123+
124+
QueueDeclareOk q = await _channel.QueueDeclareAsync(string.Empty, false, false, true, false, null);
125+
byte[] body = GetRandomBody(size);
126+
127+
uint rentedBytes;
128+
129+
using (var result = await TrackRentedBytes())
130+
{
131+
await _channel.BasicPublishAsync(string.Empty, q, body, copyBody: true);
132+
rentedBytes = result.RentedBytes;
133+
}
134+
135+
Assert.Equal((uint)1, await _channel.QueuePurgeAsync(q));
136+
137+
// It is expected that the rented bytes is larger than the size of the body
138+
// since the body is copied with the frame headers.
139+
Assert.True(rentedBytes >= size);
140+
}
70141
}
71142
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
using System.Threading.Tasks;
2+
using RabbitMQ.Client;
3+
using Xunit;
4+
using Xunit.Abstractions;
5+
6+
namespace Test.AsyncIntegration;
7+
8+
public class TestBasicPublishCopyBodyAsync : AsyncIntegrationFixture
9+
{
10+
public TestBasicPublishCopyBodyAsync(ITestOutputHelper output) : base(output)
11+
{
12+
}
13+
14+
protected override ConnectionFactory CreateConnectionFactory()
15+
{
16+
var factory = base.CreateConnectionFactory();
17+
factory.CopyBodyToMemoryThreshold = 1024;
18+
return factory;
19+
}
20+
21+
[Theory(Skip = "Parallelization is disabled for this collection")]
22+
[InlineData(512)]
23+
[InlineData(1024)]
24+
public async Task TestNonCopyingBody(ushort size)
25+
{
26+
QueueDeclareOk q = await _channel.QueueDeclareAsync(string.Empty, false, false, true, false, null);
27+
byte[] body = GetRandomBody(size);
28+
29+
uint rentedBytes;
30+
31+
using (var result = await TrackRentedBytes())
32+
{
33+
await _channel.BasicPublishAsync(string.Empty, q, body);
34+
rentedBytes = result.RentedBytes;
35+
}
36+
37+
Assert.Equal((uint)1, await _channel.QueuePurgeAsync(q));
38+
39+
// It is expected that the rented bytes is smaller than the size of the body
40+
// since we're not copying the body. Only the frame headers are rented.
41+
Assert.True(rentedBytes < size);
42+
}
43+
44+
[Theory]
45+
[InlineData(1025)]
46+
[InlineData(2048)]
47+
public async Task TestCopyingBody(ushort size)
48+
{
49+
QueueDeclareOk q = await _channel.QueueDeclareAsync(string.Empty, false, false, true, false, null);
50+
byte[] body = GetRandomBody(size);
51+
52+
uint rentedBytes;
53+
54+
using (var result = await TrackRentedBytes())
55+
{
56+
await _channel.BasicPublishAsync(string.Empty, q, body);
57+
rentedBytes = result.RentedBytes;
58+
}
59+
60+
Assert.Equal((uint)1, await _channel.QueuePurgeAsync(q));
61+
62+
// It is expected that the rented bytes is larger than the size of the body
63+
// since the body is copied with the frame headers.
64+
Assert.True(rentedBytes >= size);
65+
}
66+
}

projects/Test/Common/IntegrationFixtureBase.cs

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
using System.Reflection;
3939
using System.Text;
4040
using System.Threading;
41+
using System.Threading.Tasks;
4142
using RabbitMQ.Client;
4243
using RabbitMQ.Client.Exceptions;
4344
using RabbitMQ.Client.Framing.Impl;
@@ -48,6 +49,8 @@ namespace Test
4849
{
4950
public abstract class IntegrationFixtureBase : IDisposable
5051
{
52+
private readonly SemaphoreSlim _byteTrackingLock = new SemaphoreSlim(1, 1);
53+
5154
private static bool s_isRunningInCI = false;
5255
private static bool s_isWindows = false;
5356
private static bool s_isVerbose = false;
@@ -371,7 +374,7 @@ protected void Wait(ManualResetEventSlim latch, TimeSpan timeSpan, string desc)
371374
$"waiting {timeSpan.TotalSeconds} seconds on a latch for '{desc}' timed out");
372375
}
373376

374-
protected ConnectionFactory CreateConnectionFactory()
377+
protected virtual ConnectionFactory CreateConnectionFactory()
375378
{
376379
string now = DateTime.UtcNow.ToString("o", CultureInfo.InvariantCulture);
377380
return new ConnectionFactory
@@ -418,6 +421,39 @@ protected void HandleChannelShutdown(IChannel ch, ShutdownEventArgs args, Action
418421
a(args);
419422
}
420423

424+
protected async Task<TrackRentedByteResult> TrackRentedBytes()
425+
{
426+
Connection connection;
427+
428+
if (_conn is AutorecoveringConnection autorecoveringConnection)
429+
{
430+
connection = autorecoveringConnection.InnerConnection as Connection;
431+
}
432+
else
433+
{
434+
connection = _conn as Connection;
435+
}
436+
437+
if (connection is null)
438+
{
439+
throw new InvalidOperationException("Cannot track rented bytes without a connection");
440+
}
441+
442+
await _byteTrackingLock.WaitAsync();
443+
444+
try
445+
{
446+
connection.RentedBytes = 0;
447+
connection.TrackRentedBytes = true;
448+
return new TrackRentedByteResult(connection, _byteTrackingLock);
449+
}
450+
catch
451+
{
452+
_byteTrackingLock.Release();
453+
throw;
454+
}
455+
}
456+
421457
private static void InitIsRunningInCI()
422458
{
423459
bool ci;

0 commit comments

Comments
 (0)