Skip to content

Commit 16cda8b

Browse files
committed
direct reply to restart
Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 1cb5551 commit 16cda8b

File tree

14 files changed

+196
-51
lines changed

14 files changed

+196
-51
lines changed

RabbitMQ.AMQP.Client/IConsumer.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,11 @@ public interface IConsumer : ILifeCycle
3939
long UnsettledMessageCount { get; }
4040

4141
/// <summary>
42-
/// Returns queue address the consumer is consuming from.
42+
/// Returns queue name the consumer is consuming from.
43+
/// The Queue name is usually configured by the user via the <see cref="IConsumerBuilder"/>,
44+
/// but can also be generated by the client the special direct-reply-to queue.
4345
/// </summary>
44-
string? QueueAddress { get; }
46+
string Queue { get; }
4547
}
4648

4749
public interface IContext

RabbitMQ.AMQP.Client/IRequester.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,6 @@ public interface IRequester : ILifeCycle
105105
/// - The server uses this address to send the reply message. with direct-reply-to
106106
/// </summary>
107107
/// <returns></returns>
108-
public string ReplyToQueueAddress();
108+
public string GetReplyToQueue();
109109
}
110110
}

RabbitMQ.AMQP.Client/Impl/AddressBuilder.cs

Lines changed: 16 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -21,36 +21,21 @@ public T Exchange(IExchangeSpecification exchangeSpec)
2121
public T Exchange(string? exchangeName)
2222
{
2323
_exchange = exchangeName;
24-
if (_owner == null)
25-
{
26-
throw new InvalidOperationException("Owner is null");
27-
}
28-
29-
return _owner;
24+
return _owner ?? throw new InvalidOperationException("Owner is null");
3025
}
3126

3227
public T Queue(IQueueSpecification queueSpec) => Queue(queueSpec.QueueName);
3328

3429
public T Queue(string? queueName)
3530
{
3631
_queue = queueName;
37-
if (_owner == null)
38-
{
39-
throw new InvalidOperationException("Owner is null");
40-
}
41-
42-
return _owner;
32+
return _owner ?? throw new InvalidOperationException("Owner is null");
4333
}
4434

4535
public T Key(string? key)
4636
{
4737
_key = key;
48-
if (_owner == null)
49-
{
50-
throw new InvalidOperationException("Owner is null");
51-
}
52-
53-
return _owner;
38+
return _owner ?? throw new InvalidOperationException("Owner is null");
5439
}
5540

5641
public string Address()
@@ -85,12 +70,19 @@ public string Address()
8570
return "";
8671
}
8772

88-
if (string.IsNullOrEmpty(_queue))
89-
{
90-
throw new InvalidAddressException("Queue must be set");
91-
}
73+
return string.IsNullOrEmpty(_queue)
74+
? throw new InvalidAddressException("Queue must be set")
75+
: $"/{Consts.Queues}/{Utils.EncodePathSegment(_queue)}";
76+
}
9277

93-
return $"/{Consts.Queues}/{Utils.EncodePathSegment(_queue)}";
78+
public string DecodeQueuePathSegment(string path)
79+
{
80+
string? v = Utils.DecodePathSegment(path);
81+
return v == null
82+
? throw new InvalidAddressException("Invalid path segment")
83+
:
84+
// remove the /queues prefix to the path
85+
v.Substring($"/{Consts.Queues}/".Length);
9486
}
9587
}
9688

