Skip to content

Commit 9435502

Browse files
GsantomaggioZerpet
andauthored
Implement SQL filter (#131)
* Implement SQL filter --------- Signed-off-by: Gabriele Santomaggio <[email protected]> Co-authored-by: Aitor Perez <[email protected]>
1 parent 119026f commit 9435502

16 files changed

+244
-52
lines changed

.ci/ubuntu/cluster/gha-setup.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ function run_docker_compose
1919
docker compose --file "$script_dir/docker-compose.yml" $@
2020
}
2121

22-
readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4.1.0-management-alpine}"
22+
readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4.2-rc-management}"
2323

2424
if [[ ! -v GITHUB_ACTIONS ]]
2525
then
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
1-
# This file might be required to test with images built from main.
1+
# This file is required to test with images built from main.
22
# Images in pivotalrabbitmq/rabbitmq disable the metrics collector by default.
33
management_agent.disable_metrics_collector = false

.ci/ubuntu/one-node/gha-setup.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ readonly script_dir
99
echo "[INFO] script_dir: '$script_dir'"
1010

1111

12-
readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4.1.0-management-alpine}"
12+
readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4.2-rc-management}"
1313

1414

1515

RabbitMQ.AMQP.Client/Consts.cs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
// and the Mozilla Public License, version 2.0.
33
// Copyright (c) 2017-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
44

5+
using Amqp.Types;
6+
57
namespace RabbitMQ.AMQP.Client
68
{
79
public static class Consts
@@ -21,5 +23,24 @@ public static class Consts
2123
/// The default virtual host, <c>/</c>
2224
/// </summary>
2325
public const string DefaultVirtualHost = "/";
26+
27+
// amqp:sql-filter
28+
private const string AmqpSqlFilter = "amqp:sql-filter";
29+
internal static readonly Symbol s_streamSqlFilterSymbol = new(AmqpSqlFilter);
30+
internal const string SqlFilter = "sql-filter";
31+
internal static readonly Symbol s_sqlFilterSymbol = new(SqlFilter);
32+
33+
internal const string AmqpPropertiesFilter = "amqp:properties-filter";
34+
internal const string AmqpApplicationPropertiesFilter = "amqp:application-properties-filter";
35+
36+
// sql-filter
37+
private const string RmqStreamFilter = "rabbitmq:stream-filter";
38+
private const string RmqStreamOffsetSpec = "rabbitmq:stream-offset-spec";
39+
private const string RmqStreamMatchUnfiltered = "rabbitmq:stream-match-unfiltered";
40+
41+
internal static readonly Symbol s_streamFilterSymbol = new(RmqStreamFilter);
42+
internal static readonly Symbol s_streamOffsetSpecSymbol = new(RmqStreamOffsetSpec);
43+
internal static readonly Symbol s_streamMatchUnfilteredSymbol = new(RmqStreamMatchUnfiltered);
44+
2445
}
2546
}

RabbitMQ.AMQP.Client/FeatureFlags.cs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,25 @@
44

