Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion .ci/ubuntu/gha-setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,14 @@ script_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
readonly script_dir
echo "[INFO] script_dir: '$script_dir'"

readonly rabbitmq_image="${RABBITMQ_IMAGE:-pivotalrabbitmq/rabbitmq:main}"
if [[ $3 == 'arm' ]]
then
readonly rabbitmq_image="${RABBITMQ_IMAGE:-pivotalrabbitmq/rabbitmq-arm64:main}"
else
readonly rabbitmq_image="${RABBITMQ_IMAGE:-pivotalrabbitmq/rabbitmq:main}"
fi


readonly docker_name_prefix='rabbitmq-amqp-dotnet-client'
readonly docker_network_name="$docker_name_prefix-network"

Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ build:
test: build
dotnet test -c Debug $(CURDIR)/Tests/Tests.csproj --no-build --logger:"console;verbosity=detailed" /p:AltCover=true

rabbitmq-server-start:
./.ci/ubuntu/gha-setup.sh start
rabbitmq-server-start-arm:
./.ci/ubuntu/gha-setup.sh start pull arm

rabbitmq-server-stop:
./.ci/ubuntu/gha-setup.sh stop
Expand Down
39 changes: 32 additions & 7 deletions RabbitMQ.AMQP.Client/IEntities.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,23 @@ public interface IEntityInfo
}

/// <summary>
/// Generic interface for declaring entities
/// Generic interface for declaring entities with result of type T
/// </summary>
/// <typeparam name="T"></typeparam>
public interface IEntityDeclaration<T> where T : IEntityInfo
public interface IEntityInfoDeclaration<T> where T : IEntityInfo
{
Task<T> Declare();
}

public interface IQueueSpecification : IEntityDeclaration<IQueueInfo>
/// <summary>
/// Generic interface for declaring entities without result
/// </summary>
public interface IEntityDeclaration
{
Task Declare();
}

public interface IQueueSpecification : IEntityInfoDeclaration<IQueueInfo>
{
IQueueSpecification Name(string name);
public string Name();
Expand Down Expand Up @@ -44,22 +52,39 @@ public interface IQueueDeletion
Task<IEntityInfo> Delete(string name);
}

public interface IExchangeSpecification : IEntityDeclaration<IExchangeInfo>
public interface IExchangeSpecification : IEntityDeclaration
{
IExchangeSpecification Name(string name);

IExchangeSpecification AutoDelete(bool autoDelete);

IExchangeSpecification Type(ExchangeType type);

IExchangeSpecification Type(string type);
IExchangeSpecification Type(string type); // TODO: Add this

IExchangeSpecification Argument(string key, object value);
}


public interface IExchangeDeletion
{
// TODO consider returning a ExchangeStatus object with some info after deletion
Task<IEntityInfo> Delete(string name);
Task Delete(string name);
}

public interface IBindingSpecification
{
IBindingSpecification SourceExchange(string exchange);

IBindingSpecification DestinationQueue(string queue);

IBindingSpecification DestinationExchange(string exchange);

IBindingSpecification Key(string key);

IBindingSpecification Argument(string key, object value);

IBindingSpecification Arguments(Dictionary<string, object> arguments);

Task Bind();
Task Unbind();
}
3 changes: 0 additions & 3 deletions RabbitMQ.AMQP.Client/IEntitiesInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,3 @@ public enum ExchangeType
HEADERS
}

public interface IExchangeInfo : IEntityInfo
{
}
3 changes: 2 additions & 1 deletion RabbitMQ.AMQP.Client/IManagement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public interface IManagement : IClosable

IExchangeDeletion ExchangeDeletion();

IBindingSpecification Binding();

ITopologyListener TopologyListener();
}

170 changes: 170 additions & 0 deletions RabbitMQ.AMQP.Client/Impl/AmqpBindingSpecification.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
using Amqp.Types;

namespace RabbitMQ.AMQP.Client.Impl;

