diff --git a/.ci/ubuntu/cluster/gha-setup.sh b/.ci/ubuntu/cluster/gha-setup.sh index ab369a8..a03ffb4 100755 --- a/.ci/ubuntu/cluster/gha-setup.sh +++ b/.ci/ubuntu/cluster/gha-setup.sh @@ -19,7 +19,7 @@ function run_docker_compose docker compose --file "$script_dir/docker-compose.yml" $@ } -readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4.1.0-beta.4-management-alpine}" +readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4.1.0-management-alpine}" if [[ ! -v GITHUB_ACTIONS ]] then diff --git a/.ci/ubuntu/one-node/gha-setup.sh b/.ci/ubuntu/one-node/gha-setup.sh index a18b6dc..77e9cee 100755 --- a/.ci/ubuntu/one-node/gha-setup.sh +++ b/.ci/ubuntu/one-node/gha-setup.sh @@ -9,7 +9,7 @@ readonly script_dir echo "[INFO] script_dir: '$script_dir'" -readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4.1.0-beta.4-management-alpine}" +readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4.1.0-management-alpine}" diff --git a/.ci/windows/versions.json b/.ci/windows/versions.json index 088c055..058bd93 100644 --- a/.ci/windows/versions.json +++ b/.ci/windows/versions.json @@ -1,4 +1,4 @@ { "erlang": "27.2", - "rabbitmq": "4.1.0-beta.4" + "rabbitmq": "4.1.0" } diff --git a/.gitignore b/.gitignore index 1a4833d..3a4df22 100644 --- a/.gitignore +++ b/.gitignore @@ -129,4 +129,5 @@ docs/temp/ # ci logs .ci/ubuntu/log/* +.ci/ubuntu/cluster/log/* diff --git a/Directory.Packages.props b/Directory.Packages.props index e0551b2..d45c5f0 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -7,6 +7,7 @@ + diff --git a/RabbitMQ.AMQP.Client/IMessage.cs b/RabbitMQ.AMQP.Client/IMessage.cs index cebd3b5..1a459e1 100644 --- a/RabbitMQ.AMQP.Client/IMessage.cs +++ b/RabbitMQ.AMQP.Client/IMessage.cs @@ -96,7 +96,10 @@ public interface IMessage public object Annotation(string key); public IMessage Annotation(string key, object value); - public object Body(); + public byte[] Body(); + + public string BodyAsString(); + public IMessage Body(object body); IMessageAddressBuilder ToAddress(); diff --git a/RabbitMQ.AMQP.Client/IRpcServer.cs b/RabbitMQ.AMQP.Client/IRpcServer.cs index a6ae22e..a6bfc9c 100644 --- a/RabbitMQ.AMQP.Client/IRpcServer.cs +++ b/RabbitMQ.AMQP.Client/IRpcServer.cs @@ -78,7 +78,8 @@ public interface IRpcServer : ILifeCycle public interface IContext { - IMessage Message(object body); + IMessage Message(byte[] body); + IMessage Message(string body); } } } diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs b/RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs index e3ca964..630ac3a 100644 --- a/RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs +++ b/RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs @@ -24,9 +24,24 @@ public AmqpMessage() NativeMessage = new Message(); } - public AmqpMessage(object body) + /// + /// Create a message with a body of type byte[] and BodySection of type Data. + /// + /// + public AmqpMessage(byte[] body) { - NativeMessage = new Message(body); + NativeMessage = new Message(); + NativeMessage.BodySection = new Data { Binary = body }; + } + + /// + /// Create a message with a body of type string and BodySection of type Data. + /// The string is converted to a byte[] using UTF8 encoding. + /// + public AmqpMessage(string body) + { + NativeMessage = new Message(); + NativeMessage.BodySection = new Data() { Binary = System.Text.Encoding.UTF8.GetBytes(body) }; } public AmqpMessage(Message nativeMessage) @@ -245,9 +260,22 @@ public object Annotation(string key) return NativeMessage.MessageAnnotations[new Symbol(key)]; } - public object Body() + public byte[] Body() { - return NativeMessage.Body; + return (byte[])NativeMessage.Body; + } + + public string BodyAsString() + { + if (NativeMessage.BodySection is Data data) + { + return System.Text.Encoding.UTF8.GetString(data.Binary); + } + else + { + throw new InvalidOperationException("Body is not an Application Data"); + } + } public IMessage Body(object body) @@ -255,25 +283,17 @@ public IMessage Body(object body) RestrictedDescribed bodySection; if (body is byte[] byteArray) { - bodySection = new Data - { - Binary = byteArray - }; + bodySection = new Data { Binary = byteArray }; } else if (body is IList list) { - bodySection = new AmqpSequence - { - List = list - }; + bodySection = new AmqpSequence { List = list }; } else { - bodySection = new AmqpValue - { - Value = body - }; + bodySection = new AmqpValue { Value = body }; } + NativeMessage.BodySection = bodySection; return this; } diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpRpcServer.cs b/RabbitMQ.AMQP.Client/Impl/AmqpRpcServer.cs index 68b50cd..896c934 100644 --- a/RabbitMQ.AMQP.Client/Impl/AmqpRpcServer.cs +++ b/RabbitMQ.AMQP.Client/Impl/AmqpRpcServer.cs @@ -166,7 +166,8 @@ await Utils.WaitWithBackOffUntilFuncAsync(async () => private class RpcServerContext : IRpcServer.IContext { - public IMessage Message(object body) => new AmqpMessage(body); + public IMessage Message(byte[] body) => new AmqpMessage(body); + public IMessage Message(string body) => new AmqpMessage(body); } public override async Task CloseAsync() diff --git a/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt b/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt index 8a170bc..5f110a5 100644 --- a/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt +++ b/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt @@ -245,8 +245,9 @@ RabbitMQ.AMQP.Client.IMessage.AbsoluteExpiryTime() -> System.DateTime RabbitMQ.AMQP.Client.IMessage.AbsoluteExpiryTime(System.DateTime absoluteExpiryTime) -> RabbitMQ.AMQP.Client.IMessage! RabbitMQ.AMQP.Client.IMessage.Annotation(string! key) -> object! RabbitMQ.AMQP.Client.IMessage.Annotation(string! key, object! value) -> RabbitMQ.AMQP.Client.IMessage! -RabbitMQ.AMQP.Client.IMessage.Body() -> object! +RabbitMQ.AMQP.Client.IMessage.Body() -> byte[]! RabbitMQ.AMQP.Client.IMessage.Body(object! body) -> RabbitMQ.AMQP.Client.IMessage! +RabbitMQ.AMQP.Client.IMessage.BodyAsString() -> string! RabbitMQ.AMQP.Client.IMessage.ContentEncoding() -> string! RabbitMQ.AMQP.Client.IMessage.ContentEncoding(string! contentEncoding) -> RabbitMQ.AMQP.Client.IMessage! RabbitMQ.AMQP.Client.IMessage.ContentType() -> string! @@ -391,11 +392,13 @@ RabbitMQ.AMQP.Client.Impl.AmqpMessage.AbsoluteExpiryTime() -> System.DateTime RabbitMQ.AMQP.Client.Impl.AmqpMessage.AbsoluteExpiryTime(System.DateTime absoluteExpiryTime) -> RabbitMQ.AMQP.Client.IMessage! RabbitMQ.AMQP.Client.Impl.AmqpMessage.AmqpMessage() -> void RabbitMQ.AMQP.Client.Impl.AmqpMessage.AmqpMessage(Amqp.Message! nativeMessage) -> void -RabbitMQ.AMQP.Client.Impl.AmqpMessage.AmqpMessage(object! body) -> void +RabbitMQ.AMQP.Client.Impl.AmqpMessage.AmqpMessage(byte[]! body) -> void +RabbitMQ.AMQP.Client.Impl.AmqpMessage.AmqpMessage(string! body) -> void RabbitMQ.AMQP.Client.Impl.AmqpMessage.Annotation(string! key) -> object! RabbitMQ.AMQP.Client.Impl.AmqpMessage.Annotation(string! key, object! value) -> RabbitMQ.AMQP.Client.IMessage! -RabbitMQ.AMQP.Client.Impl.AmqpMessage.Body() -> object! +RabbitMQ.AMQP.Client.Impl.AmqpMessage.Body() -> byte[]! RabbitMQ.AMQP.Client.Impl.AmqpMessage.Body(object! body) -> RabbitMQ.AMQP.Client.IMessage! +RabbitMQ.AMQP.Client.Impl.AmqpMessage.BodyAsString() -> string! RabbitMQ.AMQP.Client.Impl.AmqpMessage.ContentEncoding() -> string! RabbitMQ.AMQP.Client.Impl.AmqpMessage.ContentEncoding(string! contentEncoding) -> RabbitMQ.AMQP.Client.IMessage! RabbitMQ.AMQP.Client.Impl.AmqpMessage.ContentType() -> string! @@ -674,7 +677,8 @@ RabbitMQ.AMQP.Client.IRpcClientBuilder.RequestPostProcessor(System.Func RabbitMQ.AMQP.Client.IRpcClientBuilder! RabbitMQ.AMQP.Client.IRpcServer RabbitMQ.AMQP.Client.IRpcServer.IContext -RabbitMQ.AMQP.Client.IRpcServer.IContext.Message(object! body) -> RabbitMQ.AMQP.Client.IMessage! +RabbitMQ.AMQP.Client.IRpcServer.IContext.Message(byte[]! body) -> RabbitMQ.AMQP.Client.IMessage! +RabbitMQ.AMQP.Client.IRpcServer.IContext.Message(string! body) -> RabbitMQ.AMQP.Client.IMessage! RabbitMQ.AMQP.Client.IRpcServerBuilder RabbitMQ.AMQP.Client.IRpcServerBuilder.BuildAsync() -> System.Threading.Tasks.Task! RabbitMQ.AMQP.Client.IRpcServerBuilder.CorrelationIdExtractor(System.Func? correlationIdExtractor) -> RabbitMQ.AMQP.Client.IRpcServerBuilder! diff --git a/Tests/Amqp091/FromToAmqp091Tests.cs b/Tests/Amqp091/FromToAmqp091Tests.cs new file mode 100644 index 0000000..8848027 --- /dev/null +++ b/Tests/Amqp091/FromToAmqp091Tests.cs @@ -0,0 +1,94 @@ +// This source code is dual-licensed under the Apache License, version 2.0, +// and the Mozilla Public License, version 2.0. +// Copyright (c) 2017-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. + +using System.Threading.Tasks; +using RabbitMQ.AMQP.Client; +using RabbitMQ.AMQP.Client.Impl; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; +using Xunit; +using Xunit.Abstractions; + +namespace Tests.Amqp091 +{ + public class FromToAmqp091Tests(ITestOutputHelper testOutputHelper) : IntegrationTest(testOutputHelper) + { + [Fact] + public async Task ToAmqp091() + { + Assert.NotNull(_connection); + Assert.NotNull(_management); + + IQueueSpecification queueSpec = _management.Queue().Name(_queueName).Type(QueueType.QUORUM); + await queueSpec.DeclareAsync(); + + var publisher = await _connection.PublisherBuilder().BuildAsync(); + const string body = "{Text:as,Seq:1,Max:7000}"; + IMessage amqpMessage = new AmqpMessage(body).ToAddress().Queue(_queueName).Build(); + for (int i = 0; i < 1; i++) + { + PublishResult result = await publisher.PublishAsync(message: amqpMessage).ConfigureAwait(true); + Assert.NotNull(result); + Assert.Equal(OutcomeState.Accepted, result.Outcome.State); + } + + var factory = new ConnectionFactory(); + var connection = factory.CreateConnection(); + var channel = connection.CreateModel(); + var consumer091 = new EventingBasicConsumer(channel); + var tcs091 = new TaskCompletionSource(); + consumer091.Received += (sender, ea) => + { + tcs091.SetResult(ea); + channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); + }; + channel.BasicConsume(_queueName, false, "consumerTag", consumer091); + var receivedMessage091 = await tcs091.Task; + Assert.NotNull(receivedMessage091); + Assert.Equal(_queueName, receivedMessage091.RoutingKey); + Assert.Equal("consumerTag", receivedMessage091.ConsumerTag); + Assert.Equal("{Text:as,Seq:1,Max:7000}", + System.Text.Encoding.UTF8.GetString(receivedMessage091.Body.ToArray())); + channel.Close(); + connection.Close(); + } + + [Fact] + public async Task FromAmqp091() + { + Assert.NotNull(_connection); + Assert.NotNull(_management); + + IQueueSpecification queueSpec = _management.Queue().Name(_queueName).Type(QueueType.QUORUM); + await queueSpec.DeclareAsync(); + + // publish a message with AMQP 0-9-1 + var factory = new ConnectionFactory(); + var connection = factory.CreateConnection(); + var channel = connection.CreateModel(); + channel.BasicPublish( + exchange: "", + routingKey: _queueName, + basicProperties: null, + body: System.Text.Encoding.UTF8.GetBytes("{Text:as,Seq:1,Max:7000}")); + + TaskCompletionSource tcs = new(); + IConsumer consumer = await _connection.ConsumerBuilder() + .Queue(_queueName) + .MessageHandler((context, message) => + { + tcs.SetResult(message); + context.Accept(); + return Task.CompletedTask; + }).BuildAndStartAsync(); + + var receivedMessage = await tcs.Task; + Assert.NotNull(receivedMessage); + Assert.Equal("{Text:as,Seq:1,Max:7000}", receivedMessage.BodyAsString()); + channel.Close(); + connection.Close(); + await consumer.CloseAsync(); + } + } +} diff --git a/Tests/Consumer/BasicConsumerTests.cs b/Tests/Consumer/BasicConsumerTests.cs index 739ee29..968f698 100644 --- a/Tests/Consumer/BasicConsumerTests.cs +++ b/Tests/Consumer/BasicConsumerTests.cs @@ -38,7 +38,7 @@ public async Task SimpleConsumeMessage() await WhenTcsCompletes(tcs); IMessage receivedMessage = await tcs.Task; - Assert.Equal("message_0", receivedMessage.Body()); + Assert.Equal("message_0", receivedMessage.BodyAsString()); await consumer.CloseAsync(); consumer.Dispose(); @@ -69,7 +69,7 @@ public async Task ConsumerReQueueMessage() { try { - Assert.Equal("message_0", message.Body()); + Assert.Equal("message_0", message.BodyAsString()); Interlocked.Increment(ref consumed); switch (consumed) { @@ -167,7 +167,7 @@ Task MessageHandler(IContext cxt, IMessage msg) { if (i % 2 == 0) { - Assert.Equal($"message_{i}", receivedMessagesFromTask[i].Body()); + Assert.Equal($"message_{i}", receivedMessagesFromTask[i].BodyAsString()); } } } diff --git a/Tests/MessagesTests.cs b/Tests/MessagesTests.cs index 749aef1..5dc6e17 100644 --- a/Tests/MessagesTests.cs +++ b/Tests/MessagesTests.cs @@ -22,7 +22,7 @@ public void ValidateMessage() Assert.Equal("CorrelationId_2123", message.CorrelationId()); Assert.Equal("ReplyTo_5123", message.ReplyTo()); Assert.Equal("Subject_9123", message.Subject()); - Assert.Equal("my_body", message.Body()); + Assert.Equal("my_body", message.BodyAsString()); } [Fact] diff --git a/Tests/Rpc/RecoveryRPCTests.cs b/Tests/Rpc/RecoveryRPCTests.cs index 83da14d..8eae484 100644 --- a/Tests/Rpc/RecoveryRPCTests.cs +++ b/Tests/Rpc/RecoveryRPCTests.cs @@ -67,7 +67,7 @@ public async Task RpcServerAndClientShouldRecoverAfterKillConnection() { IMessage response = await rpcClient.PublishAsync(request); messagesConfirmed++; - Assert.Equal("pong", response.Body()); + Assert.Equal("pong", response.BodyAsString()); } catch (AmqpNotOpenException) { diff --git a/Tests/Rpc/RpcServerTests.cs b/Tests/Rpc/RpcServerTests.cs index 198393b..588d607 100644 --- a/Tests/Rpc/RpcServerTests.cs +++ b/Tests/Rpc/RpcServerTests.cs @@ -57,7 +57,7 @@ Task RpcHandler(IRpcServer.IContext context, IMessage request) await p.PublishAsync(new AmqpMessage("test")); IMessage m = await WhenTcsCompletes(tcs); - Assert.Equal("pong", m.Body()); + Assert.Equal("pong", m.BodyAsString()); await rpcServer.CloseAsync(); } @@ -143,7 +143,7 @@ Task MessageHandler(IContext context, IMessage message) Assert.Equal(OutcomeState.Accepted, pr.Outcome.State); IMessage m = await WhenTcsCompletes(tcs); - Assert.Equal("pong", m.Body()); + Assert.Equal("pong", m.BodyAsString()); await rpcServer.CloseAsync(); await consumer.CloseAsync(); @@ -173,7 +173,7 @@ public async Task RpcServerClientPingPongWithDefault() IMessage message = new AmqpMessage("ping"); IMessage response = await rpcClient.PublishAsync(message); - Assert.Equal("pong", response.Body()); + Assert.Equal("pong", response.BodyAsString()); await rpcClient.CloseAsync(); await rpcServer.CloseAsync(); } @@ -209,7 +209,7 @@ public async Task RpcServerClientPingPongWithCustomReplyToQueueAndCorrelationIdS IMessage message = new AmqpMessage("ping"); IMessage response = await rpcClient.PublishAsync(message); - Assert.Equal("pong", response.Body()); + Assert.Equal("pong", response.BodyAsString()); Assert.Equal(_correlationId, response.CorrelationId()); await rpcClient.CloseAsync(); await rpcServer.CloseAsync(); @@ -265,7 +265,7 @@ public async Task RpcServerClientOverridingTheRequestAndResponsePostProcessor() while (i < 30) { IMessage response = await rpcClient.PublishAsync(message); - Assert.Equal("pong", response.Body()); + Assert.Equal("pong", response.BodyAsString()); // the server replies with the correlation id in the application properties Assert.Equal($"{_correlationId}_{i}", response.Property("correlationId")); Assert.Equal($"{_correlationId}_{i}", response.Properties()["correlationId"]); @@ -324,7 +324,7 @@ Task RpcHandler(IRpcServer.IContext context, IMessage request) { IMessage message = new AmqpMessage("ping").Property("id", i1); IMessage response = await rpcClient.PublishAsync(message); - Assert.Equal("pong", response.Body()); + Assert.Equal("pong", response.BodyAsString()); })); } @@ -376,7 +376,7 @@ static async Task RpcHandler(IRpcServer.IContext context, IMessage req IMessage msg = new AmqpMessage("ping").Property("wait", 1); IMessage reply = await rpcClient.PublishAsync(msg); - Assert.Equal("pong", reply.Body()); + Assert.Equal("pong", reply.BodyAsString()); await Assert.ThrowsAsync(() => rpcClient.PublishAsync( new AmqpMessage("ping").Property("wait", 700))); diff --git a/Tests/Tests.csproj b/Tests/Tests.csproj index 7833983..a408d5c 100644 --- a/Tests/Tests.csproj +++ b/Tests/Tests.csproj @@ -29,6 +29,7 @@ +