Skip to content

Implement SQL filter #131

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .ci/ubuntu/cluster/gha-setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ function run_docker_compose
docker compose --file "$script_dir/docker-compose.yml" $@
}

readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4.1.0-management-alpine}"
readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4-management}"

if [[ ! -v GITHUB_ACTIONS ]]
then
Expand Down
2 changes: 1 addition & 1 deletion .ci/ubuntu/one-node/gha-setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ readonly script_dir
echo "[INFO] script_dir: '$script_dir'"


readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4.1.0-management-alpine}"
readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4-management}"



Expand Down
20 changes: 20 additions & 0 deletions RabbitMQ.AMQP.Client/Consts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
// and the Mozilla Public License, version 2.0.
// Copyright (c) 2017-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.

using Amqp.Types;

namespace RabbitMQ.AMQP.Client
{
public static class Consts
Expand All @@ -21,5 +23,23 @@ public static class Consts
/// The default virtual host, <c>/</c>
/// </summary>
public const string DefaultVirtualHost = "/";

// amqp:sql-filter
private const string AmqpSqlFilter = "amqp:sql-filter";
internal static readonly Symbol s_streamSqlFilterSymbol = new(AmqpSqlFilter);
internal const string SqlFilter = "SqlFilter";

internal const string AmqpPropertiesFilter = "amqp:properties-filter";
internal const string AmqpApplicationPropertiesFilter = "amqp:application-properties-filter";

// sql-filter
private const string RmqStreamFilter = "rabbitmq:stream-filter";
private const string RmqStreamOffsetSpec = "rabbitmq:stream-offset-spec";
private const string RmqStreamMatchUnfiltered = "rabbitmq:stream-match-unfiltered";

internal static readonly Symbol s_streamFilterSymbol = new(RmqStreamFilter);
internal static readonly Symbol s_streamOffsetSpecSymbol = new(RmqStreamOffsetSpec);
internal static readonly Symbol s_streamMatchUnfilteredSymbol = new(RmqStreamMatchUnfiltered);

}
}
21 changes: 18 additions & 3 deletions RabbitMQ.AMQP.Client/FeatureFlags.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,25 @@

