Skip to content

Commit f47744a

Browse files
aygalincGsantomaggiolukebakken
authored
feat: introduce metrics based on system diagnostics (#84)
* feat: introduce metrics based on system diagnostics * closes: #58 --------- Signed-off-by: Gabriele Santomaggio <[email protected]> Co-authored-by: Gabriele Santomaggio <[email protected]> Co-authored-by: Luke Bakken <[email protected]>
1 parent bcdee01 commit f47744a

30 files changed

+950
-78
lines changed

Directory.Packages.props

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,15 @@
55
<ItemGroup>
66
<!-- RabbitMQ.Amqp.Client -->
77
<PackageVersion Include="AMQPNetLite.Core" Version="2.4.11" />
8+
<PackageVersion Include="OpenTelemetry" Version="1.10.0" />
9+
<PackageVersion Include="OpenTelemetry.Exporter.Console" Version="1.10.0" />
10+
<PackageVersion Include="System.Diagnostics.DiagnosticSource" Version="9.0.0" />
811
<!-- HAClient -->
912
<PackageVersion Include="DotNext.Threading" Version="5.15.0" />
1013
<!-- Tests -->
14+
<PackageVersion Include="Microsoft.Extensions.Diagnostics" Version="9.0.0" />
15+
<PackageVersion Include="Microsoft.Extensions.Diagnostics.Testing" Version="9.0.0" />
16+
<PackageVersion Include="System.Text.Json" Version="9.0.0" />
1117
<PackageVersion Include="xunit" Version="2.9.2" />
1218
<PackageVersion Include="xunit.runner.visualstudio" Version="2.8.2" />
1319
<PackageVersion Include="Xunit.SkippableFact" Version="1.4.13" />
@@ -22,11 +28,7 @@
2228
* https://github.com/rabbitmq/rabbitmq-dotnet-client/pull/1481#pullrequestreview-1847905299
2329
* https://github.com/rabbitmq/rabbitmq-dotnet-client/pull/1594
2430
-->
25-
<PackageVersion Include="System.Diagnostics.DiagnosticSource" Version="6.0.0" />
26-
<PackageVersion Include="System.Runtime.CompilerServices.Unsafe" Version="6.0.0" />
27-
</ItemGroup>
28-
<ItemGroup Condition="'$(TargetFramework)'=='net8.0'">
29-
<PackageVersion Include="System.Diagnostics.DiagnosticSource" Version="8.0.1" />
31+
<PackageVersion Include="System.Runtime.CompilerServices.Unsafe" Version="6.1.0" />
3032
</ItemGroup>
3133
<ItemGroup Condition="'$(TargetFrameworkIdentifier)'=='.NETFramework'">
3234
<GlobalPackageReference Include="Microsoft.NETFramework.ReferenceAssemblies" Version="1.0.3" />
@@ -36,4 +38,4 @@
3638
<GlobalPackageReference Include="Microsoft.SourceLink.GitHub" Version="8.0.0" />
3739
<GlobalPackageReference Include="MinVer" Version="6.0.0" />
3840
</ItemGroup>
39-
</Project>
41+
</Project>

RabbitMQ.AMQP.Client/ILifeCycle.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ public override string ToString()
3535

3636
public delegate void LifeCycleCallBack(object sender, State previousState, State currentState, Error? failureCause);
3737

38+
// TODO consider adding IAsyncDisposable that could call CloseAsync()
3839
public interface ILifeCycle : IDisposable
3940
{
4041
Task CloseAsync();
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 2.0.
3+
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
4+
5+
using System;
6+
7+
namespace RabbitMQ.AMQP.Client
8+
{
9+
public interface IMetricsReporter
10+
{
11+
enum PublishDispositionValue
12+
{
13+
ACCEPTED,
14+
REJECTED,
15+
RELEASED
16+
};
17+
18+
enum ConsumeDispositionValue
19+
{
20+
ACCEPTED,
21+
DISCARDED,
22+
REQUEUED
23+
};
24+
25+
void ConnectionOpened();
26+
void ConnectionClosed();
27+
28+
void PublisherOpened();
29+
void PublisherClosed();
30+
31+
void ConsumerOpened();
32+
void ConsumerClosed();
33+
34+
void Published(TimeSpan elapsed);
35+
void PublishDisposition(PublishDispositionValue disposition);
36+
37+
void Consumed(TimeSpan elapsed);
38+
void ConsumeDisposition(ConsumeDispositionValue disposition);
39+
}
40+
}

RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ public class AmqpConnection : AbstractLifeCycle, IConnection
3434
private readonly AmqpManagement _management;
3535
private readonly RecordingTopologyListener _recordingTopologyListener = new();
3636

37-
private readonly IConnectionSettings _connectionSettings;
37+
internal readonly IConnectionSettings _connectionSettings;
38+
private readonly IMetricsReporter? _metricsReporter;
3839
internal readonly AmqpSessionManagement _nativePubSubSessions;
3940

4041
private readonly Dictionary<string, object> _connectionProperties = new();
@@ -104,7 +105,6 @@ public IRpcServerBuilder RpcServerBuilder()
104105

105106
public IRpcClientBuilder RpcClientBuilder()
106107
{
107-
108108
return new AmqpRpcClientBuilder(this);
109109
}
110110

@@ -131,17 +131,22 @@ public IEnumerable<IConsumer> Consumers
131131
public IReadOnlyDictionary<string, object> Properties => _connectionProperties;
132132

133133
public long Id { get; set; }
134+
134135
/// <summary>
135136
/// Creates a new instance of <see cref="AmqpConnection"/>
136137
/// Through the Connection is possible to create:
137138
/// - Management. See <see cref="AmqpManagement"/>
138139
/// - Publishers and Consumers: See <see cref="AmqpPublisherBuilder"/> and <see cref="AmqpConsumerBuilder"/>
139140
/// </summary>
140141
/// <param name="connectionSettings"></param>
142+
/// <param name="metricsReporter"></param>
141143
/// <returns></returns>
142-
public static async Task<IConnection> CreateAsync(IConnectionSettings connectionSettings)
144+
// TODO to play nicely with IoC containers, we should not have static Create methods
145+
// TODO rename to CreateAndOpenAsync
146+
public static async Task<IConnection> CreateAsync(IConnectionSettings connectionSettings,
147+
IMetricsReporter? metricsReporter = default)
143148
{
144-
var connection = new AmqpConnection(connectionSettings);
149+
var connection = new AmqpConnection(connectionSettings, metricsReporter);
145150
await connection.OpenAsync()
146151
.ConfigureAwait(false);
147152
return connection;
@@ -154,7 +159,7 @@ public IManagement Management()
154159

155160
public IConsumerBuilder ConsumerBuilder()
156161
{
157-
return new AmqpConsumerBuilder(this);
162+
return new AmqpConsumerBuilder(this, _metricsReporter);
158163
}
159164

160165
// TODO cancellation token
@@ -170,7 +175,7 @@ await base.OpenAsync()
170175
public IPublisherBuilder PublisherBuilder()
171176
{
172177
ThrowIfClosed();
173-
var publisherBuilder = new AmqpPublisherBuilder(this);
178+
var publisherBuilder = new AmqpPublisherBuilder(this, _metricsReporter);
174179
return publisherBuilder;
175180
}
176181

@@ -235,6 +240,7 @@ protected override void Dispose(bool disposing)
235240
{
236241
_nativeConnection.Closed -= _closedCallback;
237242
}
243+
238244
_semaphoreOpen.Dispose();
239245
_semaphoreClose.Dispose();
240246
}
@@ -265,9 +271,10 @@ await consumer.CloseAsync()
265271
}
266272
}
267273

