Skip to content

Commit 64e7dc0

Browse files
committed
Merge branch 'main' into otel-integration-package
2 parents 7b937ab + e52d703 commit 64e7dc0

24 files changed

+188
-153
lines changed

projects/RabbitMQ.Client.OAuth2/OAuth2Client.cs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -232,9 +232,11 @@ public void Dispose()
232232

233233
private Dictionary<string, string> buildRequestParameters()
234234
{
235-
var dict = new Dictionary<string, string>(_additionalRequestParameters);
236-
dict.Add(CLIENT_ID, _clientId);
237-
dict.Add(CLIENT_SECRET, _clientSecret);
235+
var dict = new Dictionary<string, string>(_additionalRequestParameters)
236+
{
237+
{ CLIENT_ID, _clientId },
238+
{ CLIENT_SECRET, _clientSecret }
239+
};
238240
if (_scope != null && _scope.Length > 0)
239241
{
240242
dict.Add(SCOPE, _scope);

projects/RabbitMQ.Client/PublicAPI.Unshipped.txt

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,10 @@ const RabbitMQ.Client.AmqpTcpEndpoint.DefaultAmqpSslPort = 5671 -> int
33
const RabbitMQ.Client.AmqpTcpEndpoint.UseDefaultPort = -1 -> int
44
const RabbitMQ.Client.ConnectionFactory.DefaultChannelMax = 2047 -> ushort
55
const RabbitMQ.Client.ConnectionFactory.DefaultFrameMax = 0 -> uint
6-
const RabbitMQ.Client.ConnectionFactory.DefaultMaxMessageSize = 134217728 -> uint
6+
const RabbitMQ.Client.ConnectionFactory.DefaultMaxInboundMessageBodySize = 67108864 -> uint
77
const RabbitMQ.Client.ConnectionFactory.DefaultPass = "guest" -> string
88
const RabbitMQ.Client.ConnectionFactory.DefaultUser = "guest" -> string
99
const RabbitMQ.Client.ConnectionFactory.DefaultVHost = "/" -> string
10-
const RabbitMQ.Client.ConnectionFactory.MaximumMaxMessageSize = 536870912 -> uint
1110
const RabbitMQ.Client.Constants.AccessRefused = 403 -> int
1211
const RabbitMQ.Client.Constants.ChannelError = 504 -> int
1312
const RabbitMQ.Client.Constants.CommandInvalid = 503 -> int
@@ -82,14 +81,13 @@ RabbitMQ.Client.AmqpTcpEndpoint.AddressFamily.set -> void
8281
RabbitMQ.Client.AmqpTcpEndpoint.AmqpTcpEndpoint() -> void
8382
RabbitMQ.Client.AmqpTcpEndpoint.AmqpTcpEndpoint(string hostName, int portOrMinusOne = -1) -> void
8483
RabbitMQ.Client.AmqpTcpEndpoint.AmqpTcpEndpoint(string hostName, int portOrMinusOne, RabbitMQ.Client.SslOption ssl) -> void
85-
RabbitMQ.Client.AmqpTcpEndpoint.AmqpTcpEndpoint(string hostName, int portOrMinusOne, RabbitMQ.Client.SslOption ssl, uint maxMessageSize) -> void
8684
RabbitMQ.Client.AmqpTcpEndpoint.AmqpTcpEndpoint(System.Uri uri) -> void
8785
RabbitMQ.Client.AmqpTcpEndpoint.AmqpTcpEndpoint(System.Uri uri, RabbitMQ.Client.SslOption ssl) -> void
8886
RabbitMQ.Client.AmqpTcpEndpoint.Clone() -> object
8987
RabbitMQ.Client.AmqpTcpEndpoint.CloneWithHostname(string hostname) -> RabbitMQ.Client.AmqpTcpEndpoint
9088
RabbitMQ.Client.AmqpTcpEndpoint.HostName.get -> string
9189
RabbitMQ.Client.AmqpTcpEndpoint.HostName.set -> void
92-
RabbitMQ.Client.AmqpTcpEndpoint.MaxMessageSize.get -> uint
90+
RabbitMQ.Client.AmqpTcpEndpoint.MaxInboundMessageBodySize.get -> uint
9391
RabbitMQ.Client.AmqpTcpEndpoint.Port.get -> int
9492
RabbitMQ.Client.AmqpTcpEndpoint.Port.set -> void
9593
RabbitMQ.Client.AmqpTcpEndpoint.Protocol.get -> RabbitMQ.Client.IProtocol
@@ -225,8 +223,8 @@ RabbitMQ.Client.ConnectionFactory.HandshakeContinuationTimeout.get -> System.Tim
225223
RabbitMQ.Client.ConnectionFactory.HandshakeContinuationTimeout.set -> void
226224
RabbitMQ.Client.ConnectionFactory.HostName.get -> string
227225
RabbitMQ.Client.ConnectionFactory.HostName.set -> void
228-
RabbitMQ.Client.ConnectionFactory.MaxMessageSize.get -> uint
229-
RabbitMQ.Client.ConnectionFactory.MaxMessageSize.set -> void
226+
RabbitMQ.Client.ConnectionFactory.MaxInboundMessageBodySize.get -> uint
227+
RabbitMQ.Client.ConnectionFactory.MaxInboundMessageBodySize.set -> void
230228
RabbitMQ.Client.ConnectionFactory.NetworkRecoveryInterval.get -> System.TimeSpan
231229
RabbitMQ.Client.ConnectionFactory.NetworkRecoveryInterval.set -> void
232230
RabbitMQ.Client.ConnectionFactory.Password.get -> string
@@ -787,6 +785,7 @@ readonly RabbitMQ.Client.ConnectionConfig.HandshakeContinuationTimeout -> System
787785
readonly RabbitMQ.Client.ConnectionConfig.HeartbeatInterval -> System.TimeSpan
788786
readonly RabbitMQ.Client.ConnectionConfig.MaxChannelCount -> ushort
789787
readonly RabbitMQ.Client.ConnectionConfig.MaxFrameSize -> uint
788+
readonly RabbitMQ.Client.ConnectionConfig.MaxInboundMessageBodySize -> uint
790789
readonly RabbitMQ.Client.ConnectionConfig.NetworkRecoveryInterval -> System.TimeSpan
791790
readonly RabbitMQ.Client.ConnectionConfig.Password -> string
792791
readonly RabbitMQ.Client.ConnectionConfig.RequestedConnectionTimeout -> System.TimeSpan
@@ -884,6 +883,7 @@ virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void
884883
~const RabbitMQ.Client.RabbitMQActivitySource.PublisherSourceName = "RabbitMQ.Client.Publisher" -> string
885884
~const RabbitMQ.Client.RabbitMQActivitySource.SubscriberSourceName = "RabbitMQ.Client.Subscriber" -> string
886885
~override RabbitMQ.Client.Events.EventingBasicConsumer.HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.ReadOnlyBasicProperties properties, System.ReadOnlyMemory<byte> body) -> System.Threading.Tasks.Task
886+
~RabbitMQ.Client.AmqpTcpEndpoint.AmqpTcpEndpoint(string hostName, int portOrMinusOne, RabbitMQ.Client.SslOption ssl, uint maxInboundMessageBodySize) -> void
887887
~RabbitMQ.Client.ConnectionFactory.CreateConnectionAsync(RabbitMQ.Client.IEndpointResolver endpointResolver, string clientProvidedName, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IConnection>
888888
~RabbitMQ.Client.ConnectionFactory.CreateConnectionAsync(string clientProvidedName, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IConnection>
889889
~RabbitMQ.Client.ConnectionFactory.CreateConnectionAsync(System.Collections.Generic.IEnumerable<RabbitMQ.Client.AmqpTcpEndpoint> endpoints, string clientProvidedName, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IConnection>

projects/RabbitMQ.Client/client/api/AmqpTcpEndpoint.cs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -62,22 +62,23 @@ public class AmqpTcpEndpoint
6262

6363
private int _port;
6464

65-
private readonly uint _maxMessageSize;
65+
private readonly uint _maxInboundMessageBodySize;
6666

6767
/// <summary>
6868
/// Creates a new instance of the <see cref="AmqpTcpEndpoint"/>.
6969
/// </summary>
7070
/// <param name="hostName">Hostname.</param>
7171
/// <param name="portOrMinusOne"> Port number. If the port number is -1, the default port number will be used.</param>
7272
/// <param name="ssl">Ssl option.</param>
73-
/// <param name="maxMessageSize">Maximum message size from RabbitMQ. <see cref="ConnectionFactory.MaximumMaxMessageSize"/>. It defaults to
74-
/// MaximumMaxMessageSize if the parameter is greater than MaximumMaxMessageSize.</param>
75-
public AmqpTcpEndpoint(string hostName, int portOrMinusOne, SslOption ssl, uint maxMessageSize)
73+
/// <param name="maxInboundMessageBodySize">Maximum message size from RabbitMQ.</param>
74+
public AmqpTcpEndpoint(string hostName, int portOrMinusOne, SslOption ssl,
75+
uint maxInboundMessageBodySize)
7676
{
7777
HostName = hostName;
7878
_port = portOrMinusOne;
7979
Ssl = ssl;
80-
_maxMessageSize = Math.Min(maxMessageSize, ConnectionFactory.MaximumMaxMessageSize);
80+
_maxInboundMessageBodySize = Math.Min(maxInboundMessageBodySize,
81+
InternalConstants.DefaultRabbitMqMaxInboundMessageBodySize);
8182
}
8283

8384
/// <summary>
@@ -87,7 +88,7 @@ public AmqpTcpEndpoint(string hostName, int portOrMinusOne, SslOption ssl, uint
8788
/// <param name="portOrMinusOne"> Port number. If the port number is -1, the default port number will be used.</param>
8889
/// <param name="ssl">Ssl option.</param>
8990
public AmqpTcpEndpoint(string hostName, int portOrMinusOne, SslOption ssl) :
90-
this(hostName, portOrMinusOne, ssl, ConnectionFactory.DefaultMaxMessageSize)
91+
this(hostName, portOrMinusOne, ssl, ConnectionFactory.DefaultMaxInboundMessageBodySize)
9192
{
9293
}
9394

@@ -134,7 +135,7 @@ public AmqpTcpEndpoint(Uri uri) : this(uri.Host, uri.Port)
134135
/// <returns>A copy with the same hostname, port, and TLS settings</returns>
135136
public object Clone()
136137
{
137-
return new AmqpTcpEndpoint(HostName, _port, Ssl, _maxMessageSize);
138+
return new AmqpTcpEndpoint(HostName, _port, Ssl, _maxInboundMessageBodySize);
138139
}
139140

140141
/// <summary>
@@ -144,7 +145,7 @@ public object Clone()
144145
/// <returns>A copy with the provided hostname and port/TLS settings of this endpoint</returns>
145146
public AmqpTcpEndpoint CloneWithHostname(string hostname)
146147
{
147-
return new AmqpTcpEndpoint(hostname, _port, Ssl, _maxMessageSize);
148+
return new AmqpTcpEndpoint(hostname, _port, Ssl, _maxInboundMessageBodySize);
148149
}
149150

150151
/// <summary>
@@ -195,11 +196,11 @@ public IProtocol Protocol
195196

196197
/// <summary>
197198
/// Get the maximum size for a message in bytes.
198-
/// The default value is defined in <see cref="ConnectionFactory.DefaultMaxMessageSize"/>.
199+
/// The default value is defined in <see cref="ConnectionFactory.DefaultMaxInboundMessageBodySize"/>.
199200
/// </summary>
200-
public uint MaxMessageSize
201+
public uint MaxInboundMessageBodySize
201202
{
202-
get { return _maxMessageSize; }
203+
get { return _maxInboundMessageBodySize; }
203204
}
204205

205206
/// <summary>

projects/RabbitMQ.Client/client/api/ConnectionConfig.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,11 @@ public sealed class ConnectionConfig
9090
/// </summary>
9191
public readonly uint MaxFrameSize;
9292

93+
/// <summary>
94+
/// Maximum body size of a message (in bytes).
95+
/// </summary>
96+
public readonly uint MaxInboundMessageBodySize;
97+
9398
/// <summary>
9499
/// Set to false to make automatic connection recovery not recover topology (exchanges, queues, bindings, etc).
95100
/// </summary>
@@ -149,7 +154,7 @@ internal ConnectionConfig(string virtualHost, string userName, string password,
149154
ICredentialsProvider credentialsProvider, ICredentialsRefresher credentialsRefresher,
150155
IEnumerable<IAuthMechanismFactory> authMechanisms,
151156
IDictionary<string, object?> clientProperties, string? clientProvidedName,
152-
ushort maxChannelCount, uint maxFrameSize, bool topologyRecoveryEnabled,
157+
ushort maxChannelCount, uint maxFrameSize, uint maxInboundMessageBodySize, bool topologyRecoveryEnabled,
153158
TopologyRecoveryFilter topologyRecoveryFilter, TopologyRecoveryExceptionHandler topologyRecoveryExceptionHandler,
154159
TimeSpan networkRecoveryInterval, TimeSpan heartbeatInterval, TimeSpan continuationTimeout, TimeSpan handshakeContinuationTimeout, TimeSpan requestedConnectionTimeout,
155160
bool dispatchConsumersAsync, int dispatchConsumerConcurrency,
@@ -165,6 +170,7 @@ internal ConnectionConfig(string virtualHost, string userName, string password,
165170
ClientProvidedName = clientProvidedName;
166171
MaxChannelCount = maxChannelCount;
167172
MaxFrameSize = maxFrameSize;
173+
MaxInboundMessageBodySize = maxInboundMessageBodySize;
168174
TopologyRecoveryEnabled = topologyRecoveryEnabled;
169175
TopologyRecoveryFilter = topologyRecoveryFilter;
170176
TopologyRecoveryExceptionHandler = topologyRecoveryExceptionHandler;

projects/RabbitMQ.Client/client/api/ConnectionFactory.cs

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ namespace RabbitMQ.Client
5959
/// factory.VirtualHost = ConnectionFactory.DefaultVHost;
6060
/// factory.HostName = hostName;
6161
/// factory.Port = AmqpTcpEndpoint.UseDefaultPort;
62-
/// factory.MaxMessageSize = 512 * 1024 * 1024;
62+
/// factory.MaxInboundMessageBodySize = 512 * 1024 * 1024;
6363
/// //
6464
/// IConnection conn = factory.CreateConnection();
6565
/// //
@@ -107,15 +107,9 @@ public sealed class ConnectionFactory : ConnectionFactoryBase, IConnectionFactor
107107
public const uint DefaultFrameMax = 0;
108108

109109
/// <summary>
110-
/// Default value for <code>ConnectionFactory</code>'s <code>MaxMessageSize</code>.
110+
/// Default value for <code>ConnectionFactory</code>'s <code>MaxInboundMessageBodySize</code>.
111111
/// </summary>
112-
public const uint DefaultMaxMessageSize = 134217728;
113-
/// <summary>
114-
/// Largest message size, in bytes, allowed in RabbitMQ.
115-
/// Note: <code>rabbit.max_message_size</code> setting (https://www.rabbitmq.com/configure.html)
116-
/// configures the largest message size which should be lower than this maximum of 536 Mbs.
117-
/// </summary>
118-
public const uint MaximumMaxMessageSize = 536870912;
112+
public const uint DefaultMaxInboundMessageBodySize = 1_048_576 * 64;
119113

120114
/// <summary>
121115
/// Default value for desired heartbeat interval. Default is 60 seconds,
@@ -291,13 +285,13 @@ public ConnectionFactory()
291285
/// </summary>
292286
public AmqpTcpEndpoint Endpoint
293287
{
294-
get { return new AmqpTcpEndpoint(HostName, Port, Ssl, MaxMessageSize); }
288+
get { return new AmqpTcpEndpoint(HostName, Port, Ssl, MaxInboundMessageBodySize); }
295289
set
296290
{
297291
Port = value.Port;
298292
HostName = value.HostName;
299293
Ssl = value.Ssl;
300-
MaxMessageSize = value.MaxMessageSize;
294+
MaxInboundMessageBodySize = value.MaxInboundMessageBodySize;
301295
}
302296
}
303297

@@ -359,7 +353,7 @@ public AmqpTcpEndpoint Endpoint
359353
/// Maximum allowed message size, in bytes, from RabbitMQ.
360354
/// Corresponds to the <code>ConnectionFactory.DefaultMaxMessageSize</code> setting.
361355
/// </summary>
362-
public uint MaxMessageSize { get; set; } = DefaultMaxMessageSize;
356+
public uint MaxInboundMessageBodySize { get; set; } = DefaultMaxInboundMessageBodySize;
363357

364358
/// <summary>
365359
/// The uri to use for the connection.
@@ -484,7 +478,7 @@ public Task<IConnection> CreateConnectionAsync(IEnumerable<string> hostnames,
484478
public Task<IConnection> CreateConnectionAsync(IEnumerable<string> hostnames, string clientProvidedName,
485479
CancellationToken cancellationToken = default)
486480
{
487-
IEnumerable<AmqpTcpEndpoint> endpoints = hostnames.Select(h => new AmqpTcpEndpoint(h, Port, Ssl, MaxMessageSize));
481+
IEnumerable<AmqpTcpEndpoint> endpoints = hostnames.Select(h => new AmqpTcpEndpoint(h, Port, Ssl, MaxInboundMessageBodySize));
488482
return CreateConnectionAsync(EndpointResolverFactory(endpoints), clientProvidedName, cancellationToken);
489483
}
490484

@@ -602,6 +596,7 @@ private ConnectionConfig CreateConfig(string clientProvidedName)
602596
clientProvidedName,
603597
RequestedChannelMax,
604598
RequestedFrameMax,
599+
MaxInboundMessageBodySize,
605600
TopologyRecoveryEnabled,
606601
TopologyRecoveryFilter,
607602
TopologyRecoveryExceptionHandler,

projects/RabbitMQ.Client/client/api/InternalConstants.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,5 +37,12 @@ internal static class InternalConstants
3737
{
3838
internal static readonly TimeSpan DefaultConnectionAbortTimeout = TimeSpan.FromSeconds(5);
3939
internal static readonly TimeSpan DefaultConnectionCloseTimeout = TimeSpan.FromSeconds(30);
40+
41+
/// <summary>
42+
/// Largest message size, in bytes, allowed in RabbitMQ.
43+
/// Note: <code>rabbit.max_message_size</code> setting (https://www.rabbitmq.com/configure.html)
44+
/// configures the largest message size which should be lower than this maximum of 128MiB.
45+
/// </summary>
46+
internal const uint DefaultRabbitMqMaxInboundMessageBodySize = 1_048_576 * 128;
4047
}
4148
}

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recording.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ private void DoDeleteAutoDeleteExchange(string exchangeName)
170170

171171
bool AnyBindingsOnExchange(string exchange)
172172
{
173-
foreach (var recordedBinding in _recordedBindings)
173+
foreach (RecordedBinding recordedBinding in _recordedBindings)
174174
{
175175
if (recordedBinding.Source == exchange)
176176
{
@@ -400,15 +400,15 @@ await _recordedEntitiesSemaphore.WaitAsync()
400400

401401
private void DoDeleteRecordedConsumer(string consumerTag)
402402
{
403-
if (_recordedConsumers.Remove(consumerTag, out var recordedConsumer))
403+
if (_recordedConsumers.Remove(consumerTag, out RecordedConsumer recordedConsumer))
404404
{
405405
DeleteAutoDeleteQueue(recordedConsumer.Queue);
406406
}
407407
}
408408

409409
private void DeleteAutoDeleteQueue(string queue)
410410
{
411-
if (_recordedQueues.TryGetValue(queue, out var recordedQueue) && recordedQueue.AutoDelete)
411+
if (_recordedQueues.TryGetValue(queue, out RecordedQueue recordedQueue) && recordedQueue.AutoDelete)
412412
{
413413
// last consumer on this connection is gone, remove recorded queue if it is auto-deleted.
414414
if (!AnyConsumersOnQueue(queue))
@@ -420,7 +420,7 @@ private void DeleteAutoDeleteQueue(string queue)
420420

421421
private bool AnyConsumersOnQueue(string queue)
422422
{
423-
foreach (var pair in _recordedConsumers)
423+
foreach (KeyValuePair<string, RecordedConsumer> pair in _recordedConsumers)
424424
{
425425
if (pair.Value.Queue == queue)
426426
{

projects/RabbitMQ.Client/client/impl/ChannelBase.cs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -488,7 +488,7 @@ private void OnChannelShutdown(ShutdownEventArgs reason)
488488
if (_confirmsTaskCompletionSources?.Count > 0)
489489
{
490490
var exception = new AlreadyClosedException(reason);
491-
foreach (var confirmsTaskCompletionSource in _confirmsTaskCompletionSources)
491+
foreach (TaskCompletionSource<bool> confirmsTaskCompletionSource in _confirmsTaskCompletionSources)
492492
{
493493
confirmsTaskCompletionSource.TrySetException(exception);
494494
}
@@ -635,7 +635,7 @@ protected void HandleAckNack(ulong deliveryTag, bool multiple, bool isNack)
635635
if (_pendingDeliveryTags.Count == 0 && _confirmsTaskCompletionSources.Count > 0)
636636
{
637637
// Done, mark tasks
638-
foreach (var confirmsTaskCompletionSource in _confirmsTaskCompletionSources)
638+
foreach (TaskCompletionSource<bool> confirmsTaskCompletionSource in _confirmsTaskCompletionSources)
639639
{
640640
confirmsTaskCompletionSource.TrySetResult(_onlyAcksReceived);
641641
}
@@ -754,7 +754,7 @@ protected async Task<bool> HandleChannelCloseOkAsync(IncomingCommand cmd, Cancel
754754
*/
755755
FinishClose();
756756

757-
if (_continuationQueue.TryPeek<ChannelCloseAsyncRpcContinuation>(out var k))
757+
if (_continuationQueue.TryPeek<ChannelCloseAsyncRpcContinuation>(out ChannelCloseAsyncRpcContinuation k))
758758
{
759759
_continuationQueue.Next();
760760
await k.HandleCommandAsync(cmd)
@@ -1891,7 +1891,8 @@ private static BasicProperties PopulateActivityAndPropagateTraceId<TProperties>(
18911891
props = new BasicProperties();
18921892
}
18931893

1894-
var headers = props.Headers ?? new Dictionary<string, object>();
1894+
IDictionary<string, object> headers = props.Headers ?? new Dictionary<string, object>();
1895+
18951896
// Inject the ActivityContext into the message headers to propagate trace context to the receiving service.
18961897
RabbitMQActivitySource.ContextInjector(sendActivity, headers);
18971898
props.Headers = headers;

0 commit comments

Comments
 (0)