Skip to content

Commit eb55784

Browse files
committed
Implement filters
Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent a7cbab6 commit eb55784

File tree

9 files changed

+152
-72
lines changed

9 files changed

+152
-72
lines changed

.ci/ubuntu/cluster/gha-setup.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ function run_docker_compose
1919
docker compose --file "$script_dir/docker-compose.yml" $@
2020
}
2121

22-
readonly rabbitmq_image="${RABBITMQ_IMAGE:-pivotalrabbitmq/rabbitmq:main-otp27}"
22+
readonly rabbitmq_image="${RABBITMQ_IMAGE:- rabbitmq:4-management}"
2323

2424
if [[ ! -v GITHUB_ACTIONS ]]
2525
then

.ci/ubuntu/one-node/gha-setup.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ readonly script_dir
99
echo "[INFO] script_dir: '$script_dir'"
1010

1111

12-
readonly rabbitmq_image="${RABBITMQ_IMAGE:-pivotalrabbitmq/rabbitmq:main-otp27}"
12+
readonly rabbitmq_image="${RABBITMQ_IMAGE:- rabbitmq:4-management}"
1313

1414

1515

RabbitMQ.AMQP.Client/Consts.cs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
// and the Mozilla Public License, version 2.0.
33
// Copyright (c) 2017-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
44

5+
using Amqp.Types;
6+
57
namespace RabbitMQ.AMQP.Client
68
{
79
public static class Consts
@@ -21,5 +23,27 @@ public static class Consts
2123
/// The default virtual host, <c>/</c>
2224
/// </summary>
2325
public const string DefaultVirtualHost = "/";
26+
27+
// amqp:sql-filter
28+
private const string AmqpSqlFilter = "amqp:sql-filter";
29+
internal static readonly Symbol s_streamSqlFilterSymbol = new(AmqpSqlFilter);
30+
internal const string SqlFilter = "SqlFilter";
31+
32+
internal const string AmqpPropertiesFilter = "amqp:properties-filter";
33+
internal const string AmqpApplicationPropertiesFilter = "amqp:application-properties-filter";
34+
35+
36+
37+
// sql-filter
38+
private const string RmqStreamFilter = "rabbitmq:stream-filter";
39+
private const string RmqStreamOffsetSpec = "rabbitmq:stream-offset-spec";
40+
private const string RmqStreamMatchUnfiltered = "rabbitmq:stream-match-unfiltered";
41+
42+
internal static readonly Symbol s_streamFilterSymbol = new(RmqStreamFilter);
43+
internal static readonly Symbol s_streamOffsetSpecSymbol = new(RmqStreamOffsetSpec);
44+
internal static readonly Symbol s_streamMatchUnfilteredSymbol = new(RmqStreamMatchUnfiltered);
45+
46+
47+
2448
}
2549
}

RabbitMQ.AMQP.Client/IConsumerBuilder.cs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -101,13 +101,6 @@ public interface IStreamOptions
101101
/// <see cref="FilterMatchUnfiltered(bool)"/>
102102
IStreamFilterOptions Filter();
103103

104-
/// <summary>
105-
/// <para>SQL filter expression.</para>
106-
///
107-
/// </summary>
108-
/// <para>Requires RabbitMQ 4.2 or more.</para>
109-
/// Documentation: <see href="https://www.rabbitmq.com/docs/next/stream-filtering#sql-filter-expressions">SQL Filtering</see>
110-
IStreamFilterOptions Sql(string sql);
111104

112105
/// <summary>
113106
/// Return the consumer builder.
@@ -201,6 +194,16 @@ public interface IStreamFilterOptions
201194
/// <param name="value">application property value</param>
202195
/// <returns><see cref="IStreamFilterOptions"/></returns>
203196
IStreamFilterOptions PropertySymbol(string key, string value);
197+
198+
199+
/// <summary>
200+
/// <para>SQL filter expression.</para>
201+
///
202+
/// </summary>
203+
/// <para>Requires RabbitMQ 4.2 or more.</para>
204+
/// Documentation: <see href="https://www.rabbitmq.com/docs/next/stream-filtering#sql-filter-expressions">SQL Filtering</see>
205+
IStreamFilterOptions Sql(string sql);
206+
204207

