Skip to content

Commit 7378ea1

Browse files
committed
udpate rabbitmq to 4.2
implement the direct reply queue address Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent b2f86cc commit 7378ea1

File tree

11 files changed

+139
-30
lines changed

11 files changed

+139
-30
lines changed

.ci/ubuntu/cluster/gha-setup.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ function run_docker_compose
1919
docker compose --file "$script_dir/docker-compose.yml" $@
2020
}
2121

22-
readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4.2-rc-management}"
22+
readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4.2-management}"
2323

2424
if [[ ! -v GITHUB_ACTIONS ]]
2525
then

.ci/ubuntu/one-node/gha-setup.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ readonly script_dir
99
echo "[INFO] script_dir: '$script_dir'"
1010

1111

12-
readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4.2-rc-management}"
12+
readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4.2-management}"
1313

1414

1515

RabbitMQ.AMQP.Client/IConsumer.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,11 @@ public interface IConsumer : ILifeCycle
3737
/// Returns the number of unsettled messages.
3838
/// </summary>
3939
long UnsettledMessageCount { get; }
40+
41+
/// <summary>
42+
/// Returns queue address the consumer is consuming from.
43+
/// </summary>
44+
string? QueueAddress { get; }
4045
}
4146

4247
public interface IContext

RabbitMQ.AMQP.Client/IConsumerBuilder.cs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,18 @@ public enum StreamOffsetSpecification
1919
public interface IConsumerBuilder
2020
{
2121
IConsumerBuilder Queue(IQueueSpecification queueSpecification);
22-
IConsumerBuilder Queue(string queueName);
22+
IConsumerBuilder Queue(string? queueName);
23+
24+
/// <summary>
25+
/// If direct reply-to is enabled, the client will use the direct reply-to feature of AMQP 1.0.
26+
/// The server must also support direct reply-to.
27+
/// This feature allows the server to send the reply directly to the client without going through a reply queue.
28+
/// This can improve performance and reduce latency.
29+
/// Default is false.
30+
/// https://www.rabbitmq.com/docs/direct-reply-to
31+
/// </summary>
32+
33+
IConsumerBuilder DirectReplyTo(bool directReplyTo);
2334

2435
IConsumerBuilder MessageHandler(MessageHandler handler);
2536

RabbitMQ.AMQP.Client/IRequester.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88

99
namespace RabbitMQ.AMQP.Client
1010
{
11-
1211
public interface IRequesterAddressBuilder : IAddressBuilder<IRequesterAddressBuilder>
1312
{
1413
IRequesterBuilder Requester();
@@ -66,7 +65,6 @@ public interface IRequesterBuilder
6665
/// </summary>
6766
/// <param name="correlationIdSupplier"></param>
6867
/// <returns></returns>
69-
7068
IRequesterBuilder CorrelationIdSupplier(Func<object>? correlationIdSupplier);
7169

7270
/// <summary>
@@ -75,6 +73,7 @@ public interface IRequesterBuilder
7573
/// <param name="timeout"></param>
7674
/// <returns></returns>
7775
IRequesterBuilder Timeout(TimeSpan timeout);
76+
7877
/// <summary>
7978
/// Build and return the RPC client.
8079
/// </summary>

RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ private enum PauseStatus
3838
private readonly Guid _id = Guid.NewGuid();
3939

4040
private ReceiverLink? _receiverLink;
41+
private Attach? _attach;
4142

4243
private PauseStatus _pauseStatus = PauseStatus.UNPAUSED;
4344
private readonly UnsettledMessageCounter _unsettledMessageCounter = new();
@@ -57,8 +58,8 @@ public override async Task OpenAsync()
5758
{
5859
try
5960
{
60-
TaskCompletionSource<ReceiverLink> attachCompletedTcs =
61-
Utils.CreateTaskCompletionSource<ReceiverLink>();
61+
TaskCompletionSource<(ReceiverLink, Attach)> attachCompletedTcs =
62+
Utils.CreateTaskCompletionSource<(ReceiverLink, Attach)>();
6263

6364
// this is an event to get the filters to the listener context
6465
// it _must_ be here because in case of reconnect the original filters could be not valid anymore
@@ -72,14 +73,24 @@ public override async Task OpenAsync()
7273
_configuration.ListenerContext(listenerContext);
7374
}
7475

75-
Attach attach = Utils.CreateAttach(_configuration.Address, DeliveryMode.AtLeastOnce, _id,
76-
_configuration.Filters);
76+
Attach attach;
77+
78+
if (_configuration.DirectReplyTo)
79+
{
80+
attach = Utils.CreateDirectReplyToAttach(_id, _configuration.Filters);
81+
}
82+
else
83+
{
84+
string address = AddressBuilderHelper.AddressBuilder().Queue(_configuration.Queue).Address();
85+
attach = Utils.CreateAttach(address, DeliveryMode.AtLeastOnce, _id,
86+
_configuration.Filters);
87+
}
7788

7889
void OnAttached(ILink argLink, Attach argAttach)
7990
{
8091
if (argLink is ReceiverLink link)
8192
{
82-
attachCompletedTcs.SetResult(link);
93+
attachCompletedTcs.SetResult((link, argAttach));
8394
}
8495
else
8596
{
@@ -103,10 +114,10 @@ void OnAttached(ILink argLink, Attach argAttach)
103114
// which tells me it allows the .NET runtime to process
104115
await Task.Delay(ConsumerDefaults.AttachDelayMilliseconds).ConfigureAwait(false);
105116

106-
_receiverLink = await attachCompletedTcs.Task.WaitAsync(waitSpan)
117+
(_receiverLink, _attach) = await attachCompletedTcs.Task.WaitAsync(waitSpan)
107118
.ConfigureAwait(false);
108119

109-
if (false == Object.ReferenceEquals(_receiverLink, tmpReceiverLink))
120+
if (!ReferenceEquals(_receiverLink, tmpReceiverLink))
110121
{
111122
// TODO log this case?
112123
}
@@ -136,7 +147,7 @@ private void ValidateReceiverLink()
136147

137148
if (_receiverLink.LinkState != LinkState.Attached)
138149
{
139-
var errorMessage = _receiverLink.Error?.ToString() ?? "Unknown error";
150+
string errorMessage = _receiverLink.Error?.ToString() ?? "Unknown error";
140151
throw new ConsumerException($"{ToString()} Receiver link not attached. Error: {errorMessage}");
141152
}
142153
}
@@ -245,6 +256,15 @@ ref Unsafe.As<PauseStatus, int>(ref _pauseStatus),
245256
/// </summary>
246257
public long UnsettledMessageCount => _unsettledMessageCounter.Get();
247258

259+
public string? QueueAddress
260+
{
261+
get
262+
{
263+
string? x = _attach?.Source is not Source source ? null : source.Address;
264+
return x;
265+
}
266+
}
267+
248268
/// <summary>
249269
/// Request to receive messages again.
250270
/// </summary>
@@ -298,7 +318,8 @@ await _receiverLink.CloseAsync(TimeSpan.FromSeconds(5))
298318

299319
public override string ToString()
300320
{
301-
return $"Consumer{{Address='{_configuration.Address}', " +
321+
string address = AddressBuilderHelper.AddressBuilder().Queue(_configuration.Queue).Address();
322+
return $"Consumer{{Address='{address}', " +
302323
$"id={_id}, " +
303324
$"Connection='{_amqpConnection}', " +
304325
$"State='{State}'}}";

RabbitMQ.AMQP.Client/Impl/AmqpConsumerBuilder.cs

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,25 @@ namespace RabbitMQ.AMQP.Client.Impl
1616
/// </summary>
1717
internal sealed class ConsumerConfiguration
1818
{
19-
public string Address { get; set; } = "";
19+
20+
public string? Queue { get; set; } = null;
2021
public int InitialCredits { get; set; } = 100; // TODO use constant, check with Java lib
2122

2223
public Map Filters { get; set; } = new();
2324

2425
// TODO is a MessageHandler *really* optional???
2526
public MessageHandler? Handler { get; set; }
2627

28+
/// <summary>
29+
/// If direct reply-to is enabled, the client will use the direct reply-to feature of AMQP 1.0.
30+
/// The server must also support direct reply-to.
31+
/// This feature allows the server to send the reply directly to the client without going through a reply queue.
32+
/// This can improve performance and reduce latency.
33+
/// Default is false.
34+
/// https://www.rabbitmq.com/docs/direct-reply-to
35+
/// </summary>
36+
public bool DirectReplyTo { get; set; }
37+
2738
// TODO re-name to ListenerContextAction? Callback?
2839
public Action<IConsumerBuilder.ListenerContext>? ListenerContext = null;
2940
}
@@ -49,10 +60,9 @@ public IConsumerBuilder Queue(IQueueSpecification queueSpec)
4960
return Queue(queueSpec.QueueName);
5061
}
5162

52-
public IConsumerBuilder Queue(string queueName)
63+
public IConsumerBuilder Queue(string? queueName)
5364
{
54-
string address = AddressBuilderHelper.AddressBuilder().Queue(queueName).Address();
55-
_configuration.Address = address;
65+
_configuration.Queue = queueName;
5666
return this;
5767
}
5868

@@ -62,6 +72,12 @@ public IConsumerBuilder MessageHandler(MessageHandler handler)
6272
return this;
6373
}
6474

75+
public IConsumerBuilder DirectReplyTo(bool directReplyTo)
76+
{
77+
_configuration.DirectReplyTo = directReplyTo;
78+
return this;
79+
}
80+
6581
public IConsumerBuilder InitialCredits(int initialCredits)
6682
{
6783
_configuration.InitialCredits = initialCredits;
@@ -87,10 +103,10 @@ public async Task<IConsumer> BuildAndStartAsync(CancellationToken cancellationTo
87103
}
88104

89105
if (_configuration.Filters[Consts.s_sqlFilterSymbol] is not null &&
90-
_amqpConnection._featureFlags.IsSqlFeatureEnabled == false)
106+
!_amqpConnection._featureFlags.IsSqlFeatureEnabled)
91107
{
92-
throw new ConsumerException("SQL filter is not supported by the connection." +
93-
"RabbitMQ 4.2.0 or later is required.");
108+
throw new NotSupportedException("SQL filter is not supported by the connection." +
109+
"RabbitMQ 4.2.0 or later is required.");
94110
}
95111

96112
AmqpConsumer consumer = new(_amqpConnection, _configuration, _metricsReporter);

RabbitMQ.AMQP.Client/Impl/AmqpRequester.cs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ public class RpcClientConfiguration
1313
{
1414
public AmqpConnection Connection { get; set; } = null!;
1515
public string ReplyToQueue { get; set; } = "";
16+
17+
internal string ReplyToQueueAddress { get; set; } = "";
18+
1619
public string RequestAddress { get; set; } = "";
1720
public TimeSpan Timeout { get; set; } = TimeSpan.FromSeconds(10);
1821

@@ -134,7 +137,7 @@ private IMessage RequestPostProcess(IMessage request, object correlationId)
134137
return _configuration.RequestPostProcessor(request, correlationId);
135138
}
136139

137-
return request.ReplyTo(AddressBuilderHelper.AddressBuilder().Queue(_configuration.ReplyToQueue).Address())
140+
return request.ReplyTo(_configuration.ReplyToQueueAddress)
138141
.MessageId(correlationId);
139142
}
140143

@@ -145,17 +148,20 @@ public AmqpRequester(RpcClientConfiguration configuration)
145148

146149
public override async Task OpenAsync()
147150
{
148-
if (string.IsNullOrEmpty(_configuration.ReplyToQueue))
151+
bool isDirectReplyToSupported = _configuration.Connection._featureFlags.IsDirectReplyToSupported;
152+
string queueReplyTo = _configuration.ReplyToQueue;
153+
if (string.IsNullOrEmpty(_configuration.ReplyToQueue) && !isDirectReplyToSupported)
149154
{
150155
IQueueInfo queueInfo = await _configuration.Connection.Management().Queue().AutoDelete(true)
151156
.Exclusive(true).DeclareAsync()
152157
.ConfigureAwait(false);
153-
_configuration.ReplyToQueue = queueInfo.Name();
158+
queueReplyTo = queueInfo.Name();
154159
}
155160

156161
_publisher = await _configuration.Connection.PublisherBuilder().BuildAsync().ConfigureAwait(false);
157162
_consumer = await _configuration.Connection.ConsumerBuilder()
158-
.Queue(_configuration.ReplyToQueue)
163+
.Queue(queueReplyTo)
164+
.DirectReplyTo(isDirectReplyToSupported)
159165
.MessageHandler((context, message) =>
160166
{
161167
// TODO MessageHandler funcs should catch all exceptions
@@ -165,9 +171,14 @@ public override async Task OpenAsync()
165171
{
166172
request.SetResult(message);
167173
}
174+
168175
return Task.CompletedTask;
169176
}).BuildAndStartAsync().ConfigureAwait(false);
170177

178+
if (_consumer.QueueAddress is not null)
179+
{
180+
_configuration.ReplyToQueueAddress = _consumer.QueueAddress;
181+
}
171182
await base.OpenAsync().ConfigureAwait(false);
172183
}
173184

RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -171,10 +171,12 @@ RabbitMQ.AMQP.Client.IConnection.RequesterBuilder() -> RabbitMQ.AMQP.Client.IReq
171171
RabbitMQ.AMQP.Client.IConnection.ResponderBuilder() -> RabbitMQ.AMQP.Client.IResponderBuilder!
172172
RabbitMQ.AMQP.Client.IConsumer
173173
RabbitMQ.AMQP.Client.IConsumer.Pause() -> void
174+
RabbitMQ.AMQP.Client.IConsumer.QueueAddress.get -> string?
174175
RabbitMQ.AMQP.Client.IConsumer.Unpause() -> void
175176
RabbitMQ.AMQP.Client.IConsumer.UnsettledMessageCount.get -> long
176177
RabbitMQ.AMQP.Client.IConsumerBuilder
177178
RabbitMQ.AMQP.Client.IConsumerBuilder.BuildAndStartAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.AMQP.Client.IConsumer!>!
179+
RabbitMQ.AMQP.Client.IConsumerBuilder.DirectReplyTo(bool directReplyTo) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
178180
RabbitMQ.AMQP.Client.IConsumerBuilder.InitialCredits(int initialCredits) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
179181
RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions
180182
RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions.AbsoluteExpiryTime(System.DateTime absoluteExpiryTime) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions!
@@ -208,7 +210,7 @@ RabbitMQ.AMQP.Client.IConsumerBuilder.ListenerContext.ListenerContext(RabbitMQ.A
208210
RabbitMQ.AMQP.Client.IConsumerBuilder.ListenerContext.StreamOptions.get -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions!
209211
RabbitMQ.AMQP.Client.IConsumerBuilder.MessageHandler(RabbitMQ.AMQP.Client.MessageHandler! handler) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
210212
RabbitMQ.AMQP.Client.IConsumerBuilder.Queue(RabbitMQ.AMQP.Client.IQueueSpecification! queueSpecification) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
211-
RabbitMQ.AMQP.Client.IConsumerBuilder.Queue(string! queueName) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
213+
RabbitMQ.AMQP.Client.IConsumerBuilder.Queue(string? queueName) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
212214
RabbitMQ.AMQP.Client.IConsumerBuilder.Stream() -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions!
213215
RabbitMQ.AMQP.Client.IConsumerBuilder.SubscriptionListener(System.Action<RabbitMQ.AMQP.Client.IConsumerBuilder.ListenerContext!>! listenerContext) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
214216
RabbitMQ.AMQP.Client.IContext
@@ -364,15 +366,17 @@ RabbitMQ.AMQP.Client.Impl.AmqpConnection.RequesterBuilder() -> RabbitMQ.AMQP.Cli
364366
RabbitMQ.AMQP.Client.Impl.AmqpConnection.ResponderBuilder() -> RabbitMQ.AMQP.Client.IResponderBuilder!
365367
RabbitMQ.AMQP.Client.Impl.AmqpConsumer
366368
RabbitMQ.AMQP.Client.Impl.AmqpConsumer.Pause() -> void
369+
RabbitMQ.AMQP.Client.Impl.AmqpConsumer.QueueAddress.get -> string?
367370
RabbitMQ.AMQP.Client.Impl.AmqpConsumer.Unpause() -> void
368371
RabbitMQ.AMQP.Client.Impl.AmqpConsumer.UnsettledMessageCount.get -> long
369372
RabbitMQ.AMQP.Client.Impl.AmqpConsumerBuilder
370373
RabbitMQ.AMQP.Client.Impl.AmqpConsumerBuilder.AmqpConsumerBuilder(RabbitMQ.AMQP.Client.Impl.AmqpConnection! connection, RabbitMQ.AMQP.Client.IMetricsReporter? metricsReporter) -> void
371374
RabbitMQ.AMQP.Client.Impl.AmqpConsumerBuilder.BuildAndStartAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.AMQP.Client.IConsumer!>!
375+
RabbitMQ.AMQP.Client.Impl.AmqpConsumerBuilder.DirectReplyTo(bool directReplyTo) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
372376
RabbitMQ.AMQP.Client.Impl.AmqpConsumerBuilder.InitialCredits(int initialCredits) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
373377
RabbitMQ.AMQP.Client.Impl.AmqpConsumerBuilder.MessageHandler(RabbitMQ.AMQP.Client.MessageHandler! handler) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
374378
RabbitMQ.AMQP.Client.Impl.AmqpConsumerBuilder.Queue(RabbitMQ.AMQP.Client.IQueueSpecification! queueSpec) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
375-
RabbitMQ.AMQP.Client.Impl.AmqpConsumerBuilder.Queue(string! queueName) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
379+
RabbitMQ.AMQP.Client.Impl.AmqpConsumerBuilder.Queue(string? queueName) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
376380
RabbitMQ.AMQP.Client.Impl.AmqpConsumerBuilder.Stream() -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions!
377381
RabbitMQ.AMQP.Client.Impl.AmqpConsumerBuilder.SubscriptionListener(System.Action<RabbitMQ.AMQP.Client.IConsumerBuilder.ListenerContext!>! context) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
378382
RabbitMQ.AMQP.Client.Impl.AmqpEnvironment

RabbitMQ.AMQP.Client/Utils.cs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,47 @@ internal static void ValidatePositive(string label, long value)
115115
}
116116
}
117117

118+
internal static Attach CreateDirectReplyToAttach(Guid linkId, Map? sourceFilter = null)
119+
{
120+
// Attach{name='receiver-ID:f99ef5a7-7b91-48cb-a52e-d86e6f14f33b:3:2:1:1',
121+
// handle=null, role=RECEIVER, sndSettleMode=SETTLED, rcvSettleMode=FIRST,
122+
// source=Source{address='null', durable=NONE, expiryPolicy=LINK_DETACH, timeout=0,
123+
// dynamic=true, dynamicNodeProperties=null, distributionMode=null, filter=null,
124+
// defaultOutcome=Modified{deliveryFailed=true, undeliverableHere=false, messageAnnotations=null},
125+
// outcomes=[amqp:accepted:list, amqp:rejected:list, amqp:released:list, amqp:modified:list],
126+
// capabilities=[rabbitmq:volatile-queue]}, target=Target{address='null', durable=NONE,
127+
// expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null},
128+
// unsettled=null, incompleteUnsettled=null, initialDeliveryCount=null, maxMessageSize=null, offeredCapabilities=null,
129+
// desiredCapabilities=null, properties=null}
130+
131+
var a = new Attach()
132+
{
133+
SndSettleMode = SenderSettleMode.Settled,
134+
// LinkName = $"receiver-ID:{linkId.ToString()}",
135+
RcvSettleMode = ReceiverSettleMode.First,
136+
Source = new Source()
137+
{
138+
Capabilities = new Symbol[] { new("rabbitmq:volatile-queue") },
139+
ExpiryPolicy = new Symbol("link-detach"),
140+
Dynamic = true,
141+
Timeout = 0,
142+
FilterSet = sourceFilter,
143+
DefaultOutcome = new Modified()
144+
{
145+
DeliveryFailed = true,
146+
UndeliverableHere = false,
147+
}
148+
},
149+
// Target = new Target()
150+
// {
151+
// ExpiryPolicy = new Symbol("SESSION_END"),
152+
// Dynamic = false,
153+
// Durable = 0,
154+
// },
155+
};
156+
return a;
157+
}
158+
118159
internal static Attach CreateAttach(string? address,
119160
DeliveryMode deliveryMode, Guid linkId, Map? sourceFilter = null)
120161
{

0 commit comments

Comments
 (0)