Skip to content

Commit 48e8a72

Browse files
GsantomaggioZerpet
andcommitted
Implement SQL filters
Signed-off-by: Gabriele Santomaggio <[email protected]> Co-authored-by: Aitor Perez <[email protected]>
1 parent 20f7e50 commit 48e8a72

File tree

7 files changed

+144
-72
lines changed

7 files changed

+144
-72
lines changed

RabbitMQ.AMQP.Client/Consts.cs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
// and the Mozilla Public License, version 2.0.
33
// Copyright (c) 2017-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
44

5+
using Amqp.Types;
6+
57
namespace RabbitMQ.AMQP.Client
68
{
79
public static class Consts
@@ -21,5 +23,24 @@ public static class Consts
2123
/// The default virtual host, <c>/</c>
2224
/// </summary>
2325
public const string DefaultVirtualHost = "/";
26+
27+
// amqp:sql-filter
28+
private const string AmqpSqlFilter = "amqp:sql-filter";
29+
internal static readonly Symbol s_streamSqlFilterSymbol = new(AmqpSqlFilter);
30+
private const string SqlFilter = "sql-filter";
31+
internal static readonly Symbol s_sqlFilterSymbol = new(SqlFilter);
32+
33+
internal const string AmqpPropertiesFilter = "amqp:properties-filter";
34+
internal const string AmqpApplicationPropertiesFilter = "amqp:application-properties-filter";
35+
36+
// sql-filter
37+
private const string RmqStreamFilter = "rabbitmq:stream-filter";
38+
private const string RmqStreamOffsetSpec = "rabbitmq:stream-offset-spec";
39+
private const string RmqStreamMatchUnfiltered = "rabbitmq:stream-match-unfiltered";
40+
41+
internal static readonly Symbol s_streamFilterSymbol = new(RmqStreamFilter);
42+
internal static readonly Symbol s_streamOffsetSpecSymbol = new(RmqStreamOffsetSpec);
43+
internal static readonly Symbol s_streamMatchUnfilteredSymbol = new(RmqStreamMatchUnfiltered);
44+
2445
}
2546
}

RabbitMQ.AMQP.Client/IConsumerBuilder.cs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -101,14 +101,6 @@ public interface IStreamOptions
101101
/// <see cref="FilterMatchUnfiltered(bool)"/>
102102
IStreamFilterOptions Filter();
103103

104-
/// <summary>
105-
/// <para>SQL filter expression.</para>
106-
///
107-
/// </summary>
108-
/// <para>Requires RabbitMQ 4.2 or more.</para>
109-
/// Documentation: <see href="https://www.rabbitmq.com/docs/next/stream-filtering#sql-filter-expressions">SQL Filtering</see>
110-
IStreamFilterOptions Sql(string sql);
111-
112104
/// <summary>
113105
/// Return the consumer builder.
114106
/// </summary>
@@ -202,6 +194,14 @@ public interface IStreamFilterOptions
202194
/// <returns><see cref="IStreamFilterOptions"/></returns>
203195
IStreamFilterOptions PropertySymbol(string key, string value);
204196

197+
/// <summary>
198+
/// <para>SQL filter expression.</para>
199+
///
200+
/// </summary>
201+
/// <para>Requires RabbitMQ 4.2 or more.</para>
202+
/// Documentation: <see href="https://www.rabbitmq.com/docs/next/stream-filtering#sql-filter-expressions">SQL Filtering</see>
203+
IStreamFilterOptions Sql(string sql);
204+
205205
/// <summary>
206206
/// Return the stream options.
207207
/// </summary>

RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public override async Task OpenAsync()
5858
// ListenerContext will override only the filters the selected filters.
5959
if (_configuration.ListenerContext is not null)
6060
{
61-
var listenerStreamOptions = new ListenerStreamOptions(_configuration.Filters, _amqpConnection._featureFlags);
61+
var listenerStreamOptions = new ListenerStreamOptions(_configuration.Filters);
6262
var listenerContext = new IConsumerBuilder.ListenerContext(listenerStreamOptions);
6363
_configuration.ListenerContext(listenerContext);
6464
}