268-
private AmqpConnection(IConnectionSettings connectionSettings)
274+
private AmqpConnection(IConnectionSettings connectionSettings, IMetricsReporter? metricsReporter)
269275
{
270276
_connectionSettings = connectionSettings;
277+
_metricsReporter = metricsReporter;
271278
_nativePubSubSessions = new AmqpSessionManagement(this, 1);
272279
_management =
273280
new AmqpManagement(new AmqpManagementParameters(this).TopologyListener(_recordingTopologyListener));
@@ -291,10 +298,7 @@ await _semaphoreOpen.WaitAsync(cancellationToken)
291298
HostName = $"vhost:{_connectionSettings.VirtualHost}",
292299
// Note: no need to set cf.AMQP.ContainerId
293300
ContainerId = _connectionSettings.ContainerId,
294-
Properties = new Fields()
295-
{
296-
[new Symbol("connection_name")] = _connectionSettings.ContainerId,
297-
}
301+
Properties = new Fields() { [new Symbol("connection_name")] = _connectionSettings.ContainerId, }
298302
};
299303

300304
if (_connectionSettings.MaxFrameSize > uint.MinValue)
@@ -350,7 +354,8 @@ void OnOpened(Amqp.IConnection connection, Open openOnOpened)
350354
if (_connectionSettings is null)
351355
{
352356
// TODO create "internal bug" exception type?
353-
throw new InvalidOperationException("_connectionSettings is null, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
357+
throw new InvalidOperationException(
358+
"_connectionSettings is null, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
354359
}
355360
else
356361
{
@@ -497,8 +502,8 @@ await OpenConnectionAsync(CancellationToken.None)
497502
if (false == connected)
498503
{
499504
var notRecoveredError = new Error(ConnectionNotRecoveredCode,
500-
$"{ConnectionNotRecoveredMessage}," +
501-
$"recover status: {_connectionSettings.Recovery}");
505+
$"{ConnectionNotRecoveredMessage}," +
506+
$"recover status: {_connectionSettings.Recovery}");
502507
DoClose(notRecoveredError);
503508
return;
504509
}

RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,14 @@
33
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
44

55
using System;
6+
using System.Diagnostics;
67
using System.Runtime.CompilerServices;
78
using System.Threading;
89
using System.Threading.Tasks;
910
using Amqp;
1011
using Amqp.Framing;
12+
using Trace = Amqp.Trace;
13+
using TraceLevel = Amqp.TraceLevel;
1114

1215
namespace RabbitMQ.AMQP.Client.Impl
1316
{
@@ -28,12 +31,13 @@ private enum PauseStatus
2831
private PauseStatus _pauseStatus = PauseStatus.UNPAUSED;
2932
private readonly UnsettledMessageCounter _unsettledMessageCounter = new();
3033
private readonly ConsumerConfiguration _configuration;
34+
private readonly IMetricsReporter? _metricsReporter;
3135

32-
internal AmqpConsumer(AmqpConnection amqpConnection, ConsumerConfiguration configuration)
36+
internal AmqpConsumer(AmqpConnection amqpConnection, ConsumerConfiguration configuration, IMetricsReporter? metricsReporter)
3337
{
3438
_amqpConnection = amqpConnection;
3539
_configuration = configuration;
36-
40+
_metricsReporter = metricsReporter;
3741
_amqpConnection.AddConsumer(_id, this);
3842
}
3943

@@ -127,11 +131,21 @@ private async Task ProcessMessages()
127131
return;
128132
}
129133

134+
Stopwatch? stopwatch = null;
135+
if (_metricsReporter is not null)
136+
{
137+
stopwatch = new();
138+
}
139+
130140
while (_receiverLink is { LinkState: LinkState.Attached })
131141
{
142+
stopwatch?.Restart();
143+
132144
// TODO the timeout waiting for messages should be configurable
133145
TimeSpan timeout = TimeSpan.FromSeconds(60);
134-
Message? nativeMessage = await _receiverLink.ReceiveAsync(timeout).ConfigureAwait(false);
146+
Message? nativeMessage = await _receiverLink.ReceiveAsync(timeout)
147+
.ConfigureAwait(false);
148+
135149
if (nativeMessage is null)
136150
{
137151
// this is not a problem, it is just a timeout.
@@ -144,15 +158,24 @@ private async Task ProcessMessages()
144158

145159
_unsettledMessageCounter.Increment();
146160

147-
IContext context = new DeliveryContext(_receiverLink, nativeMessage, _unsettledMessageCounter);
161+
IContext context = new DeliveryContext(_receiverLink, nativeMessage,
162+
_unsettledMessageCounter, _metricsReporter);
148163
var amqpMessage = new AmqpMessage(nativeMessage);
149164

150165
// TODO catch exceptions thrown by handlers,
151166
// then call exception handler?
152-
if (_configuration.Handler != null)
167+
if (_configuration.Handler is not null)
153168
{
154-
await _configuration.Handler(context, amqpMessage).ConfigureAwait(false);
169+
await _configuration.Handler(context, amqpMessage)
170+
.ConfigureAwait(false);
155171
}
172+
173+
if (_metricsReporter is not null && stopwatch is not null)
174+
{
175+
stopwatch.Stop();
176+
_metricsReporter.Consumed(stopwatch.Elapsed);
177+
}
178+
156179
}
157180
}
158181
catch (Exception e)

RabbitMQ.AMQP.Client/Impl/AmqpConsumerBuilder.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ internal sealed class ConsumerConfiguration
1919
public string Address { get; set; } = "";
2020
public int InitialCredits { get; set; } = 100; // TODO use constant, check with Java lib
2121
public Map Filters { get; set; } = new();
22+
// TODO is a MessageHandler *really* optional???
2223
public MessageHandler? Handler { get; set; }
2324
// TODO re-name to ListenerContextAction? Callback?
2425
public Action<IConsumerBuilder.ListenerContext>? ListenerContext = null;
@@ -32,10 +33,12 @@ public class AmqpConsumerBuilder : IConsumerBuilder
3233
{
3334
private readonly ConsumerConfiguration _configuration = new();
3435
private readonly AmqpConnection _amqpConnection;
36+
private readonly IMetricsReporter? _metricsReporter;
3537

36-
public AmqpConsumerBuilder(AmqpConnection connection)
38+
public AmqpConsumerBuilder(AmqpConnection connection, IMetricsReporter? metricsReporter)
3739
{
3840
_amqpConnection = connection;
41+
_metricsReporter = metricsReporter;
3942
}
4043

4144
public IConsumerBuilder Queue(IQueueSpecification queueSpec)
@@ -81,7 +84,7 @@ public async Task<IConsumer> BuildAndStartAsync(CancellationToken cancellationTo
8184
throw new ConsumerException("Message handler is not set");
8285
}
8386

84-
AmqpConsumer consumer = new(_amqpConnection, _configuration);
87+
AmqpConsumer consumer = new(_amqpConnection, _configuration, _metricsReporter);
8588

8689
// TODO pass cancellationToken
8790
await consumer.OpenAsync()

RabbitMQ.AMQP.Client/Impl/AmqpEnvironment.cs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,23 +12,26 @@ namespace RabbitMQ.AMQP.Client.Impl
1212
{
1313
public class AmqpEnvironment : IEnvironment
1414
{
15-
private IConnectionSettings? ConnectionSettings { get; }
15+
private IConnectionSettings ConnectionSettings { get; }
1616
private long _sequentialId = 0;
1717
private readonly ConcurrentDictionary<long, IConnection> _connections = new();
18+
private readonly IMetricsReporter? _metricsReporter;
1819

19-
private AmqpEnvironment(IConnectionSettings connectionSettings)
20+
private AmqpEnvironment(IConnectionSettings connectionSettings, IMetricsReporter? metricsReporter = default)
2021
{
2122
ConnectionSettings = connectionSettings;
23+
_metricsReporter = metricsReporter;
2224
}
2325

24-
public static Task<IEnvironment> CreateAsync(IConnectionSettings connectionSettings)
26+
// TODO to play nicely with IoC containers, we should not have static Create methods
27+
public static IEnvironment Create(IConnectionSettings connectionSettings, IMetricsReporter? metricsReporter = default)
2528
{
26-
return Task.FromResult<IEnvironment>(new AmqpEnvironment(connectionSettings));
29+
return new AmqpEnvironment(connectionSettings, metricsReporter);
2730
}
2831

2932
public async Task<IConnection> CreateConnectionAsync(IConnectionSettings connectionSettings)
3033
{
31-
IConnection c = await AmqpConnection.CreateAsync(connectionSettings).ConfigureAwait(false);
34+
IConnection c = await AmqpConnection.CreateAsync(connectionSettings, _metricsReporter).ConfigureAwait(false);
3235
c.Id = Interlocked.Increment(ref _sequentialId);
3336
_connections.TryAdd(c.Id, c);
3437
c.ChangeState += (sender, previousState, currentState, failureCause) =>

0 commit comments

Comments
 (0)