@@ -127,6 +119,7 @@ public IMessage Build()
127119
public class RequesterAddressBuilder : DefaultAddressBuilder<IRequesterAddressBuilder>, IRequesterAddressBuilder
128120
{
129121
readonly AmqpRequesterBuilder _builder;
122+
130123
public RequesterAddressBuilder(AmqpRequesterBuilder builder)
131124
{
132125
_builder = builder;

RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -256,12 +256,18 @@ ref Unsafe.As<PauseStatus, int>(ref _pauseStatus),
256256
/// </summary>
257257
public long UnsettledMessageCount => _unsettledMessageCounter.Get();
258258

259-
public string? QueueAddress
259+
public string Queue
260260
{
261261
get
262262
{
263-
string? x = _attach?.Source is not Source source ? null : source.Address;
264-
return x;
263+
string? sourceAddress = _attach?.Source is not Source source ? null : source.Address;
264+
if (sourceAddress is null)
265+
{
266+
throw new InvalidOperationException(
267+
"_attach.Source.Address is null");
268+
}
269+
270+
return AddressBuilderHelper.AddressBuilder().DecodeQueuePathSegment(sourceAddress);
265271
}
266272
}
267273

@@ -318,7 +324,7 @@ await _receiverLink.CloseAsync(TimeSpan.FromSeconds(ConsumerDefaults.CloseTimeou
318324

319325
public override string ToString()
320326
{
321-
return $"Consumer{{Address='{QueueAddress}', " +
327+
return $"Consumer{{Address='{Queue}', " +
322328
$"id={_id}, " +
323329
$"Connection='{_amqpConnection}', " +
324330
$"State='{State}'}}";

RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,9 +138,9 @@ public Task<PublishResult> PublishAsync(IMessage message, CancellationToken canc
138138
void OutcomeCallback(ILink sender, Message inMessage, Outcome outcome, object state)
139139
{
140140
// Note: sometimes `inMessage` is null 🤔
141-
Debug.Assert(Object.ReferenceEquals(this, state));
141+
Debug.Assert(ReferenceEquals(this, state));
142142

143-
if (false == Object.ReferenceEquals(_senderLink, sender))
143+
if (!ReferenceEquals(_senderLink, sender))
144144
{
145145
// TODO log this case?
146146
}

RabbitMQ.AMQP.Client/Impl/AmqpRequester.cs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ public class RequesterConfiguration
1414
public AmqpConnection Connection { get; set; } = null!;
1515
public string ReplyToQueue { get; set; } = "";
1616

17-
public string RequestAddress { get; set; } = "";
1817
public TimeSpan Timeout { get; set; } = TimeSpan.FromSeconds(10);
1918

2019
public Func<object>? CorrelationIdSupplier { get; set; } = null;
@@ -79,7 +78,6 @@ public IRequesterBuilder Timeout(TimeSpan timeout)
7978

8079
public async Task<IRequester> BuildAsync()
8180
{
82-
_configuration.RequestAddress = _addressBuilder.Address();
8381
_configuration.Connection = _connection;
8482
var rpcClient = new AmqpRequester(_configuration);
8583
await rpcClient.OpenAsync().ConfigureAwait(false);
@@ -135,7 +133,7 @@ private IMessage RequestPostProcess(IMessage request, object correlationId)
135133
return _configuration.RequestPostProcessor(request, correlationId);
136134
}
137135

138-
string s = ReplyToQueueAddress();
136+
string s = GetReplyToQueue();
139137
return request.ReplyTo(s)
140138
.MessageId(correlationId);
141139
}
@@ -230,7 +228,8 @@ public async Task<IMessage> PublishAsync(IMessage message, CancellationToken can
230228
if (_publisher != null)
231229
{
232230
PublishResult pr = await _publisher.PublishAsync(
233-
message.To(_configuration.RequestAddress), cancellationToken).ConfigureAwait(false);
231+
message.To(AddressBuilderHelper.AddressBuilder().Queue(GetReplyToQueue()).Address()),
232+
cancellationToken).ConfigureAwait(false);
234233

235234
if (pr.Outcome.State != OutcomeState.Accepted)
236235
{
@@ -250,14 +249,14 @@ await _pendingRequests[correlationId].Task.WaitAsync(_configuration.Timeout)
250249
}
251250
}
252251

253-
public string ReplyToQueueAddress()
252+
public string GetReplyToQueue()
254253
{
255254
if (_consumer == null)
256255
{
257256
throw new InvalidOperationException("Requester is not opened");
258257
}
259258

260-
return _consumer.QueueAddress ??
259+
return _consumer.Queue ??
261260
throw new InvalidOperationException("ReplyToQueueAddress is not available");
262261
}
263262
}

RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ RabbitMQ.AMQP.Client.IConnection.RequesterBuilder() -> RabbitMQ.AMQP.Client.IReq
171171
RabbitMQ.AMQP.Client.IConnection.ResponderBuilder() -> RabbitMQ.AMQP.Client.IResponderBuilder!
172172
RabbitMQ.AMQP.Client.IConsumer
173173
RabbitMQ.AMQP.Client.IConsumer.Pause() -> void
174-
RabbitMQ.AMQP.Client.IConsumer.QueueAddress.get -> string?
174+
RabbitMQ.AMQP.Client.IConsumer.Queue.get -> string!
175175
RabbitMQ.AMQP.Client.IConsumer.Unpause() -> void
176176
RabbitMQ.AMQP.Client.IConsumer.UnsettledMessageCount.get -> long
177177
RabbitMQ.AMQP.Client.IConsumerBuilder
@@ -366,7 +366,7 @@ RabbitMQ.AMQP.Client.Impl.AmqpConnection.RequesterBuilder() -> RabbitMQ.AMQP.Cli
366366
RabbitMQ.AMQP.Client.Impl.AmqpConnection.ResponderBuilder() -> RabbitMQ.AMQP.Client.IResponderBuilder!
367367
RabbitMQ.AMQP.Client.Impl.AmqpConsumer
368368
RabbitMQ.AMQP.Client.Impl.AmqpConsumer.Pause() -> void
369-
RabbitMQ.AMQP.Client.Impl.AmqpConsumer.QueueAddress.get -> string?
369+
RabbitMQ.AMQP.Client.Impl.AmqpConsumer.Queue.get -> string!
370370
RabbitMQ.AMQP.Client.Impl.AmqpConsumer.Unpause() -> void
371371
RabbitMQ.AMQP.Client.Impl.AmqpConsumer.UnsettledMessageCount.get -> long
372372
RabbitMQ.AMQP.Client.Impl.AmqpConsumerBuilder
@@ -498,8 +498,8 @@ RabbitMQ.AMQP.Client.Impl.AmqpQuorumSpecification.QuorumInitialGroupSize(int siz
498498
RabbitMQ.AMQP.Client.Impl.AmqpQuorumSpecification.QuorumTargetGroupSize(int size) -> RabbitMQ.AMQP.Client.IQuorumQueueSpecification!
499499
RabbitMQ.AMQP.Client.Impl.AmqpRequester
500500
RabbitMQ.AMQP.Client.Impl.AmqpRequester.AmqpRequester(RabbitMQ.AMQP.Client.Impl.RequesterConfiguration! configuration) -> void
501+
RabbitMQ.AMQP.Client.Impl.AmqpRequester.GetReplyToQueue() -> string!
501502
RabbitMQ.AMQP.Client.Impl.AmqpRequester.PublishAsync(RabbitMQ.AMQP.Client.IMessage! message, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.AMQP.Client.IMessage!>!
502-
RabbitMQ.AMQP.Client.Impl.AmqpRequester.ReplyToQueueAddress() -> string!
503503
RabbitMQ.AMQP.Client.Impl.AmqpRequesterBuilder
504504
RabbitMQ.AMQP.Client.Impl.AmqpRequesterBuilder.AmqpRequesterBuilder(RabbitMQ.AMQP.Client.Impl.AmqpConnection! connection) -> void
505505
RabbitMQ.AMQP.Client.Impl.AmqpRequesterBuilder.BuildAsync() -> System.Threading.Tasks.Task<RabbitMQ.AMQP.Client.IRequester!>!
@@ -549,6 +549,7 @@ RabbitMQ.AMQP.Client.Impl.ConsumerBuilderStreamOptions
549549
RabbitMQ.AMQP.Client.Impl.ConsumerBuilderStreamOptions.ConsumerBuilderStreamOptions(RabbitMQ.AMQP.Client.IConsumerBuilder! consumerBuilder, Amqp.Types.Map! filters) -> void
550550
RabbitMQ.AMQP.Client.Impl.DefaultAddressBuilder<T>
551551
RabbitMQ.AMQP.Client.Impl.DefaultAddressBuilder<T>.Address() -> string!
552+
RabbitMQ.AMQP.Client.Impl.DefaultAddressBuilder<T>.DecodeQueuePathSegment(string! path) -> string!
552553
RabbitMQ.AMQP.Client.Impl.DefaultAddressBuilder<T>.DefaultAddressBuilder() -> void
553554
RabbitMQ.AMQP.Client.Impl.DefaultAddressBuilder<T>.Exchange(RabbitMQ.AMQP.Client.IExchangeSpecification! exchangeSpec) -> T
554555
RabbitMQ.AMQP.Client.Impl.DefaultAddressBuilder<T>.Exchange(string? exchangeName) -> T
@@ -586,8 +587,6 @@ RabbitMQ.AMQP.Client.Impl.RequesterConfiguration.CorrelationIdSupplier.get -> Sy
586587
RabbitMQ.AMQP.Client.Impl.RequesterConfiguration.CorrelationIdSupplier.set -> void
587588
RabbitMQ.AMQP.Client.Impl.RequesterConfiguration.ReplyToQueue.get -> string!
588589
RabbitMQ.AMQP.Client.Impl.RequesterConfiguration.ReplyToQueue.set -> void
589-
RabbitMQ.AMQP.Client.Impl.RequesterConfiguration.RequestAddress.get -> string!
590-
RabbitMQ.AMQP.Client.Impl.RequesterConfiguration.RequestAddress.set -> void
591590
RabbitMQ.AMQP.Client.Impl.RequesterConfiguration.RequesterConfiguration() -> void
592591
RabbitMQ.AMQP.Client.Impl.RequesterConfiguration.RequestPostProcessor.get -> System.Func<RabbitMQ.AMQP.Client.IMessage!, object!, RabbitMQ.AMQP.Client.IMessage!>?
593592
RabbitMQ.AMQP.Client.Impl.RequesterConfiguration.RequestPostProcessor.set -> void
@@ -694,8 +693,8 @@ RabbitMQ.AMQP.Client.IRecoveryConfiguration.IsActivated() -> bool
694693
RabbitMQ.AMQP.Client.IRecoveryConfiguration.IsTopologyActive() -> bool
695694
RabbitMQ.AMQP.Client.IRecoveryConfiguration.Topology(bool activated) -> RabbitMQ.AMQP.Client.IRecoveryConfiguration!
696695
RabbitMQ.AMQP.Client.IRequester
696+
RabbitMQ.AMQP.Client.IRequester.GetReplyToQueue() -> string!
697697
RabbitMQ.AMQP.Client.IRequester.PublishAsync(RabbitMQ.AMQP.Client.IMessage! message, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.AMQP.Client.IMessage!>!
698-
RabbitMQ.AMQP.Client.IRequester.ReplyToQueueAddress() -> string!
699698
RabbitMQ.AMQP.Client.IRequesterAddressBuilder
700699
RabbitMQ.AMQP.Client.IRequesterAddressBuilder.Requester() -> RabbitMQ.AMQP.Client.IRequesterBuilder!
701700
RabbitMQ.AMQP.Client.IRequesterBuilder

RabbitMQ.AMQP.Client/Utils.cs

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,11 @@ internal static Attach CreateAttach(string? address,
191191
{
192192
return PercentCodec.EncodePathSegment(url);
193193
}
194+
195+
internal static string? DecodePathSegment(string url)
196+
{
197+
return PercentCodec.DecodePathSegment(url);
198+
}
194199

195200
internal static string EncodeHttpParameter(string url)
196201
{
@@ -374,7 +379,66 @@ static PercentCodec()
374379
s_unreserved['_'] = true;
375380
s_unreserved['~'] = true;
376381
}
382+
383+
private static int HexValue(char ch)
384+
{
385+
return ch switch
386+
{
387+
>= '0' and <= '9' => ch - '0',
388+
>= 'A' and <= 'F' => ch - 'A' + 10,
389+
>= 'a' and <= 'f' => ch - 'a' + 10,
390+
_ => -1
391+
};
392+
}
393+
377394

395+
internal static string? DecodePathSegment(string? segment)
396+
{
397+
if (segment == null)
398+
{
399+
return null;
400+
}
401+
402+
var bytes = new List<byte>(segment.Length);
403+
for (int i = 0; i < segment.Length; i++)
404+
{
405+
char c = segment[i];
406+
if (c == '%')
407+
{
408+
if (i + 2 >= segment.Length)
409+
{
410+
throw new FormatException("Invalid percent-encoding: incomplete escape sequence.");
411+
}
412+
413+
int hi = HexValue(segment[i + 1]);
414+
int lo = HexValue(segment[i + 2]);
415+
if (hi < 0 || lo < 0)
416+
{
417+
throw new FormatException($"Invalid percent-encoding: '{segment.Substring(i, 3)}'.");
418+
}
419+
420+
bytes.Add((byte)((hi << 4) | lo));
421+
i += 2;
422+
}
423+
else
424+
{
425+
// Append UTF-8 encoding of the character (handles non-ASCII chars)
426+
if (c <= 0x7F)
427+
{
428+
bytes.Add((byte)c);
429+
}
430+
else
431+
{
432+
bytes.AddRange(Encoding.UTF8.GetBytes(new[] { c }));
433+
}
434+
}
435+
}
436+
437+
return Encoding.UTF8.GetString(bytes.ToArray());
438+
}
439+
440+
441+
378442
internal static string? EncodePathSegment(string? segment)
379443
{
380444
if (segment == null)

Tests/AddressBuilderTests.cs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
// This source code is dual-licensed under the Apache License, version 2.0,
2+
// and the Mozilla Public License, version 2.0.
3+
// Copyright (c) 2017-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
4+
5+
using RabbitMQ.AMQP.Client.Impl;
6+
using Xunit;
7+
8+
namespace Tests
9+
{
10+
public class AddressBuilderTests
11+
{
12+
[Theory]
13+
[InlineData("myQueue", "/queues/myQueue")]
14+
[InlineData("queue/with/slash", "/queues/queue%2Fwith%2Fslash")]
15+
[InlineData("queue with spaces", "/queues/queue%20with%20spaces")]
16+
[InlineData("queue+with+plus", "/queues/queue%2Bwith%2Bplus")]
17+
[InlineData("queue?with?question", "/queues/queue%3Fwith%3Fquestion")]
18+
[InlineData("特殊字符", "/queues/%E7%89%B9%E6%AE%8A%E5%AD%97%E7%AC%A6")]
19+
[InlineData("emoji😊queue", "/queues/emoji%F0%9F%98%8Aqueue")]
20+
[InlineData("!@#$%^&*()", "/queues/%21%40%23%24%25%5E%26%2A%28%29")]
21+
public void AddressBuilder_EncodeAndDecode(string queue, string queuePath)
22+
{
23+
AddressBuilder addressBuilder = new();
24+
string fullAddress = addressBuilder.Queue(queue).Address();
25+
Assert.Equal(queuePath, fullAddress);
26+
string decodedQueue = addressBuilder.DecodeQueuePathSegment(fullAddress);
27+
Assert.Equal(queue, decodedQueue);
28+
}
29+
}
30+
}

Tests/Consumer/BasicConsumerTests.cs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -371,17 +371,13 @@ public async Task ConsumerShouldThrowWhenQueueDoesNotExist()
371371

372372
IConsumerBuilder consumerBuilder = _connection.ConsumerBuilder()
373373
.Queue(doesNotExist)
374-
.MessageHandler((context, message) =>
375-
{
376-
return Task.CompletedTask;
377-
}
378-
);
374+
.MessageHandler((context, message) => Task.CompletedTask);
379375

380376
// TODO these are timeout exceptions under the hood, compare
381377
// with the Java client
382378
ConsumerException ex = await Assert.ThrowsAsync<ConsumerException>(
383379
() => consumerBuilder.BuildAndStartAsync());
384-
Assert.Contains(doesNotExist, ex.Message);
380+
Assert.Contains("amqp:not-found", ex.Message);
385381
}
386382

387383
[Fact]

0 commit comments

Comments
 (0)