205208
/// <summary>
206209
/// Return the stream options.

RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public override async Task OpenAsync()
5858
// ListenerContext will override only the filters the selected filters.
5959
if (_configuration.ListenerContext is not null)
6060
{
61-
var listenerStreamOptions = new ListenerStreamOptions(_configuration.Filters, _amqpConnection._featureFlags);
61+
var listenerStreamOptions = new ListenerStreamOptions(_configuration.Filters);
6262
var listenerContext = new IConsumerBuilder.ListenerContext(listenerStreamOptions);
6363
_configuration.ListenerContext(listenerContext);
6464
}

RabbitMQ.AMQP.Client/Impl/AmqpConsumerBuilder.cs

Lines changed: 40 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,7 @@ public IConsumerBuilder SubscriptionListener(Action<IConsumerBuilder.ListenerCon
7676

7777
public IConsumerBuilder.IStreamOptions Stream()
7878
{
79-
return new ConsumerBuilderStreamOptions(this, _configuration.Filters,
80-
_amqpConnection._featureFlags);
79+
return new ConsumerBuilderStreamOptions(this, _configuration.Filters);
8180
}
8281

8382
public async Task<IConsumer> BuildAndStartAsync(CancellationToken cancellationToken = default)
@@ -87,6 +86,13 @@ public async Task<IConsumer> BuildAndStartAsync(CancellationToken cancellationTo
8786
throw new ConsumerException("Message handler is not set");
8887
}
8988

89+
if (_configuration.Filters[Consts.SqlFilter] is not null &&
90+
_amqpConnection._featureFlags.IsSqlFeatureEnabled == false)
91+
{
92+
throw new ConsumerException("SQL filter is not supported by the connection." +
93+
"RabbitMQ 4.2.0 or later is required.");
94+
}
95+
9096
AmqpConsumer consumer = new(_amqpConnection, _configuration, _metricsReporter);
9197

9298
// TODO pass cancellationToken
@@ -107,34 +113,18 @@ public abstract class StreamOptions : IConsumerBuilder.IStreamOptions
107113
private static readonly Regex s_offsetValidator = new Regex("^[0-9]+[YMDhms]$",
108114
RegexOptions.Compiled | RegexOptions.CultureInvariant);
109115

110-
// sql-filter
111-
private const string SqlFilter = "sql-filter";
112-
private const string RmqStreamFilter = "rabbitmq:stream-filter";
113-
private const string RmqStreamOffsetSpec = "rabbitmq:stream-offset-spec";
114-
115-
private const string RmqStreamMatchUnfiltered = "rabbitmq:stream-match-unfiltered";
116-
117-
// amqp:sql-filter
118-
private const string AmqpSqlFilter = "amqp:sql-filter";
119-
120-
private static readonly Symbol s_streamFilterSymbol = new(RmqStreamFilter);
121-
private static readonly Symbol s_streamOffsetSpecSymbol = new(RmqStreamOffsetSpec);
122-
private static readonly Symbol s_streamMatchUnfilteredSymbol = new(RmqStreamMatchUnfiltered);
123-
private static readonly Symbol s_streamSqlFilterSymbol = new(AmqpSqlFilter);
124116

125117
private readonly Map _filters;
126-
private readonly FeatureFlags _featureFlags;
127118

128-
protected StreamOptions(Map filters, FeatureFlags featureFlags)
119+
protected StreamOptions(Map filters)
129120
{
130121
_filters = filters;
131-
_featureFlags = featureFlags;
132122
}
133123

134124
public IConsumerBuilder.IStreamOptions Offset(long offset)
135125
{
136-
_filters[s_streamOffsetSpecSymbol] =
137-
new DescribedValue(s_streamOffsetSpecSymbol, offset);
126+
_filters[Consts.s_streamOffsetSpecSymbol] =
127+
new DescribedValue(Consts.s_streamOffsetSpecSymbol, offset);
138128
return this;
139129
}
140130

@@ -168,42 +158,25 @@ public IConsumerBuilder.IStreamOptions Offset(string interval)
168158

169159
public IConsumerBuilder.IStreamOptions FilterValues(params string[] values)
170160
{
171-
_filters[s_streamFilterSymbol] =
172-
new DescribedValue(s_streamFilterSymbol, values.ToList());
161+
_filters[Consts.s_streamFilterSymbol] =
162+
new DescribedValue(Consts.s_streamFilterSymbol, values.ToList());
173163
return this;
174164
}
175165

176166
public IConsumerBuilder.IStreamOptions FilterMatchUnfiltered(bool matchUnfiltered)
177167
{
178-
_filters[s_streamMatchUnfilteredSymbol]
179-
= new DescribedValue(s_streamMatchUnfilteredSymbol, matchUnfiltered);
168+
_filters[Consts.s_streamMatchUnfilteredSymbol]
169+
= new DescribedValue(Consts.s_streamMatchUnfilteredSymbol, matchUnfiltered);
180170
return this;
181171
}
182172

183-
public IConsumerBuilder.IStreamFilterOptions Sql(string sql)
184-
{
185-
if (string.IsNullOrWhiteSpace(sql))
186-
{
187-
throw new ArgumentNullException(nameof(sql));
188-
}
189-
190-
if (false == _featureFlags.IsSqlFeatureEnabled)
191-
{
192-
throw new ConsumerException("SQL filter is not supported by the broker. " +
193-
"The broker must be RabbitMQ 4.2 or later.");
194-
}
195-
196-
_filters[SqlFilter] =
197-
new DescribedValue(s_streamSqlFilterSymbol, sql);
198-
return Filter();
199-
}
200173

201174
public abstract IConsumerBuilder Builder();
202175

203176
private void SetOffsetSpecificationFilter(object value)
204177
{
205-
_filters[s_streamOffsetSpecSymbol]
206-
= new DescribedValue(s_streamOffsetSpecSymbol, value);
178+
_filters[Consts.s_streamOffsetSpecSymbol]
179+
= new DescribedValue(Consts.s_streamOffsetSpecSymbol, value);
207180
}
208181

209182
public IConsumerBuilder.IStreamFilterOptions Filter()
@@ -226,8 +199,8 @@ public IConsumerBuilder.IStreamFilterOptions Filter()
226199
/// </summary>
227200
public class ListenerStreamOptions : StreamOptions
228201
{
229-
public ListenerStreamOptions(Map filters, FeatureFlags featureFlags)
230-
: base(filters, featureFlags)
202+
public ListenerStreamOptions(Map filters)
203+
: base(filters)
231204
{
232205
}
233206

@@ -249,8 +222,8 @@ public class ConsumerBuilderStreamOptions : StreamOptions
249222
private readonly IConsumerBuilder _consumerBuilder;
250223

251224
public ConsumerBuilderStreamOptions(IConsumerBuilder consumerBuilder,
252-
Map filters, FeatureFlags featureFlags)
253-
: base(filters, featureFlags)
225+
Map filters)
226+
: base(filters)
254227
{
255228
_consumerBuilder = consumerBuilder;
256229
}
@@ -267,15 +240,28 @@ public override IConsumerBuilder Builder()
267240
/// </summary>
268241
public class StreamFilterOptions : IConsumerBuilder.IStreamFilterOptions
269242
{
270-
private IConsumerBuilder.IStreamOptions _streamOptions;
271-
private Map _filters;
243+
private readonly IConsumerBuilder.IStreamOptions _streamOptions;
244+
private readonly Map _filters;
272245

273246
public StreamFilterOptions(IConsumerBuilder.IStreamOptions streamOptions, Map filters)
274247
{
275248
_streamOptions = streamOptions;
276249
_filters = filters;
277250
}
278251

252+
public IConsumerBuilder.IStreamFilterOptions Sql(string sql)
253+
{
254+
if (string.IsNullOrWhiteSpace(sql))
255+
{
256+
throw new ArgumentNullException(nameof(sql));
257+
}
258+
259+
260+
_filters[Consts.SqlFilter] =
261+
new DescribedValue(Consts.s_streamSqlFilterSymbol, sql);
262+
return this;
263+
}
264+
279265
public IConsumerBuilder.IStreamOptions Stream()
280266
{
281267
return _streamOptions;
@@ -328,9 +314,9 @@ public IConsumerBuilder.IStreamFilterOptions PropertySymbol(string key, string v
328314

329315
private StreamFilterOptions PropertyFilter(string propertyKey, object propertyValue)
330316
{
331-
const string AmqpPropertiesFilter = "amqp:properties-filter";
332317

333-
DescribedValue propertiesFilterValue = Filter(AmqpPropertiesFilter);
318+
319+
DescribedValue propertiesFilterValue = Filter(Consts.AmqpPropertiesFilter);
334320
Map propertiesFilter = (Map)propertiesFilterValue.Value;
335321
// Note: you MUST use a symbol as the key
336322
propertiesFilter.Add(new Symbol(propertyKey), propertyValue);
@@ -339,9 +325,8 @@ private StreamFilterOptions PropertyFilter(string propertyKey, object propertyVa
339325

340326
private StreamFilterOptions ApplicationPropertyFilter(string propertyKey, object propertyValue)
341327
{
342-
const string AmqpApplicationPropertiesFilter = "amqp:application-properties-filter";
343-
344-
DescribedValue applicationPropertiesFilterValue = Filter(AmqpApplicationPropertiesFilter);
328+
329+
DescribedValue applicationPropertiesFilterValue = Filter(Consts.AmqpApplicationPropertiesFilter);
345330
Map applicationPropertiesFilter = (Map)applicationPropertiesFilterValue.Value;
346331
// Note: do NOT put a symbol as the key
347332
applicationPropertiesFilter.Add(propertyKey, propertyValue);

RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,7 @@ RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions.Property(string! key,
188188
RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions.PropertySymbol(string! key, string! value) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions!
189189
RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions.ReplyTo(string! replyTo) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions!
190190
RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions.ReplyToGroupId(string! groupId) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions!
191+
RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions.Sql(string! sql) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions!
191192
RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions.Stream() -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions!
192193
RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions.Subject(string! subject) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions!
193194
RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions.To(string! to) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions!
@@ -201,7 +202,6 @@ RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions.Offset(long offset) -> Rabb
201202
RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions.Offset(RabbitMQ.AMQP.Client.StreamOffsetSpecification specification) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions!
202203
RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions.Offset(string! interval) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions!
203204
RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions.Offset(System.DateTime timestamp) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions!
204-
RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions.Sql(string! sql) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions!
205205
RabbitMQ.AMQP.Client.IConsumerBuilder.ListenerContext
206206
RabbitMQ.AMQP.Client.IConsumerBuilder.ListenerContext.ListenerContext(RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions! streamOptions) -> void
207207
RabbitMQ.AMQP.Client.IConsumerBuilder.ListenerContext.StreamOptions.get -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions!
@@ -540,7 +540,7 @@ RabbitMQ.AMQP.Client.Impl.BindingSpecification._routingKey -> string!
540540
RabbitMQ.AMQP.Client.Impl.BindingSpecification._sourceName -> string!
541541
RabbitMQ.AMQP.Client.Impl.BindingSpecification._toQueue -> bool
542542
RabbitMQ.AMQP.Client.Impl.ConsumerBuilderStreamOptions
543-
RabbitMQ.AMQP.Client.Impl.ConsumerBuilderStreamOptions.ConsumerBuilderStreamOptions(RabbitMQ.AMQP.Client.IConsumerBuilder! consumerBuilder, Amqp.Types.Map! filters, RabbitMQ.AMQP.Client.FeatureFlags! featureFlags) -> void
543+
RabbitMQ.AMQP.Client.Impl.ConsumerBuilderStreamOptions.ConsumerBuilderStreamOptions(RabbitMQ.AMQP.Client.IConsumerBuilder! consumerBuilder, Amqp.Types.Map! filters) -> void
544544
RabbitMQ.AMQP.Client.Impl.DefaultAddressBuilder<T>
545545
RabbitMQ.AMQP.Client.Impl.DefaultAddressBuilder<T>.Address() -> string!
546546
RabbitMQ.AMQP.Client.Impl.DefaultAddressBuilder<T>.DefaultAddressBuilder() -> void
@@ -564,7 +564,7 @@ RabbitMQ.AMQP.Client.Impl.DefaultQueueInfo.Type() -> RabbitMQ.AMQP.Client.QueueT
564564
RabbitMQ.AMQP.Client.Impl.FieldNotSetException
565565
RabbitMQ.AMQP.Client.Impl.FieldNotSetException.FieldNotSetException() -> void
566566
RabbitMQ.AMQP.Client.Impl.ListenerStreamOptions
567-
RabbitMQ.AMQP.Client.Impl.ListenerStreamOptions.ListenerStreamOptions(Amqp.Types.Map! filters, RabbitMQ.AMQP.Client.FeatureFlags! featureFlags) -> void
567+
RabbitMQ.AMQP.Client.Impl.ListenerStreamOptions.ListenerStreamOptions(Amqp.Types.Map! filters) -> void
568568
RabbitMQ.AMQP.Client.Impl.MessageAddressBuilder
569569
RabbitMQ.AMQP.Client.Impl.MessageAddressBuilder.Build() -> RabbitMQ.AMQP.Client.IMessage!
570570
RabbitMQ.AMQP.Client.Impl.MessageAddressBuilder.MessageAddressBuilder(RabbitMQ.AMQP.Client.IMessage! message) -> void
@@ -612,6 +612,7 @@ RabbitMQ.AMQP.Client.Impl.StreamFilterOptions.Property(string! key, object! valu
612612
RabbitMQ.AMQP.Client.Impl.StreamFilterOptions.PropertySymbol(string! key, string! value) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions!
613613
RabbitMQ.AMQP.Client.Impl.StreamFilterOptions.ReplyTo(string! replyTo) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions!
614614
RabbitMQ.AMQP.Client.Impl.StreamFilterOptions.ReplyToGroupId(string! groupId) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions!
615+
RabbitMQ.AMQP.Client.Impl.StreamFilterOptions.Sql(string! sql) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions!
615616
RabbitMQ.AMQP.Client.Impl.StreamFilterOptions.Stream() -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions!
616617
RabbitMQ.AMQP.Client.Impl.StreamFilterOptions.StreamFilterOptions(RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions! streamOptions, Amqp.Types.Map! filters) -> void
617618
RabbitMQ.AMQP.Client.Impl.StreamFilterOptions.Subject(string! subject) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions!
@@ -625,8 +626,7 @@ RabbitMQ.AMQP.Client.Impl.StreamOptions.Offset(long offset) -> RabbitMQ.AMQP.Cli
625626
RabbitMQ.AMQP.Client.Impl.StreamOptions.Offset(RabbitMQ.AMQP.Client.StreamOffsetSpecification specification) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions!
626627
RabbitMQ.AMQP.Client.Impl.StreamOptions.Offset(string! interval) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions!
627628
RabbitMQ.AMQP.Client.Impl.StreamOptions.Offset(System.DateTime timestamp) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions!
628-
RabbitMQ.AMQP.Client.Impl.StreamOptions.Sql(string! sql) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions!
629-
RabbitMQ.AMQP.Client.Impl.StreamOptions.StreamOptions(Amqp.Types.Map! filters, RabbitMQ.AMQP.Client.FeatureFlags! featureFlags) -> void
629+
RabbitMQ.AMQP.Client.Impl.StreamOptions.StreamOptions(Amqp.Types.Map! filters) -> void
630630
RabbitMQ.AMQP.Client.InternalBugException
631631
RabbitMQ.AMQP.Client.InternalBugException.InternalBugException() -> void
632632
RabbitMQ.AMQP.Client.InternalBugException.InternalBugException(string! message) -> void

0 commit comments

Comments
 (0)