55
namespace RabbitMQ.AMQP.Client
66
{
7-
internal class FeatureFlags
7+
public class FeatureFlags
88
{
9-
public bool IsSqlFeatureEnabled { get; set; } = false;
10-
public bool IsBrokerCompatible { get; set; } = false;
9+
10+
/// <summary>
11+
/// Check if filter feature is enabled.
12+
/// Filter feature is available in RabbitMQ 4.1 and later.
13+
/// </summary>
14+
public bool IsFilterFeatureEnabled { get; internal set; } = false;
15+
16+
/// <summary>
17+
/// Check if Sql feature is enabled.
18+
/// Sql feature is available in RabbitMQ 4.2 and later.
19+
/// </summary>
20+
public bool IsSqlFeatureEnabled { get; internal set; } = false;
21+
/// <summary>
22+
/// Check if the client is compatible with the broker version.
23+
/// The client requires RabbitMQ 4.0 or later to be compatible.
24+
/// </summary>
25+
public bool IsBrokerCompatible { get; internal set; } = false;
1126

1227
public void Validate()
1328
{

RabbitMQ.AMQP.Client/IConsumerBuilder.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,14 @@ public interface IStreamFilterOptions
194194
/// <returns><see cref="IStreamFilterOptions"/></returns>
195195
IStreamFilterOptions PropertySymbol(string key, string value);
196196

197+
/// <summary>
198+
/// <para>SQL filter expression.</para>
199+
///
200+
/// </summary>
201+
/// <para>Requires RabbitMQ 4.2 or more.</para>
202+
/// Documentation: <see href="https://www.rabbitmq.com/docs/next/stream-filtering#sql-filter-expressions">SQL Filtering</see>
203+
IStreamFilterOptions Sql(string sql);
204+
197205
/// <summary>
198206
/// Return the stream options.
199207
/// </summary>

RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,7 @@ public class AmqpConnection : AbstractLifeCycle, IConnection
3939
private readonly IMetricsReporter? _metricsReporter;
4040

4141
private readonly Dictionary<string, object> _connectionProperties = new();
42-
private bool _areFilterExpressionsSupported = false;
43-
private FeatureFlags _featureFlags = new FeatureFlags();
42+
internal readonly FeatureFlags _featureFlags = new FeatureFlags();
4443

4544
/// <summary>
4645
/// _publishersDict contains all the publishers created by the connection.
@@ -261,8 +260,6 @@ protected override void Dispose(bool disposing)
261260

262261
internal Connection? NativeConnection => _nativeConnection;
263262

264-
internal bool AreFilterExpressionsSupported => _areFilterExpressionsSupported;
265-
266263
// TODO this couples AmqpConnection with AmqpPublisher, yuck
267264
internal void AddPublisher(Guid id, IPublisher consumer)
268265
{
@@ -674,7 +671,7 @@ private void HandleProperties(Fields properties)
674671
// this is a feature that was introduced in RabbitMQ 4.2.0
675672
_featureFlags.IsSqlFeatureEnabled = Utils.Is4_2_OrMore(brokerVersion);
676673

677-
_areFilterExpressionsSupported = Utils.SupportsFilterExpressions(brokerVersion);
674+
_featureFlags.IsFilterFeatureEnabled = Utils.SupportsFilterExpressions(brokerVersion);
678675
}
679676
}
680677
}

RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public override async Task OpenAsync()
5858
// ListenerContext will override only the filters the selected filters.
5959
if (_configuration.ListenerContext is not null)
6060
{
61-
var listenerStreamOptions = new ListenerStreamOptions(_configuration.Filters, _amqpConnection.AreFilterExpressionsSupported);
61+
var listenerStreamOptions = new ListenerStreamOptions(_configuration.Filters);
6262
var listenerContext = new IConsumerBuilder.ListenerContext(listenerStreamOptions);
6363
_configuration.ListenerContext(listenerContext);
6464
}
@@ -88,6 +88,12 @@ void OnAttached(ILink argLink, Attach argAttach)
8888

8989
// TODO configurable timeout
9090
var waitSpan = TimeSpan.FromSeconds(5);
91+
92+
// TODO
93+
// Even 10ms is enough to allow the links to establish,
94+
// which tells me it allows the .NET runtime to process
95+
await Task.Delay(10).ConfigureAwait(false);
96+
9197
_receiverLink = await attachCompletedTcs.Task.WaitAsync(waitSpan)
9298
.ConfigureAwait(false);
9399

RabbitMQ.AMQP.Client/Impl/AmqpConsumerBuilder.cs

Lines changed: 40 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,12 @@ internal sealed class ConsumerConfiguration
1818
{
1919
public string Address { get; set; } = "";
2020
public int InitialCredits { get; set; } = 100; // TODO use constant, check with Java lib
21+
2122
public Map Filters { get; set; } = new();
23+
2224
// TODO is a MessageHandler *really* optional???
2325
public MessageHandler? Handler { get; set; }
26+
2427
// TODO re-name to ListenerContextAction? Callback?
2528
public Action<IConsumerBuilder.ListenerContext>? ListenerContext = null;
2629
}
@@ -73,8 +76,7 @@ public IConsumerBuilder SubscriptionListener(Action<IConsumerBuilder.ListenerCon
7376

7477
public IConsumerBuilder.IStreamOptions Stream()
7578
{
76-
return new ConsumerBuilderStreamOptions(this, _configuration.Filters,
77-
_amqpConnection.AreFilterExpressionsSupported);
79+
return new ConsumerBuilderStreamOptions(this, _configuration.Filters);
7880
}
7981

8082
public async Task<IConsumer> BuildAndStartAsync(CancellationToken cancellationToken = default)
@@ -84,6 +86,13 @@ public async Task<IConsumer> BuildAndStartAsync(CancellationToken cancellationTo
8486
throw new ConsumerException("Message handler is not set");
8587
}
8688

89+
if (_configuration.Filters[Consts.s_sqlFilterSymbol] is not null &&
90+
_amqpConnection._featureFlags.IsSqlFeatureEnabled == false)
91+
{
92+
throw new ConsumerException("SQL filter is not supported by the connection." +
93+
"RabbitMQ 4.2.0 or later is required.");
94+
}
95+
8796
AmqpConsumer consumer = new(_amqpConnection, _configuration, _metricsReporter);
8897

8998
// TODO pass cancellationToken
@@ -104,27 +113,17 @@ public abstract class StreamOptions : IConsumerBuilder.IStreamOptions
104113
private static readonly Regex s_offsetValidator = new Regex("^[0-9]+[YMDhms]$",
105114
RegexOptions.Compiled | RegexOptions.CultureInvariant);
106115

107-
private const string RmqStreamFilter = "rabbitmq:stream-filter";
108-
private const string RmqStreamOffsetSpec = "rabbitmq:stream-offset-spec";
109-
private const string RmqStreamMatchUnfiltered = "rabbitmq:stream-match-unfiltered";
110-
111-
private static readonly Symbol s_streamFilterSymbol = new(RmqStreamFilter);
112-
private static readonly Symbol s_streamOffsetSpecSymbol = new(RmqStreamOffsetSpec);
113-
private static readonly Symbol s_streamMatchUnfilteredSymbol = new(RmqStreamMatchUnfiltered);
114-
115116
private readonly Map _filters;
116-
private readonly bool _areFilterExpressionsSupported;
117117

118-
protected StreamOptions(Map filters, bool areFilterExpressionsSupported)
118+
protected StreamOptions(Map filters)
119119
{
120120
_filters = filters;
121-
_areFilterExpressionsSupported = areFilterExpressionsSupported;
122121
}
123122

124123
public IConsumerBuilder.IStreamOptions Offset(long offset)
125124
{
126-
_filters[s_streamOffsetSpecSymbol] =
127-
new DescribedValue(s_streamOffsetSpecSymbol, offset);
125+
_filters[Consts.s_streamOffsetSpecSymbol] =
126+
new DescribedValue(Consts.s_streamOffsetSpecSymbol, offset);
128127
return this;
129128
}
130129

@@ -158,24 +157,24 @@ public IConsumerBuilder.IStreamOptions Offset(string interval)
158157

159158
public IConsumerBuilder.IStreamOptions FilterValues(params string[] values)
160159
{
161-
_filters[s_streamFilterSymbol] =
162-
new DescribedValue(s_streamFilterSymbol, values.ToList());
160+
_filters[Consts.s_streamFilterSymbol] =
161+
new DescribedValue(Consts.s_streamFilterSymbol, values.ToList());
163162
return this;
164163
}
165164

166165
public IConsumerBuilder.IStreamOptions FilterMatchUnfiltered(bool matchUnfiltered)
167166
{
168-
_filters[s_streamMatchUnfilteredSymbol]
169-
= new DescribedValue(s_streamMatchUnfilteredSymbol, matchUnfiltered);
167+
_filters[Consts.s_streamMatchUnfilteredSymbol]
168+
= new DescribedValue(Consts.s_streamMatchUnfilteredSymbol, matchUnfiltered);
170169
return this;
171170
}
172171

173172
public abstract IConsumerBuilder Builder();
174173

175174
private void SetOffsetSpecificationFilter(object value)
176175
{
177-
_filters[s_streamOffsetSpecSymbol]
178-
= new DescribedValue(s_streamOffsetSpecSymbol, value);
176+
_filters[Consts.s_streamOffsetSpecSymbol]
177+
= new DescribedValue(Consts.s_streamOffsetSpecSymbol, value);
179178
}
180179

181180
public IConsumerBuilder.IStreamFilterOptions Filter()
@@ -198,8 +197,8 @@ public IConsumerBuilder.IStreamFilterOptions Filter()
198197
/// </summary>
199198
public class ListenerStreamOptions : StreamOptions
200199
{
201-
public ListenerStreamOptions(Map filters, bool areFilterExpressionsSupported)
202-
: base(filters, areFilterExpressionsSupported)
200+
public ListenerStreamOptions(Map filters)
201+
: base(filters)
203202
{
204203
}
205204

@@ -221,8 +220,8 @@ public class ConsumerBuilderStreamOptions : StreamOptions
221220
private readonly IConsumerBuilder _consumerBuilder;
222221

223222
public ConsumerBuilderStreamOptions(IConsumerBuilder consumerBuilder,
224-
Map filters, bool areFilterExpressionsSupported)
225-
: base(filters, areFilterExpressionsSupported)
223+
Map filters)
224+
: base(filters)
226225
{
227226
_consumerBuilder = consumerBuilder;
228227
}
@@ -239,15 +238,27 @@ public override IConsumerBuilder Builder()
239238
/// </summary>
240239
public class StreamFilterOptions : IConsumerBuilder.IStreamFilterOptions
241240
{
242-
private IConsumerBuilder.IStreamOptions _streamOptions;
243-
private Map _filters;
241+
private readonly IConsumerBuilder.IStreamOptions _streamOptions;
242+
private readonly Map _filters;
244243

245244
public StreamFilterOptions(IConsumerBuilder.IStreamOptions streamOptions, Map filters)
246245
{
247246
_streamOptions = streamOptions;
248247
_filters = filters;
249248
}
250249

250+
public IConsumerBuilder.IStreamFilterOptions Sql(string sql)
251+
{
252+
if (string.IsNullOrWhiteSpace(sql))
253+
{
254+
throw new ArgumentNullException(nameof(sql));
255+
}
256+
257+
_filters[Consts.s_sqlFilterSymbol] =
258+
new DescribedValue(Consts.s_streamSqlFilterSymbol, sql);
259+
return this;
260+
}
261+
251262
public IConsumerBuilder.IStreamOptions Stream()
252263
{
253264
return _streamOptions;
@@ -300,9 +311,8 @@ public IConsumerBuilder.IStreamFilterOptions PropertySymbol(string key, string v
300311

301312
private StreamFilterOptions PropertyFilter(string propertyKey, object propertyValue)
302313
{
303-
const string AmqpPropertiesFilter = "amqp:properties-filter";
304314

305-
DescribedValue propertiesFilterValue = Filter(AmqpPropertiesFilter);
315+
DescribedValue propertiesFilterValue = Filter(Consts.AmqpPropertiesFilter);
306316
Map propertiesFilter = (Map)propertiesFilterValue.Value;
307317
// Note: you MUST use a symbol as the key
308318
propertiesFilter.Add(new Symbol(propertyKey), propertyValue);
@@ -311,9 +321,8 @@ private StreamFilterOptions PropertyFilter(string propertyKey, object propertyVa
311321

312322
private StreamFilterOptions ApplicationPropertyFilter(string propertyKey, object propertyValue)
313323
{
314-
const string AmqpApplicationPropertiesFilter = "amqp:application-properties-filter";
315324

316-
DescribedValue applicationPropertiesFilterValue = Filter(AmqpApplicationPropertiesFilter);
325+
DescribedValue applicationPropertiesFilterValue = Filter(Consts.AmqpApplicationPropertiesFilter);
317326
Map applicationPropertiesFilter = (Map)applicationPropertiesFilterValue.Value;
318327
// Note: do NOT put a symbol as the key
319328
applicationPropertiesFilter.Add(propertyKey, propertyValue);

RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,12 @@ void OnAttached(ILink argLink, Attach argAttach)
6767

6868
// TODO configurable timeout
6969
var waitSpan = TimeSpan.FromSeconds(5);
70+
// TODO
71+
// Even 10ms is enough to allow the links to establish,
72+
// which tells me it allows the .NET runtime to process
73+
await Task.Delay(10)
74+
.ConfigureAwait(false);
75+
7076
_senderLink = await attachCompletedTcs.Task.WaitAsync(waitSpan)
7177
.ConfigureAwait(false);
7278

0 commit comments

Comments
 (0)