Skip to content

Commit 5ae374b

Browse files
Luka-HeLuka HeijdenrijkGsantomaggio
authored
Support AMQP over WebSockets (#141)
* Add WebSocket scheme support to ConnectionSettings * Add example for AMQP 1.0 over WebSocket example --------- Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com> Co-authored-by: Luka Heijdenrijk <Luka@agro-it.nl> Co-authored-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
1 parent 16b38ed commit 5ae374b

File tree

9 files changed

+131
-8
lines changed

9 files changed

+131
-8
lines changed

Directory.Packages.props

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
<ItemGroup>
66
<!-- RabbitMQ.Amqp.Client -->
77
<PackageVersion Include="AMQPNetLite.Core" Version="2.4.11" />
8+
<PackageVersion Include="AMQPNetLite.WebSockets" Version="2.4.11" />
89
<PackageVersion Include="OpenTelemetry" Version="1.10.0" />
910
<PackageVersion Include="OpenTelemetry.Exporter.Console" Version="1.10.0" />
1011
<PackageVersion Include="RabbitMQ.Client" Version="6.8.1" />

RabbitMQ.AMQP.Client/ConnectionSettings.cs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public ConnectionSettingsBuilder Scheme(string scheme)
8181
}
8282
else
8383
{
84-
throw new ArgumentOutOfRangeException(nameof(scheme), "scheme must be 'amqp' or 'amqps'");
84+
throw new ArgumentOutOfRangeException(nameof(scheme), "scheme must be 'amqp', 'amqps', 'ws' or 'wss'");
8585
}
8686
}
8787

@@ -214,6 +214,7 @@ public class ConnectionSettings : IEquatable<ConnectionSettings>
214214
private readonly uint _maxFrameSize = Consts.DefaultMaxFrameSize;
215215
private readonly TlsSettings? _tlsSettings;
216216
private readonly IRecoveryConfiguration _recoveryConfiguration = new RecoveryConfiguration();
217+
private readonly string _webSocketPath = "/";
217218

