Skip to content

Commit 9fe3f15

Browse files
committed
* Remove AmqpConnection from ConsumerConfiguration.
* Add TODO to detect RMQ version when stream filters are used.
1 parent 9f64013 commit 9fe3f15

File tree

2 files changed

+18
-8
lines changed

2 files changed

+18
-8
lines changed

RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ private enum PauseStatus
2020
PAUSED,
2121
}
2222

23+
private readonly AmqpConnection _amqpConnection;
2324
private readonly Guid _id = Guid.NewGuid();
2425

2526
private ReceiverLink? _receiverLink;
@@ -28,10 +29,12 @@ private enum PauseStatus
2829
private readonly UnsettledMessageCounter _unsettledMessageCounter = new();
2930
private readonly ConsumerConfiguration _configuration;
3031

31-
internal AmqpConsumer(ConsumerConfiguration configuration)
32+
internal AmqpConsumer(AmqpConnection amqpConnection, ConsumerConfiguration configuration)
3233
{
34+
_amqpConnection = amqpConnection;
3335
_configuration = configuration;
34-
_configuration.Connection.AddConsumer(_id, this);
36+
37+
_amqpConnection.AddConsumer(_id, this);
3538
}
3639

3740
public override async Task OpenAsync()
@@ -74,7 +77,7 @@ void onAttached(ILink argLink, Attach argAttach)
7477
ReceiverLink? tmpReceiverLink = null;
7578
Task receiverLinkTask = Task.Run(async () =>
7679
{
77-
Session session = await _configuration.Connection._nativePubSubSessions.GetOrCreateSessionAsync()
80+
Session session = await _amqpConnection._nativePubSubSessions.GetOrCreateSessionAsync()
7881
.ConfigureAwait(false);
7982
tmpReceiverLink = new ReceiverLink(session, _id.ToString(), attach, onAttached);
8083
});
@@ -250,14 +253,14 @@ await _receiverLink.CloseAsync(TimeSpan.FromSeconds(5))
250253

251254
_receiverLink = null;
252255
OnNewStatus(State.Closed, null);
253-
_configuration.Connection.RemoveConsumer(_id);
256+
_amqpConnection.RemoveConsumer(_id);
254257
}
255258

256259
public override string ToString()
257260
{
258261
return $"Consumer{{Address='{_configuration.Address}', " +
259262
$"id={_id}, " +
260-
$"Connection='{_configuration.Connection}', " +
263+
$"Connection='{_amqpConnection}', " +
261264
$"State='{State}'}}";
262265
}
263266
}

RabbitMQ.AMQP.Client/Impl/AmqpConsumerBuilder.cs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ namespace RabbitMQ.AMQP.Client.Impl
1616
/// </summary>
1717
internal sealed class ConsumerConfiguration
1818
{
19-
public AmqpConnection Connection { get; set; } = null!;
2019
public string Address { get; set; } = "";
2120
public int InitialCredits { get; set; } = 100; // TODO use constant, check with Java lib
2221
public Map Filters { get; set; } = new();
@@ -32,10 +31,11 @@ internal sealed class ConsumerConfiguration
3231
public class AmqpConsumerBuilder : IConsumerBuilder
3332
{
3433
private readonly ConsumerConfiguration _configuration = new();
34+
private readonly AmqpConnection _amqpConnection;
3535

3636
public AmqpConsumerBuilder(AmqpConnection connection)
3737
{
38-
_configuration.Connection = connection;
38+
_amqpConnection = connection;
3939
}
4040

4141
public IConsumerBuilder Queue(IQueueSpecification queueSpec)
@@ -80,7 +80,7 @@ public async Task<IConsumer> BuildAndStartAsync(CancellationToken cancellationTo
8080
throw new ConsumerException("Message handler is not set");
8181
}
8282

83-
AmqpConsumer consumer = new(_configuration);
83+
AmqpConsumer consumer = new(_amqpConnection, _configuration);
8484

8585
// TODO pass cancellationToken
8686
await consumer.OpenAsync()
@@ -174,6 +174,13 @@ private void SetOffsetSpecificationFilter(object value)
174174

175175
public IConsumerBuilder.IStreamFilterOptions Filter()
176176
{
177+
/*
178+
* TODO detect RMQ version
179+
* if (!this.builder.connection.filterExpressionsSupported()) {
180+
* throw new IllegalArgumentException(
181+
* "AMQP filter expressions requires at least RabbitMQ 4.1.0");
182+
* }
183+
*/
177184
// TODO Should this be Consumer / Listener?
178185
return new StreamFilterOptions(this, _filters);
179186
}

0 commit comments

Comments
 (0)