RabbitMQ.AMQP.Client/Impl/AmqpConsumerBuilder.cs

Lines changed: 37 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,7 @@ public IConsumerBuilder SubscriptionListener(Action<IConsumerBuilder.ListenerCon
7676

7777
public IConsumerBuilder.IStreamOptions Stream()
7878
{
79-
return new ConsumerBuilderStreamOptions(this, _configuration.Filters,
80-
_amqpConnection._featureFlags);
79+
return new ConsumerBuilderStreamOptions(this, _configuration.Filters);
8180
}
8281

8382
public async Task<IConsumer> BuildAndStartAsync(CancellationToken cancellationToken = default)
@@ -87,6 +86,13 @@ public async Task<IConsumer> BuildAndStartAsync(CancellationToken cancellationTo
8786
throw new ConsumerException("Message handler is not set");
8887
}
8988

89+
if (_configuration.Filters[Consts.s_sqlFilterSymbol] is not null &&
90+
_amqpConnection._featureFlags.IsSqlFeatureEnabled == false)
91+
{
92+
throw new ConsumerException("SQL filter is not supported by the connection." +
93+
"RabbitMQ 4.2.0 or later is required.");
94+
}
95+
9096
AmqpConsumer consumer = new(_amqpConnection, _configuration, _metricsReporter);
9197

9298
// TODO pass cancellationToken
@@ -107,34 +113,17 @@ public abstract class StreamOptions : IConsumerBuilder.IStreamOptions
107113
private static readonly Regex s_offsetValidator = new Regex("^[0-9]+[YMDhms]$",
108114
RegexOptions.Compiled | RegexOptions.CultureInvariant);
109115

110-
// sql-filter
111-
private const string SqlFilter = "sql-filter";
112-
private const string RmqStreamFilter = "rabbitmq:stream-filter";
113-
private const string RmqStreamOffsetSpec = "rabbitmq:stream-offset-spec";
114-
115-
private const string RmqStreamMatchUnfiltered = "rabbitmq:stream-match-unfiltered";
116-
117-
// amqp:sql-filter
118-
private const string AmqpSqlFilter = "amqp:sql-filter";
119-
120-
private static readonly Symbol s_streamFilterSymbol = new(RmqStreamFilter);
121-
private static readonly Symbol s_streamOffsetSpecSymbol = new(RmqStreamOffsetSpec);
122-
private static readonly Symbol s_streamMatchUnfilteredSymbol = new(RmqStreamMatchUnfiltered);
123-
private static readonly Symbol s_streamSqlFilterSymbol = new(AmqpSqlFilter);
124-
125116
private readonly Map _filters;
126-
private readonly FeatureFlags _featureFlags;
127117

128-
protected StreamOptions(Map filters, FeatureFlags featureFlags)
118+
protected StreamOptions(Map filters)
129119
{
130120
_filters = filters;
131-
_featureFlags = featureFlags;
132121
}
133122

134123
public IConsumerBuilder.IStreamOptions Offset(long offset)
135124
{
136-
_filters[s_streamOffsetSpecSymbol] =
137-
new DescribedValue(s_streamOffsetSpecSymbol, offset);
125+
_filters[Consts.s_streamOffsetSpecSymbol] =
126+
new DescribedValue(Consts.s_streamOffsetSpecSymbol, offset);
138127
return this;
139128
}
140129

@@ -168,42 +157,24 @@ public IConsumerBuilder.IStreamOptions Offset(string interval)
168157

169158
public IConsumerBuilder.IStreamOptions FilterValues(params string[] values)
170159
{
171-
_filters[s_streamFilterSymbol] =
172-
new DescribedValue(s_streamFilterSymbol, values.ToList());
160+
_filters[Consts.s_streamFilterSymbol] =
161+
new DescribedValue(Consts.s_streamFilterSymbol, values.ToList());
173162
return this;
174163
}
175164

176165
public IConsumerBuilder.IStreamOptions FilterMatchUnfiltered(bool matchUnfiltered)
177166
{
178-
_filters[s_streamMatchUnfilteredSymbol]
179-
= new DescribedValue(s_streamMatchUnfilteredSymbol, matchUnfiltered);
167+
_filters[Consts.s_streamMatchUnfilteredSymbol]
168+
= new DescribedValue(Consts.s_streamMatchUnfilteredSymbol, matchUnfiltered);
180169
return this;
181170
}
182171

183-
public IConsumerBuilder.IStreamFilterOptions Sql(string sql)
184-
{
185-
if (string.IsNullOrWhiteSpace(sql))
186-
{
187-
throw new ArgumentNullException(nameof(sql));
188-
}
189-
190-
if (false == _featureFlags.IsSqlFeatureEnabled)
191-
{
192-
throw new ConsumerException("SQL filter is not supported by the broker. " +
193-
"The broker must be RabbitMQ 4.2 or later.");
194-
}
195-
196-
_filters[SqlFilter] =
197-
new DescribedValue(s_streamSqlFilterSymbol, sql);
198-
return Filter();
199-
}
200-
201172
public abstract IConsumerBuilder Builder();
202173

203174
private void SetOffsetSpecificationFilter(object value)
204175
{
205-
_filters[s_streamOffsetSpecSymbol]
206-
= new DescribedValue(s_streamOffsetSpecSymbol, value);
176+
_filters[Consts.s_streamOffsetSpecSymbol]
177+
= new DescribedValue(Consts.s_streamOffsetSpecSymbol, value);
207178
}
208179

209180
public IConsumerBuilder.IStreamFilterOptions Filter()
@@ -226,8 +197,8 @@ public IConsumerBuilder.IStreamFilterOptions Filter()
226197
/// </summary>
227198
public class ListenerStreamOptions : StreamOptions
228199
{
229-
public ListenerStreamOptions(Map filters, FeatureFlags featureFlags)
230-
: base(filters, featureFlags)
200+
public ListenerStreamOptions(Map filters)
201+
: base(filters)
231202
{
232203
}
233204

@@ -249,8 +220,8 @@ public class ConsumerBuilderStreamOptions : StreamOptions
249220
private readonly IConsumerBuilder _consumerBuilder;
250221

251222
public ConsumerBuilderStreamOptions(IConsumerBuilder consumerBuilder,
252-
Map filters, FeatureFlags featureFlags)
253-
: base(filters, featureFlags)
223+
Map filters)
224+
: base(filters)
254225
{
255226
_consumerBuilder = consumerBuilder;
256227
}
@@ -267,15 +238,27 @@ public override IConsumerBuilder Builder()
267238
/// </summary>
268239
public class StreamFilterOptions : IConsumerBuilder.IStreamFilterOptions
269240
{
270-
private IConsumerBuilder.IStreamOptions _streamOptions;
271-
private Map _filters;
241+
private readonly IConsumerBuilder.IStreamOptions _streamOptions;
242+
private readonly Map _filters;
272243

273244
public StreamFilterOptions(IConsumerBuilder.IStreamOptions streamOptions, Map filters)
274245
{
275246
_streamOptions = streamOptions;
276247
_filters = filters;
277248
}
278249

250+
public IConsumerBuilder.IStreamFilterOptions Sql(string sql)
251+
{
252+
if (string.IsNullOrWhiteSpace(sql))
253+
{
254+
throw new ArgumentNullException(nameof(sql));
255+
}
256+
257+
_filters[Consts.s_sqlFilterSymbol] =
258+
new DescribedValue(Consts.s_streamSqlFilterSymbol, sql);
259+
return this;
260+
}
261+
279262
public IConsumerBuilder.IStreamOptions Stream()
280263
{
281264
return _streamOptions;
@@ -328,9 +311,8 @@ public IConsumerBuilder.IStreamFilterOptions PropertySymbol(string key, string v
328311

329312
private StreamFilterOptions PropertyFilter(string propertyKey, object propertyValue)
330313
{
331-
const string AmqpPropertiesFilter = "amqp:properties-filter";
332314

333-
DescribedValue propertiesFilterValue = Filter(AmqpPropertiesFilter);
315+
DescribedValue propertiesFilterValue = Filter(Consts.AmqpPropertiesFilter);
334316
Map propertiesFilter = (Map)propertiesFilterValue.Value;
335317
// Note: you MUST use a symbol as the key
336318
propertiesFilter.Add(new Symbol(propertyKey), propertyValue);
@@ -339,9 +321,8 @@ private StreamFilterOptions PropertyFilter(string propertyKey, object propertyVa
339321

340322
private StreamFilterOptions ApplicationPropertyFilter(string propertyKey, object propertyValue)
341323
{
342-
const string AmqpApplicationPropertiesFilter = "amqp:application-properties-filter";
343324

344-
DescribedValue applicationPropertiesFilterValue = Filter(AmqpApplicationPropertiesFilter);
325+
DescribedValue applicationPropertiesFilterValue = Filter(Consts.AmqpApplicationPropertiesFilter);
345326
Map applicationPropertiesFilter = (Map)applicationPropertiesFilterValue.Value;
346327
// Note: do NOT put a symbol as the key
347328
applicationPropertiesFilter.Add(propertyKey, propertyValue);

RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,7 @@ RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions.Property(string! key,
188188
RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions.PropertySymbol(string! key, string! value) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions!
189189
RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions.ReplyTo(string! replyTo) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions!
190190
RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions.ReplyToGroupId(string! groupId) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions!
191+
RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions.Sql(string! sql) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions!
191192
RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions.Stream() -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions!
192193
RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions.Subject(string! subject) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions!
193194
RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions.To(string! to) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions!
@@ -201,7 +202,6 @@ RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions.Offset(long offset) -> Rabb
201202
RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions.Offset(RabbitMQ.AMQP.Client.StreamOffsetSpecification specification) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions!
202203
RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions.Offset(string! interval) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions!
203204
RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions.Offset(System.DateTime timestamp) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions!
204-
RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions.Sql(string! sql) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions!
205205
RabbitMQ.AMQP.Client.IConsumerBuilder.ListenerContext
206206
RabbitMQ.AMQP.Client.IConsumerBuilder.ListenerContext.ListenerContext(RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions! streamOptions) -> void
207207
RabbitMQ.AMQP.Client.IConsumerBuilder.ListenerContext.StreamOptions.get -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions!
@@ -540,7 +540,7 @@ RabbitMQ.AMQP.Client.Impl.BindingSpecification._routingKey -> string!
540540
RabbitMQ.AMQP.Client.Impl.BindingSpecification._sourceName -> string!
541541
RabbitMQ.AMQP.Client.Impl.BindingSpecification._toQueue -> bool
542542
RabbitMQ.AMQP.Client.Impl.ConsumerBuilderStreamOptions
543-
RabbitMQ.AMQP.Client.Impl.ConsumerBuilderStreamOptions.ConsumerBuilderStreamOptions(RabbitMQ.AMQP.Client.IConsumerBuilder! consumerBuilder, Amqp.Types.Map! filters, RabbitMQ.AMQP.Client.FeatureFlags! featureFlags) -> void
543+
RabbitMQ.AMQP.Client.Impl.ConsumerBuilderStreamOptions.ConsumerBuilderStreamOptions(RabbitMQ.AMQP.Client.IConsumerBuilder! consumerBuilder, Amqp.Types.Map! filters) -> void
544544
RabbitMQ.AMQP.Client.Impl.DefaultAddressBuilder<T>
545545
RabbitMQ.AMQP.Client.Impl.DefaultAddressBuilder<T>.Address() -> string!
546546
RabbitMQ.AMQP.Client.Impl.DefaultAddressBuilder<T>.DefaultAddressBuilder() -> void
@@ -564,7 +564,7 @@ RabbitMQ.AMQP.Client.Impl.DefaultQueueInfo.Type() -> RabbitMQ.AMQP.Client.QueueT
564564
RabbitMQ.AMQP.Client.Impl.FieldNotSetException
565565
RabbitMQ.AMQP.Client.Impl.FieldNotSetException.FieldNotSetException() -> void
566566
RabbitMQ.AMQP.Client.Impl.ListenerStreamOptions
567-
RabbitMQ.AMQP.Client.Impl.ListenerStreamOptions.ListenerStreamOptions(Amqp.Types.Map! filters, RabbitMQ.AMQP.Client.FeatureFlags! featureFlags) -> void
567+
RabbitMQ.AMQP.Client.Impl.ListenerStreamOptions.ListenerStreamOptions(Amqp.Types.Map! filters) -> void
568568
RabbitMQ.AMQP.Client.Impl.MessageAddressBuilder
569569
RabbitMQ.AMQP.Client.Impl.MessageAddressBuilder.Build() -> RabbitMQ.AMQP.Client.IMessage!
570570
RabbitMQ.AMQP.Client.Impl.MessageAddressBuilder.MessageAddressBuilder(RabbitMQ.AMQP.Client.IMessage! message) -> void
@@ -612,6 +612,7 @@ RabbitMQ.AMQP.Client.Impl.StreamFilterOptions.Property(string! key, object! valu
612612
RabbitMQ.AMQP.Client.Impl.StreamFilterOptions.PropertySymbol(string! key, string! value) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions!
613613
RabbitMQ.AMQP.Client.Impl.StreamFilterOptions.ReplyTo(string! replyTo) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions!
614614
RabbitMQ.AMQP.Client.Impl.StreamFilterOptions.ReplyToGroupId(string! groupId) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions!
615+
RabbitMQ.AMQP.Client.Impl.StreamFilterOptions.Sql(string! sql) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions!
615616
RabbitMQ.AMQP.Client.Impl.StreamFilterOptions.Stream() -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions!
616617
RabbitMQ.AMQP.Client.Impl.StreamFilterOptions.StreamFilterOptions(RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions! streamOptions, Amqp.Types.Map! filters) -> void
617618
RabbitMQ.AMQP.Client.Impl.StreamFilterOptions.Subject(string! subject) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions!
@@ -625,8 +626,7 @@ RabbitMQ.AMQP.Client.Impl.StreamOptions.Offset(long offset) -> RabbitMQ.AMQP.Cli
625626
RabbitMQ.AMQP.Client.Impl.StreamOptions.Offset(RabbitMQ.AMQP.Client.StreamOffsetSpecification specification) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions!
626627
RabbitMQ.AMQP.Client.Impl.StreamOptions.Offset(string! interval) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions!
627628
RabbitMQ.AMQP.Client.Impl.StreamOptions.Offset(System.DateTime timestamp) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions!
628-
RabbitMQ.AMQP.Client.Impl.StreamOptions.Sql(string! sql) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions!
629-
RabbitMQ.AMQP.Client.Impl.StreamOptions.StreamOptions(Amqp.Types.Map! filters, RabbitMQ.AMQP.Client.FeatureFlags! featureFlags) -> void
629+
RabbitMQ.AMQP.Client.Impl.StreamOptions.StreamOptions(Amqp.Types.Map! filters) -> void
630630
RabbitMQ.AMQP.Client.InternalBugException
631631
RabbitMQ.AMQP.Client.InternalBugException.InternalBugException() -> void
632632
RabbitMQ.AMQP.Client.InternalBugException.InternalBugException(string! message) -> void

0 commit comments

Comments
 (0)