Skip to content

Commit 6322854

Browse files
committed
consume form 091
Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 4e3d7d7 commit 6322854

File tree

7 files changed

+86
-3
lines changed

7 files changed

+86
-3
lines changed

.ci/ubuntu/cluster/gha-setup.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ function run_docker_compose
1919
docker compose --file "$script_dir/docker-compose.yml" $@
2020
}
2121

22-
readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4.1.0-beta.4-management-alpine}"
22+
readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4.1.0-management-alpine}"
2323

2424
if [[ ! -v GITHUB_ACTIONS ]]
2525
then

.ci/ubuntu/one-node/gha-setup.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ readonly script_dir
99
echo "[INFO] script_dir: '$script_dir'"
1010

1111

12-
readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4.1.0-beta.4-management-alpine}"
12+
readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4.1.0-management-alpine}"
1313

1414

1515

.ci/windows/versions.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"erlang": "27.2",
3-
"rabbitmq": "4.1.0-beta.4"
3+
"rabbitmq": "4.1.0"
44
}

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,4 +129,5 @@ docs/temp/
129129

130130
# ci logs
131131
.ci/ubuntu/log/*
132+
.ci/ubuntu/cluster/log/*
132133

Directory.Packages.props

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
<PackageVersion Include="AMQPNetLite.Core" Version="2.4.11" />
88
<PackageVersion Include="OpenTelemetry" Version="1.10.0" />
99
<PackageVersion Include="OpenTelemetry.Exporter.Console" Version="1.10.0" />
10+
<PackageVersion Include="RabbitMQ.Client" Version="7.1.2" />
1011
<PackageVersion Include="System.Diagnostics.DiagnosticSource" Version="9.0.0" />
1112
<!-- HAClient -->
1213
<PackageVersion Include="DotNext.Threading" Version="5.15.0" />
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
// This source code is dual-licensed under the Apache License, version 2.0,
2+
// and the Mozilla Public License, version 2.0.
3+
// Copyright (c) 2017-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
4+
5+
using System.Collections.Generic;
6+
using System.Threading;
7+
using System.Threading.Tasks;
8+
using Newtonsoft.Json;
9+
using RabbitMQ.AMQP.Client;
10+
using RabbitMQ.AMQP.Client.Impl;
11+
using RabbitMQ.Client;
12+
using RabbitMQ.Client.Events;
13+
using Xunit;
14+
using Xunit.Abstractions;
15+
16+
namespace Tests.Amqp091
17+
{
18+
public class FromToAmqp091Tests(ITestOutputHelper testOutputHelper) : IntegrationTest(testOutputHelper)
19+
{
20+
[Fact]
21+
public async Task ToAmqp091()
22+
{
23+
Assert.NotNull(_connection);
24+
Assert.NotNull(_management);
25+
_queueName = "q";
26+
27+
IQueueSpecification queueSpec = _management.Queue().Name(_queueName).Type(QueueType.QUORUM);
28+
await queueSpec.DeclareAsync();
29+
30+
var publisher = await _connection.PublisherBuilder().BuildAsync();
31+
// string text = JsonConvert.SerializeObject(message); //produces {"Text":"as","Seq":1,"Max":7000}
32+
byte[] body = System.Text.Encoding.UTF8.GetBytes("{Text:as,Seq:1,Max:7000}");
33+
IMessage amqpMessage = new AmqpMessage(body).ToAddress().Queue(_queueName).Build();
34+
for (int i = 0; i < 100; i++)
35+
{
36+
PublishResult result = await publisher.PublishAsync(message: amqpMessage).ConfigureAwait(true);
37+
Assert.NotNull(result);
38+
Assert.Equal(OutcomeState.Accepted, result.Outcome.State);
39+
}
40+
41+
42+
// TaskCompletionSource<IMessage> tcs = new();
43+
// IConsumer consumer = await _connection.ConsumerBuilder()
44+
// .Queue(queueSpec)
45+
// .MessageHandler((context, message) =>
46+
// {
47+
// tcs.SetResult(message);
48+
// context.Accept();
49+
// return Task.CompletedTask;
50+
// }
51+
// ).BuildAndStartAsync();
52+
//
53+
// IMessage receivedMessage = await tcs.Task;
54+
// Assert.NotNull(receivedMessage);
55+
// // get the string form bytes
56+
//
57+
// string receivedMessageBody = System.Text.Encoding.UTF8.GetString((byte[])receivedMessage.Body());
58+
// Assert.Equal("{Text:as,Seq:1,Max:7000}", receivedMessageBody);
59+
60+
61+
var factory = new ConnectionFactory();
62+
var connection = await factory.CreateConnectionAsync();
63+
var channel = await connection.CreateChannelAsync();
64+
var consumer091 = new AsyncEventingBasicConsumer(channel);
65+
var tcs091 = new TaskCompletionSource<BasicDeliverEventArgs>();
66+
consumer091.ReceivedAsync += async (sender, ea) =>
67+
{
68+
tcs091.SetResult(ea);
69+
await channel.BasicAckAsync(deliveryTag: ea.DeliveryTag, multiple: false);
70+
};
71+
await channel.BasicConsumeAsync(_queueName, false, "consumerTag", consumer091);
72+
var receivedMessage091 = await tcs091.Task;
73+
Assert.NotNull(receivedMessage091);
74+
Assert.Equal(_queueName, receivedMessage091.RoutingKey);
75+
Assert.Equal("consumerTag", receivedMessage091.ConsumerTag);
76+
Assert.Equal("{Text:as,Seq:1,Max:7000}",
77+
System.Text.Encoding.UTF8.GetString(receivedMessage091.Body.ToArray()));
78+
}
79+
}
80+
}

Tests/Tests.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
<PackageReference Include="Microsoft.Extensions.Diagnostics.Testing" />
3030
<PackageReference Include="Microsoft.Extensions.Diagnostics" />
3131
<PackageReference Include="Microsoft.NET.Test.Sdk" />
32+
<PackageReference Include="RabbitMQ.Client" />
3233
<PackageReference Include="System.IdentityModel.Tokens.Jwt" />
3334
<PackageReference Include="System.Text.Json" />
3435
<PackageReference Include="xunit" />

0 commit comments

Comments
 (0)