public abstract class BindingSpecificationBase
{
protected string Source = "";
protected string Destination = "";
protected string RoutingKey = "";
protected bool ToQueue = true;
protected Dictionary<string, object> _arguments = new();

protected Map ArgsToMap()
{
Map argMap = new();

foreach ((string key, object value) in _arguments)
{
argMap[key] = value;
}

return argMap;
}
}

public class AmqpBindingSpecification(AmqpManagement management) : BindingSpecificationBase, IBindingSpecification
{
private AmqpManagement Management { get; } = management;

public async Task Bind()
{
var kv = new Map
{
{ "source", Source },
{ "binding_key", RoutingKey },
{ "arguments", ArgsToMap() },
{ ToQueue ? "destination_queue" : "destination_exchange", Destination }
};

await Management.Request(kv, $"/{Consts.Bindings}",
AmqpManagement.Post,
[
AmqpManagement.Code204,
]).ConfigureAwait(false);
}

public async Task Unbind()
{
string destinationCharacter = ToQueue ? "dstq" : "dste";
if (_arguments.Count == 0)
{
string target =
$"/{Consts.Bindings}/src={Utils.EncodePathSegment(Source)};" +
$"{($"{destinationCharacter}={Utils.EncodePathSegment(Destination)};" +
$"key={Utils.EncodePathSegment(RoutingKey)};args=")}";

await Management.Request(
null, target,
AmqpManagement.Delete, new[] { AmqpManagement.Code204 }).ConfigureAwait(false);
}
else
{
string path = BindingsTarget(destinationCharacter, Source, Destination, RoutingKey);
List<Map> bindings = await GetBindings(path).ConfigureAwait(false);
string? uri = MatchBinding(bindings, RoutingKey, ArgsToMap());
if (uri != null)
{
await Management.Request(
null, uri,
AmqpManagement.Delete, new[] { AmqpManagement.Code204 }).ConfigureAwait(false);
}
}
}

public IBindingSpecification SourceExchange(string exchange)
{
ToQueue = false;
Source = exchange;
return this;
}

public IBindingSpecification DestinationQueue(string queue)
{
ToQueue = true;
Destination = queue;
return this;
}

public IBindingSpecification DestinationExchange(string exchange)
{
Destination = exchange;
return this;
}

public IBindingSpecification Key(string key)
{
RoutingKey = key;
return this;
}

public IBindingSpecification Argument(string key, object value)
{
_arguments[key] = value;
return this;
}

public IBindingSpecification Arguments(Dictionary<string, object> arguments)
{
_arguments = arguments;
return this;
}

private string BindingsTarget(
string destinationField, string source, string destination, string key)
{
return "/bindings?src="
+ Utils.EncodeHttpParameter(source)
+ "&"
+ destinationField
+ "="
+ Utils.EncodeHttpParameter(destination)
+ "&key="
+ Utils.EncodeHttpParameter(key);
}

private async Task<List<Map>> GetBindings(string path)
{
var result = await Management.Request(
null, path,
AmqpManagement.Get, new[] { AmqpManagement.Code200 }).ConfigureAwait(false);

if (result.Body is not List list)
{
return [];
}


var l = new List<Map>() { };
foreach (object o in list)
{
if (o is Map item)
{
l.Add(item);
}
}

return l;
}

private string? MatchBinding(List<Map> bindings, string key, Map arguments)
{
string? uri = null;
foreach (Map binding in bindings)
{
string bindingKey = (string)binding["binding_key"];
Map bindingArguments = (Map)binding["arguments"];
if ((key == null && bindingKey == null) || (key != null && key.Equals(bindingKey)))
{
if ((arguments == null && bindingArguments == null) ||
(arguments != null && Utils.CompareMap(arguments, bindingArguments)))
{
uri = binding["location"].ToString();
break;
}
}
}

return uri;
}
}
8 changes: 8 additions & 0 deletions RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class AmqpConnection : AbstractClosable, IConnection

