Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions RabbitMQ.AMQP.Client/FeatureFlags.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// This source code is dual-licensed under the Apache License, version 2.0,
// and the Mozilla Public License, version 2.0.
// Copyright (c) 2017-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.

namespace RabbitMQ.AMQP.Client
{
internal class FeatureFlags
{
public bool IsSqlFeatureEnabled { get; set; } = false;
public bool IsBrokerCompatible { get; set; } = false;

public void Validate()
{
if (!IsBrokerCompatible)
{
throw new ConnectionException("Client not compatible with the broker version. " +
"The client requires RabbitMQ 4.0 or later.");
}
}
}
}
12 changes: 8 additions & 4 deletions RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class AmqpConnection : AbstractLifeCycle, IConnection

private readonly Dictionary<string, object> _connectionProperties = new();
private bool _areFilterExpressionsSupported = false;
private FeatureFlags _featureFlags = new FeatureFlags();

/// <summary>
/// _publishersDict contains all the publishers created by the connection.
Expand Down Expand Up @@ -90,6 +91,7 @@ public static async Task<IConnection> CreateAsync(ConnectionSettings connectionS
AmqpConnection connection = new(connectionSettings, metricsReporter);
await connection.OpenAsync()
.ConfigureAwait(false);

return connection;
}

Expand Down Expand Up @@ -189,6 +191,7 @@ await OpenConnectionAsync(CancellationToken.None)
.ConfigureAwait(false);
await base.OpenAsync()
.ConfigureAwait(false);
_featureFlags.Validate();
}

public override async Task CloseAsync()
Expand Down Expand Up @@ -665,10 +668,11 @@ private void HandleProperties(Fields properties)
}

string brokerVersion = (string)_connectionProperties["version"];
if (false == Utils.Is4_0_OrMore(brokerVersion))
{
// TODO Java client throws exception here
}
_featureFlags.IsBrokerCompatible = Utils.Is4_0_OrMore(brokerVersion);

// check if the broker supports filter expressions
// this is a feature that was introduced in RabbitMQ 4.2.0
_featureFlags.IsSqlFeatureEnabled = Utils.Is4_2_OrMore(brokerVersion);

_areFilterExpressionsSupported = Utils.SupportsFilterExpressions(brokerVersion);
}
Expand Down
5 changes: 5 additions & 0 deletions RabbitMQ.AMQP.Client/Utils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,11 @@ internal static bool Is4_1_OrMore(string brokerVersion)
return VersionCompare(CurrentVersion(brokerVersion), "4.1.0") >= 0;
}

internal static bool Is4_2_OrMore(string brokerVersion)
{
return VersionCompare(CurrentVersion(brokerVersion), "4.2.0") >= 0;
}

private static string CurrentVersion(string currentVersion)
{
// versions built from source: 3.7.0+rc.1.4.gedc5d96
Expand Down