218219
public ConnectionSettings(Uri uri,
219220
string? containerId = null,
@@ -231,9 +232,10 @@ public ConnectionSettings(Uri uri,
231232
string scheme = uri.Scheme;
232233
if (false == Utils.IsValidScheme(scheme))
233234
{
234-
throw new ArgumentOutOfRangeException("uri.Scheme", "Uri scheme must be 'amqp' or 'amqps'");
235+
throw new ArgumentOutOfRangeException("uri.Scheme", "Uri scheme must be 'amqp', 'amqps', 'ws' or 'wss'");
235236
}
236237

238+
_webSocketPath = string.IsNullOrWhiteSpace(uri.AbsolutePath) ? "/" : uri.AbsolutePath;
237239
_address = InitAddress(uri.Host, uri.Port, user, password, scheme);
238240
_tlsSettings = InitTlsSettings();
239241
}
@@ -242,14 +244,14 @@ protected Address InitAddress(string host, int port, string? user, string? passw
242244
{
243245
if (_oAuth2Options is not null)
244246
{
245-
return new Address(host, port, "", _oAuth2Options.Token, "/", scheme);
247+
return new Address(host, port, "", _oAuth2Options.Token, _webSocketPath, scheme);
246248
}
247249

248250
return new Address(host,
249251
port: port,
250252
user: user,
251253
password: password,
252-
path: "/",
254+
path: _webSocketPath,
253255
scheme: scheme);
254256
}
255257

@@ -274,9 +276,10 @@ public ConnectionSettings(string scheme,
274276
{
275277
if (false == Utils.IsValidScheme(scheme))
276278
{
277-
throw new ArgumentOutOfRangeException(nameof(scheme), "scheme must be 'amqp' or 'amqps'");
279+
throw new ArgumentOutOfRangeException(nameof(scheme), "scheme must be 'amqp', 'amqps', 'ws' or 'wss'");
278280
}
279281

282+
_webSocketPath = "/"; // For TCP transports the path value is ignored by AMQP .Net Lite, so keeping it set to "/" preserves current behavior.
280283
_address = InitAddress(host, port, user, password, scheme);
281284
if (virtualHost is not null)
282285
{
@@ -507,7 +510,7 @@ public ClusterConnectionSettings(IEnumerable<Uri> uris,
507510
string scheme = uri.Scheme;
508511
if (false == Utils.IsValidScheme(scheme))
509512
{
510-
throw new ArgumentOutOfRangeException("uri.Scheme", "Uri scheme must be 'amqp' or 'amqps'");
513+
throw new ArgumentOutOfRangeException("uri.Scheme", "Uri scheme must be 'amqp', 'amqps', 'ws' or 'wss'");
511514
}
512515

513516
(string? user, string? password) = ProcessUserInfo(uri);

RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,16 @@ await _semaphoreOpen.WaitAsync(cancellationToken)
361361
open.MaxFrameSize = _connectionSettings.MaxFrameSize;
362362
}
363363

364-
var cf = new ConnectionFactory();
364+
ConnectionFactory cf;
365+
366+
if (_connectionSettings.Scheme.Equals("ws", StringComparison.OrdinalIgnoreCase) || _connectionSettings.Scheme.Equals("wss", StringComparison.OrdinalIgnoreCase))
367+
{
368+
cf = new ConnectionFactory(new TransportProvider[] { new WebSocketTransportFactory() });
369+
}
370+
else
371+
{
372+
cf = new ConnectionFactory();
373+
}
365374

366375
if (_connectionSettings is { UseSsl: true, TlsSettings: not null })
367376
{

RabbitMQ.AMQP.Client/RabbitMQ.AMQP.Client.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333

3434
<ItemGroup>
3535
<PackageReference Include="AMQPNetLite.Core" />
36+
<PackageReference Include="AMQPNetLite.WebSockets" />
3637
<PackageReference Include="System.Diagnostics.DiagnosticSource" />
3738
</ItemGroup>
3839

RabbitMQ.AMQP.Client/Utils.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,9 @@ internal static bool CompareMap(Map map1, Map map2)
228228
internal static bool IsValidScheme(string scheme)
229229
{
230230
if (scheme.Equals("amqp", StringComparison.InvariantCultureIgnoreCase) ||
231-
scheme.Equals("amqps", StringComparison.InvariantCultureIgnoreCase))
231+
scheme.Equals("amqps", StringComparison.InvariantCultureIgnoreCase) ||
232+
scheme.Equals("ws", StringComparison.InvariantCultureIgnoreCase) ||
233+
scheme.Equals("wss", StringComparison.InvariantCultureIgnoreCase))
232234
{
233235
return true;
234236
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+

2+
// AMQP 1.0 over WebSocket Example using the RabbitMQ AMQP Client Library
3+
// This example demonstrates how to connect to a RabbitMQ broker using AMQP 1.0 over WebSocket,
4+
// publish messages to a queue, and consume messages from that queue.
5+
6+
using System.Diagnostics;
7+
using RabbitMQ.AMQP.Client;
8+
using RabbitMQ.AMQP.Client.Impl;
9+
using Trace = Amqp.Trace;
10+
using TraceLevel = Amqp.TraceLevel;
11+
12+
Trace.TraceLevel = TraceLevel.Information;
13+
Trace.TraceLevel = TraceLevel.Information;
14+
15+
ConsoleTraceListener consoleListener = new();
16+
Trace.TraceListener = (l, f, a) =>
17+
consoleListener.WriteLine($"[{DateTime.Now}] [{l}] - {f}");
18+
19+
Trace.WriteLine(TraceLevel.Information, "Starting the websocket example...");
20+
21+
// Create a ws in rabbitmq in the virtual host ws and user rabbit with password rabbit
22+
23+
const string amqpConnectionString = "ws://rabbit:rabbit@127.0.0.1:15678/ws";
24+
var cs = new ConnectionSettings(new Uri(amqpConnectionString));
25+
IEnvironment environment = AmqpEnvironment.Create(cs);
26+
27+
// open a connection from the environment setting
28+
IConnection connection = await environment.CreateConnectionAsync().ConfigureAwait(false);
29+
IManagement management = connection.Management();
30+
await management.Queue("ws-queue").Type(QueueType.QUORUM).DeclareAsync().ConfigureAwait(false);
31+
32+
// publish a message
33+
IPublisher publisher = await connection.PublisherBuilder().Queue("ws-queue").BuildAsync().ConfigureAwait(false);
34+
for (int i = 0; i < 10; i++)
35+
{
36+
PublishResult r = await publisher.PublishAsync(new AmqpMessage("Hello over WebSocket " + i)).ConfigureAwait(false);
37+
switch (r.Outcome.State)
38+
{
39+
case OutcomeState.Accepted:
40+
Trace.WriteLine(TraceLevel.Information, $"[Publisher] Message {i} published successfully over WebSocket");
41+
break;
42+
case OutcomeState.Rejected:
43+
Trace.WriteLine(TraceLevel.Error, $"[Publisher] Message {i} was rejected over WebSocket: {r.Outcome.Error}");
44+
break;
45+
case OutcomeState.Released:
46+
Trace.WriteLine(TraceLevel.Warning, $"[Publisher] Message {i} was Released over WebSocket");
47+
break;
48+
49+
}
50+
51+
await Task.Delay(TimeSpan.FromMilliseconds(500)).ConfigureAwait(false);
52+
}
53+
54+
// consume messages
55+
IConsumer consumer = await connection.ConsumerBuilder().Queue("ws-queue").MessageHandler(
56+
(context, message) =>
57+
{
58+
Trace.WriteLine(TraceLevel.Information, $"[Consumer] Message: {message.BodyAsString()} received over WebSocket");
59+
context.Accept();
60+
return Task.CompletedTask;
61+
}
62+
).BuildAndStartAsync().ConfigureAwait(false);
63+
64+
// press a key to close
65+
66+
Trace.WriteLine(TraceLevel.Information, $"Connected to the broker {connection} successfully");
67+
Trace.WriteLine(TraceLevel.Information, "Press any key to exit...");
68+
Console.ReadKey();
69+
await management.Queue("ws-queue").DeleteAsync().ConfigureAwait(false);
70+
await consumer.CloseAsync().ConfigureAwait(false);
71+
await publisher.CloseAsync().ConfigureAwait(false);
72+
await connection.CloseAsync().ConfigureAwait(false);
73+
Trace.WriteLine(TraceLevel.Information, "Closed connection. Exiting...");
74+

docs/Examples/WebSockets/README.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
AMQP 1.0 over WebSocket Example
2+
===============================================================
3+
4+
This example demonstrates how to use AMQP 1.0 over WebSocket. </br>
5+
You need [Tanzu RabbitMQ 4.0](https://www.vmware.com/products/app-platform/tanzu-rabbitmq) or later with the AMQP 1.0 and `rabbitmq_web_amqp` plugins enabled.
6+
7+
For more info read the blog post: [AMQP 1.0 over WebSocket](https://rabbitmq.com/blog/2024/using-websockets-with-rabbitmq-and-amqp-1-0/)
8+
9+
To run the example you need to have:
10+
- Tanzu RabbitMQ 4.0 or later with the AMQP 1.0 and `rabbitmq_web_amqp` plugins enabled.
11+
- A vhost called `ws` configured for WebSocket connections.
12+
- A user `rabbit` pwd `rabbit` with access to the `ws` vhost.
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<OutputType>Exe</OutputType>
5+
<TargetFramework>net8.0</TargetFramework>
6+
<ImplicitUsings>enable</ImplicitUsings>
7+
<Nullable>enable</Nullable>
8+
</PropertyGroup>
9+
10+
<ItemGroup>
11+
<ProjectReference Include="..\..\..\RabbitMQ.AMQP.Client\RabbitMQ.AMQP.Client.csproj" />
12+
</ItemGroup>
13+
14+
</Project>

rabbitmq-amqp-dotnet-client.sln

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "BatchDispositions", "docs\E
3939
EndProject
4040
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "StreamFilter", "docs\Examples\StreamFilter\StreamFilter.csproj", "{76B4C910-B9AB-479E-BA77-067CCE0CD37C}"
4141
EndProject
42+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "WebSockets", "docs\Examples\WebSockets\WebSockets.csproj", "{7C02823E-7B1C-468D-9549-1DDE1438F862}"
43+
EndProject
4244
Global
4345
GlobalSection(SolutionConfigurationPlatforms) = preSolution
4446
Debug|Any CPU = Debug|Any CPU
@@ -85,6 +87,10 @@ Global
8587
{76B4C910-B9AB-479E-BA77-067CCE0CD37C}.Debug|Any CPU.Build.0 = Debug|Any CPU
8688
{76B4C910-B9AB-479E-BA77-067CCE0CD37C}.Release|Any CPU.ActiveCfg = Release|Any CPU
8789
{76B4C910-B9AB-479E-BA77-067CCE0CD37C}.Release|Any CPU.Build.0 = Release|Any CPU
90+
{7C02823E-7B1C-468D-9549-1DDE1438F862}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
91+
{7C02823E-7B1C-468D-9549-1DDE1438F862}.Debug|Any CPU.Build.0 = Debug|Any CPU
92+
{7C02823E-7B1C-468D-9549-1DDE1438F862}.Release|Any CPU.ActiveCfg = Release|Any CPU
93+
{7C02823E-7B1C-468D-9549-1DDE1438F862}.Release|Any CPU.Build.0 = Release|Any CPU
8894
EndGlobalSection
8995
GlobalSection(SolutionProperties) = preSolution
9096
HideSolutionNode = FALSE
@@ -98,5 +104,6 @@ Global
98104
{C1EA4B66-E60E-4945-A4C6-91B433F9BA65} = {9154A0FB-7B2B-413C-A7F5-11ED2E37E93C}
99105
{1FF0D53E-B495-4810-8415-E27DED184C9E} = {9154A0FB-7B2B-413C-A7F5-11ED2E37E93C}
100106
{76B4C910-B9AB-479E-BA77-067CCE0CD37C} = {9154A0FB-7B2B-413C-A7F5-11ED2E37E93C}
107+
{7C02823E-7B1C-468D-9549-1DDE1438F862} = {9154A0FB-7B2B-413C-A7F5-11ED2E37E93C}
101108
EndGlobalSection
102109
EndGlobal

0 commit comments

Comments
 (0)