private readonly AmqpManagement _management = new();
private readonly RecordingTopologyListener _recordingTopologyListener = new();
private readonly TaskCompletionSource<bool> _connectionCloseTaskCompletionSource = new(TaskCreationOptions.RunContinuationsAsynchronously);

private readonly ConnectionSettings _connectionSettings;
internal readonly AmqpSessionManagement NativePubSubSessions;
Expand Down Expand Up @@ -287,6 +288,8 @@ await _recordingTopologyListener.Accept(visitor)
{
_semaphoreClose.Release();
}

_connectionCloseTaskCompletionSource.SetResult(true);
};
}

Expand Down Expand Up @@ -334,6 +337,11 @@ await _management.CloseAsync()
{
_semaphoreClose.Release();
}

await _connectionCloseTaskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(10))
.ConfigureAwait(false);

OnNewStatus(State.Closed, null);
}


Expand Down
21 changes: 5 additions & 16 deletions RabbitMQ.AMQP.Client/Impl/AmqpExchangeSpecification.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,6 @@

namespace RabbitMQ.AMQP.Client.Impl;

public class AmqpExchangeInfo : IExchangeInfo
{
}

public class AmqpExchangeSpecification(AmqpManagement management) : IExchangeSpecification
{
private string _name = "";
Expand All @@ -15,7 +11,7 @@ public class AmqpExchangeSpecification(AmqpManagement management) : IExchangeSpe
private string _typeString = ""; // TODO: add this
private readonly Map _arguments = new();

public async Task<IExchangeInfo> Declare()
public async Task Declare()
{
if (string.IsNullOrEmpty(_name))
{
Expand All @@ -34,15 +30,13 @@ public async Task<IExchangeInfo> Declare()
// TODO: encodePathSegment(queues)
// Message request = await management.Request(kv, $"/{Consts.Exchanges}/{_name}",
// for the moment we won't use the message response
await management.Request(kv, $"/{Consts.Exchanges}/{_name}",
await management.Request(kv, $"/{Consts.Exchanges}/{Utils.EncodePathSegment(_name)}",
AmqpManagement.Put,
[
AmqpManagement.Code204,
AmqpManagement.Code201,
AmqpManagement.Code409
]).ConfigureAwait(false);

return new AmqpExchangeInfo();
}

public IExchangeSpecification Name(string name)
Expand Down Expand Up @@ -76,18 +70,13 @@ public IExchangeSpecification Argument(string key, object value)
}
}

public class DefaultExchangeDeletionInfo : IEntityInfo
{
}

public class AmqpExchangeDeletion(AmqpManagement management) : IExchangeDeletion
{
public async Task<IEntityInfo> Delete(string name)
public async Task Delete(string name)
{
await management
.Request(null, $"/{Consts.Exchanges}/{name}", AmqpManagement.Delete, new[] { AmqpManagement.Code204, })
.Request(null, $"/{Consts.Exchanges}/{Utils.EncodePathSegment(name)}", AmqpManagement.Delete,
new[] { AmqpManagement.Code204, })
.ConfigureAwait(false);

return new DefaultExchangeDeletionInfo();
}
}
8 changes: 8 additions & 0 deletions RabbitMQ.AMQP.Client/Impl/AmqpManagement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ public class AmqpManagement : AbstractClosable, IManagement // TODO: Implement T
internal const int Code204 = 204; // TODO: handle 204
internal const int Code409 = 409;
internal const string Put = "PUT";
internal const string Get = "GET";
internal const string Post = "POST";

internal const string Delete = "DELETE";

private const string ReplyTo = "$me";
Expand Down Expand Up @@ -74,6 +77,11 @@ public IExchangeDeletion ExchangeDeletion()
return new AmqpExchangeDeletion(this);
}

public IBindingSpecification Binding()
{
return new AmqpBindingSpecification(this);
}

public ITopologyListener TopologyListener()
{
return _recordingTopologyListener!;
Expand Down
Loading