namespace RabbitMQ.AMQP.Client
{
internal class FeatureFlags
public class FeatureFlags
{
public bool IsSqlFeatureEnabled { get; set; } = false;
public bool IsBrokerCompatible { get; set; } = false;

/// <summary>
/// Check if filter feature is enabled.
/// Filter feature is available in RabbitMQ 4.1 and later.
/// </summary>
public bool IsFilterFeatureEnabled { get; internal set; } = false;

/// <summary>
/// Check if Sql feature is enabled.
/// Sql feature is available in RabbitMQ 4.2 and later.
/// </summary>
public bool IsSqlFeatureEnabled { get; internal set; } = false;
/// <summary>
/// Check if the client is compatible with the broker version.
/// The client requires RabbitMQ 4.0 or later to be compatible.
/// </summary>
public bool IsBrokerCompatible { get; internal set; } = false;

public void Validate()
{
Expand Down
8 changes: 8 additions & 0 deletions RabbitMQ.AMQP.Client/IConsumerBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,14 @@ public interface IStreamFilterOptions
/// <returns><see cref="IStreamFilterOptions"/></returns>
IStreamFilterOptions PropertySymbol(string key, string value);

/// <summary>
/// <para>SQL filter expression.</para>
///
/// </summary>
/// <para>Requires RabbitMQ 4.2 or more.</para>
/// Documentation: <see href="https://www.rabbitmq.com/docs/next/stream-filtering#sql-filter-expressions">SQL Filtering</see>
IStreamFilterOptions Sql(string sql);

/// <summary>
/// Return the stream options.
/// </summary>
Expand Down
7 changes: 2 additions & 5 deletions RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ public class AmqpConnection : AbstractLifeCycle, IConnection
private readonly IMetricsReporter? _metricsReporter;

private readonly Dictionary<string, object> _connectionProperties = new();
private bool _areFilterExpressionsSupported = false;
private FeatureFlags _featureFlags = new FeatureFlags();
internal readonly FeatureFlags _featureFlags = new FeatureFlags();

/// <summary>
/// _publishersDict contains all the publishers created by the connection.
Expand Down Expand Up @@ -261,8 +260,6 @@ protected override void Dispose(bool disposing)

internal Connection? NativeConnection => _nativeConnection;

internal bool AreFilterExpressionsSupported => _areFilterExpressionsSupported;

// TODO this couples AmqpConnection with AmqpPublisher, yuck
internal void AddPublisher(Guid id, IPublisher consumer)
{
Expand Down Expand Up @@ -674,7 +671,7 @@ private void HandleProperties(Fields properties)
// this is a feature that was introduced in RabbitMQ 4.2.0
_featureFlags.IsSqlFeatureEnabled = Utils.Is4_2_OrMore(brokerVersion);

_areFilterExpressionsSupported = Utils.SupportsFilterExpressions(brokerVersion);
_featureFlags.IsFilterFeatureEnabled = Utils.SupportsFilterExpressions(brokerVersion);
}
}
}
2 changes: 1 addition & 1 deletion RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public override async Task OpenAsync()
// ListenerContext will override only the filters the selected filters.
if (_configuration.ListenerContext is not null)
{
var listenerStreamOptions = new ListenerStreamOptions(_configuration.Filters, _amqpConnection.AreFilterExpressionsSupported);
var listenerStreamOptions = new ListenerStreamOptions(_configuration.Filters);
var listenerContext = new IConsumerBuilder.ListenerContext(listenerStreamOptions);
_configuration.ListenerContext(listenerContext);
}
Expand Down
71 changes: 40 additions & 31 deletions RabbitMQ.AMQP.Client/Impl/AmqpConsumerBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@ internal sealed class ConsumerConfiguration
{
public string Address { get; set; } = "";
public int InitialCredits { get; set; } = 100; // TODO use constant, check with Java lib

public Map Filters { get; set; } = new();

// TODO is a MessageHandler *really* optional???
public MessageHandler? Handler { get; set; }

// TODO re-name to ListenerContextAction? Callback?
public Action<IConsumerBuilder.ListenerContext>? ListenerContext = null;
}
Expand Down Expand Up @@ -73,8 +76,7 @@ public IConsumerBuilder SubscriptionListener(Action<IConsumerBuilder.ListenerCon

public IConsumerBuilder.IStreamOptions Stream()
{
return new ConsumerBuilderStreamOptions(this, _configuration.Filters,
_amqpConnection.AreFilterExpressionsSupported);
return new ConsumerBuilderStreamOptions(this, _configuration.Filters);
}

public async Task<IConsumer> BuildAndStartAsync(CancellationToken cancellationToken = default)
Expand All @@ -84,6 +86,13 @@ public async Task<IConsumer> BuildAndStartAsync(CancellationToken cancellationTo
throw new ConsumerException("Message handler is not set");
}

if (_configuration.Filters[Consts.SqlFilter] is not null &&
_amqpConnection._featureFlags.IsSqlFeatureEnabled == false)
{
throw new ConsumerException("SQL filter is not supported by the connection." +
"RabbitMQ 4.2.0 or later is required.");
}

AmqpConsumer consumer = new(_amqpConnection, _configuration, _metricsReporter);

// TODO pass cancellationToken
Expand All @@ -104,27 +113,17 @@ public abstract class StreamOptions : IConsumerBuilder.IStreamOptions
private static readonly Regex s_offsetValidator = new Regex("^[0-9]+[YMDhms]$",
RegexOptions.Compiled | RegexOptions.CultureInvariant);

private const string RmqStreamFilter = "rabbitmq:stream-filter";
private const string RmqStreamOffsetSpec = "rabbitmq:stream-offset-spec";
private const string RmqStreamMatchUnfiltered = "rabbitmq:stream-match-unfiltered";

private static readonly Symbol s_streamFilterSymbol = new(RmqStreamFilter);
private static readonly Symbol s_streamOffsetSpecSymbol = new(RmqStreamOffsetSpec);
private static readonly Symbol s_streamMatchUnfilteredSymbol = new(RmqStreamMatchUnfiltered);

private readonly Map _filters;
private readonly bool _areFilterExpressionsSupported;

protected StreamOptions(Map filters, bool areFilterExpressionsSupported)
protected StreamOptions(Map filters)
{
_filters = filters;
_areFilterExpressionsSupported = areFilterExpressionsSupported;
}

public IConsumerBuilder.IStreamOptions Offset(long offset)
{
_filters[s_streamOffsetSpecSymbol] =
new DescribedValue(s_streamOffsetSpecSymbol, offset);
_filters[Consts.s_streamOffsetSpecSymbol] =
new DescribedValue(Consts.s_streamOffsetSpecSymbol, offset);
return this;
}

Expand Down Expand Up @@ -158,24 +157,24 @@ public IConsumerBuilder.IStreamOptions Offset(string interval)

public IConsumerBuilder.IStreamOptions FilterValues(params string[] values)
{
_filters[s_streamFilterSymbol] =
new DescribedValue(s_streamFilterSymbol, values.ToList());
_filters[Consts.s_streamFilterSymbol] =
new DescribedValue(Consts.s_streamFilterSymbol, values.ToList());
return this;
}

public IConsumerBuilder.IStreamOptions FilterMatchUnfiltered(bool matchUnfiltered)
{
_filters[s_streamMatchUnfilteredSymbol]
= new DescribedValue(s_streamMatchUnfilteredSymbol, matchUnfiltered);
_filters[Consts.s_streamMatchUnfilteredSymbol]
= new DescribedValue(Consts.s_streamMatchUnfilteredSymbol, matchUnfiltered);
return this;
}

public abstract IConsumerBuilder Builder();

private void SetOffsetSpecificationFilter(object value)
{
_filters[s_streamOffsetSpecSymbol]
= new DescribedValue(s_streamOffsetSpecSymbol, value);
_filters[Consts.s_streamOffsetSpecSymbol]
= new DescribedValue(Consts.s_streamOffsetSpecSymbol, value);
}

public IConsumerBuilder.IStreamFilterOptions Filter()
Expand All @@ -198,8 +197,8 @@ public IConsumerBuilder.IStreamFilterOptions Filter()
/// </summary>
public class ListenerStreamOptions : StreamOptions
{
public ListenerStreamOptions(Map filters, bool areFilterExpressionsSupported)
: base(filters, areFilterExpressionsSupported)
public ListenerStreamOptions(Map filters)
: base(filters)
{
}

Expand All @@ -221,8 +220,8 @@ public class ConsumerBuilderStreamOptions : StreamOptions
private readonly IConsumerBuilder _consumerBuilder;

public ConsumerBuilderStreamOptions(IConsumerBuilder consumerBuilder,
Map filters, bool areFilterExpressionsSupported)
: base(filters, areFilterExpressionsSupported)
Map filters)
: base(filters)
{
_consumerBuilder = consumerBuilder;
}
Expand All @@ -239,15 +238,27 @@ public override IConsumerBuilder Builder()
/// </summary>
public class StreamFilterOptions : IConsumerBuilder.IStreamFilterOptions
{
private IConsumerBuilder.IStreamOptions _streamOptions;
private Map _filters;
private readonly IConsumerBuilder.IStreamOptions _streamOptions;
private readonly Map _filters;

public StreamFilterOptions(IConsumerBuilder.IStreamOptions streamOptions, Map filters)
{
_streamOptions = streamOptions;
_filters = filters;
}

public IConsumerBuilder.IStreamFilterOptions Sql(string sql)
{
if (string.IsNullOrWhiteSpace(sql))
{
throw new ArgumentNullException(nameof(sql));
}

_filters[Consts.SqlFilter] =
new DescribedValue(Consts.s_streamSqlFilterSymbol, sql);
return this;
}

public IConsumerBuilder.IStreamOptions Stream()
{
return _streamOptions;
Expand Down Expand Up @@ -300,9 +311,8 @@ public IConsumerBuilder.IStreamFilterOptions PropertySymbol(string key, string v

private StreamFilterOptions PropertyFilter(string propertyKey, object propertyValue)
{
const string AmqpPropertiesFilter = "amqp:properties-filter";

DescribedValue propertiesFilterValue = Filter(AmqpPropertiesFilter);
DescribedValue propertiesFilterValue = Filter(Consts.AmqpPropertiesFilter);
Map propertiesFilter = (Map)propertiesFilterValue.Value;
// Note: you MUST use a symbol as the key
propertiesFilter.Add(new Symbol(propertyKey), propertyValue);
Expand All @@ -311,9 +321,8 @@ private StreamFilterOptions PropertyFilter(string propertyKey, object propertyVa

private StreamFilterOptions ApplicationPropertyFilter(string propertyKey, object propertyValue)
{
const string AmqpApplicationPropertiesFilter = "amqp:application-properties-filter";

DescribedValue applicationPropertiesFilterValue = Filter(AmqpApplicationPropertiesFilter);
DescribedValue applicationPropertiesFilterValue = Filter(Consts.AmqpApplicationPropertiesFilter);
Map applicationPropertiesFilter = (Map)applicationPropertiesFilterValue.Value;
// Note: do NOT put a symbol as the key
applicationPropertiesFilter.Add(propertyKey, propertyValue);
Expand Down
14 changes: 11 additions & 3 deletions RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,12 @@ RabbitMQ.AMQP.Client.ExchangeType.DIRECT = 0 -> RabbitMQ.AMQP.Client.ExchangeTyp
RabbitMQ.AMQP.Client.ExchangeType.FANOUT = 1 -> RabbitMQ.AMQP.Client.ExchangeType
RabbitMQ.AMQP.Client.ExchangeType.HEADERS = 3 -> RabbitMQ.AMQP.Client.ExchangeType
RabbitMQ.AMQP.Client.ExchangeType.TOPIC = 2 -> RabbitMQ.AMQP.Client.ExchangeType
RabbitMQ.AMQP.Client.FeatureFlags
RabbitMQ.AMQP.Client.FeatureFlags.FeatureFlags() -> void
RabbitMQ.AMQP.Client.FeatureFlags.IsBrokerCompatible.get -> bool
RabbitMQ.AMQP.Client.FeatureFlags.IsFilterFeatureEnabled.get -> bool
RabbitMQ.AMQP.Client.FeatureFlags.IsSqlFeatureEnabled.get -> bool
RabbitMQ.AMQP.Client.FeatureFlags.Validate() -> void
RabbitMQ.AMQP.Client.IAddressBuilder<T>
RabbitMQ.AMQP.Client.IAddressBuilder<T>.Exchange(RabbitMQ.AMQP.Client.IExchangeSpecification! exchangeSpec) -> T
RabbitMQ.AMQP.Client.IAddressBuilder<T>.Exchange(string! exchangeName) -> T
Expand Down Expand Up @@ -182,6 +188,7 @@ RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions.Property(string! key,
RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions.PropertySymbol(string! key, string! value) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions!
RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions.ReplyTo(string! replyTo) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions!
RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions.ReplyToGroupId(string! groupId) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions!
RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions.Sql(string! sql) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions!
RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions.Stream() -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions!
RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions.Subject(string! subject) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions!
RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions.To(string! to) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions!
Expand Down Expand Up @@ -533,7 +540,7 @@ RabbitMQ.AMQP.Client.Impl.BindingSpecification._routingKey -> string!
RabbitMQ.AMQP.Client.Impl.BindingSpecification._sourceName -> string!
RabbitMQ.AMQP.Client.Impl.BindingSpecification._toQueue -> bool
RabbitMQ.AMQP.Client.Impl.ConsumerBuilderStreamOptions
RabbitMQ.AMQP.Client.Impl.ConsumerBuilderStreamOptions.ConsumerBuilderStreamOptions(RabbitMQ.AMQP.Client.IConsumerBuilder! consumerBuilder, Amqp.Types.Map! filters, bool areFilterExpressionsSupported) -> void
RabbitMQ.AMQP.Client.Impl.ConsumerBuilderStreamOptions.ConsumerBuilderStreamOptions(RabbitMQ.AMQP.Client.IConsumerBuilder! consumerBuilder, Amqp.Types.Map! filters) -> void
RabbitMQ.AMQP.Client.Impl.DefaultAddressBuilder<T>
RabbitMQ.AMQP.Client.Impl.DefaultAddressBuilder<T>.Address() -> string!
RabbitMQ.AMQP.Client.Impl.DefaultAddressBuilder<T>.DefaultAddressBuilder() -> void
Expand All @@ -557,7 +564,7 @@ RabbitMQ.AMQP.Client.Impl.DefaultQueueInfo.Type() -> RabbitMQ.AMQP.Client.QueueT
RabbitMQ.AMQP.Client.Impl.FieldNotSetException
RabbitMQ.AMQP.Client.Impl.FieldNotSetException.FieldNotSetException() -> void
RabbitMQ.AMQP.Client.Impl.ListenerStreamOptions
RabbitMQ.AMQP.Client.Impl.ListenerStreamOptions.ListenerStreamOptions(Amqp.Types.Map! filters, bool areFilterExpressionsSupported) -> void
RabbitMQ.AMQP.Client.Impl.ListenerStreamOptions.ListenerStreamOptions(Amqp.Types.Map! filters) -> void
RabbitMQ.AMQP.Client.Impl.MessageAddressBuilder
RabbitMQ.AMQP.Client.Impl.MessageAddressBuilder.Build() -> RabbitMQ.AMQP.Client.IMessage!
RabbitMQ.AMQP.Client.Impl.MessageAddressBuilder.MessageAddressBuilder(RabbitMQ.AMQP.Client.IMessage! message) -> void
Expand Down Expand Up @@ -605,6 +612,7 @@ RabbitMQ.AMQP.Client.Impl.StreamFilterOptions.Property(string! key, object! valu
RabbitMQ.AMQP.Client.Impl.StreamFilterOptions.PropertySymbol(string! key, string! value) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions!
RabbitMQ.AMQP.Client.Impl.StreamFilterOptions.ReplyTo(string! replyTo) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions!
RabbitMQ.AMQP.Client.Impl.StreamFilterOptions.ReplyToGroupId(string! groupId) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions!
RabbitMQ.AMQP.Client.Impl.StreamFilterOptions.Sql(string! sql) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions!
RabbitMQ.AMQP.Client.Impl.StreamFilterOptions.Stream() -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions!
RabbitMQ.AMQP.Client.Impl.StreamFilterOptions.StreamFilterOptions(RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions! streamOptions, Amqp.Types.Map! filters) -> void
RabbitMQ.AMQP.Client.Impl.StreamFilterOptions.Subject(string! subject) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions!
Expand All @@ -618,7 +626,7 @@ RabbitMQ.AMQP.Client.Impl.StreamOptions.Offset(long offset) -> RabbitMQ.AMQP.Cli
RabbitMQ.AMQP.Client.Impl.StreamOptions.Offset(RabbitMQ.AMQP.Client.StreamOffsetSpecification specification) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions!
RabbitMQ.AMQP.Client.Impl.StreamOptions.Offset(string! interval) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions!
RabbitMQ.AMQP.Client.Impl.StreamOptions.Offset(System.DateTime timestamp) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions!
RabbitMQ.AMQP.Client.Impl.StreamOptions.StreamOptions(Amqp.Types.Map! filters, bool areFilterExpressionsSupported) -> void
RabbitMQ.AMQP.Client.Impl.StreamOptions.StreamOptions(Amqp.Types.Map! filters) -> void
RabbitMQ.AMQP.Client.InternalBugException
RabbitMQ.AMQP.Client.InternalBugException.InternalBugException() -> void
RabbitMQ.AMQP.Client.InternalBugException.InternalBugException(string! message) -> void
Expand Down
Loading
Loading