Skip to content

Commit a4638b5

Browse files
committed
chore: add more metrics
1 parent ea97eb6 commit a4638b5

File tree

9 files changed

+205
-31
lines changed

9 files changed

+205
-31
lines changed

RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,7 @@ public class AmqpConnection : AbstractLifeCycle, IConnection
3434

3535
private readonly AmqpManagement _management;
3636
private readonly RecordingTopologyListener _recordingTopologyListener = new();
37-
38-
private readonly IMetricsReporter _metricsReporter;
39-
37+
4038
internal readonly IConnectionSettings _connectionSettings;
4139
internal readonly AmqpSessionManagement _nativePubSubSessions;
4240

@@ -98,7 +96,7 @@ public static async Task<IConnection> CreateAsync(IConnectionSettings connection
9896
IMetricsReporter? metricsReporter = null)
9997
{
10098
metricsReporter ??= new NoOpMetricsReporter();
101-
var connection = new AmqpConnection(connectionSettings, metricsReporter);
99+
var connection = new AmqpConnection(connectionSettings);
102100
await connection.OpenAsync()
103101
.ConfigureAwait(false);
104102
return connection;
@@ -127,7 +125,7 @@ await base.OpenAsync()
127125
public IPublisherBuilder PublisherBuilder()
128126
{
129127
ThrowIfClosed();
130-
var publisherBuilder = new AmqpPublisherBuilder(this, _metricsReporter);
128+
var publisherBuilder = new AmqpPublisherBuilder(this);
131129
return publisherBuilder;
132130
}
133131

@@ -223,10 +221,9 @@ await consumer.CloseAsync()
223221
}
224222
}
225223

226-
private AmqpConnection(IConnectionSettings connectionSettings, IMetricsReporter metricsReporter)
224+
private AmqpConnection(IConnectionSettings connectionSettings)
227225
{
228226
_connectionSettings = connectionSettings;
229-
_metricsReporter = metricsReporter;
230227
_nativePubSubSessions = new AmqpSessionManagement(this, 1);
231228
_management =
232229
new AmqpManagement(new AmqpManagementParameters(this).TopologyListener(_recordingTopologyListener));

RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,15 @@
33
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
44

55
using System;
6+
using System.Diagnostics;
67
using System.Runtime.CompilerServices;
78
using System.Threading;
89
using System.Threading.Tasks;
910
using Amqp;
1011
using Amqp.Framing;
1112
using Amqp.Types;
13+
using Trace = Amqp.Trace;
14+
using TraceLevel = Amqp.TraceLevel;
1215

1316
namespace RabbitMQ.AMQP.Client.Impl
1417
{
@@ -28,10 +31,12 @@ private enum PauseStatus
2831
private PauseStatus _pauseStatus = PauseStatus.UNPAUSED;
2932
private readonly UnsettledMessageCounter _unsettledMessageCounter = new();
3033
private readonly ConsumerConfiguration _configuration;
34+
private readonly IMetricsReporter _metricsReporter;
3135

32-
public AmqpConsumer(ConsumerConfiguration configuration)
36+
public AmqpConsumer(ConsumerConfiguration configuration, IMetricsReporter metricsReporter)
3337
{
3438
_configuration = configuration;
39+
_metricsReporter = metricsReporter;
3540
if (false == _configuration.Connection.Consumers.TryAdd(_id, this))
3641
{
3742
// TODO error?
@@ -120,6 +125,11 @@ await base.OpenAsync()
120125

121126
private async Task ProcessMessages()
122127
{
128+
IMetricsReporter.ConsumerContext consumerContext = new(_configuration.Address,
129+
_configuration.Connection._connectionSettings.Host,
130+
_configuration.Connection._connectionSettings.Port);
131+
132+
long startTimestamp = 0;
123133
try
124134
{
125135
if (_receiverLink is null)
@@ -130,6 +140,7 @@ private async Task ProcessMessages()
130140

131141
while (_receiverLink is { LinkState: LinkState.Attached })
132142
{
143+
startTimestamp = Stopwatch.GetTimestamp();
133144
// TODO the timeout waiting for messages should be configurable
134145
TimeSpan timeout = TimeSpan.FromSeconds(60);
135146
Message? nativeMessage = await _receiverLink.ReceiveAsync(timeout).ConfigureAwait(false);
@@ -154,6 +165,8 @@ private async Task ProcessMessages()
154165
{
155166
await _configuration.Handler(context, amqpMessage).ConfigureAwait(false);
156167
}
168+
169+
_metricsReporter.ReportMessageDeliverSuccess(consumerContext, startTimestamp);
157170
}
158171
}
159172
catch (Exception e)
@@ -163,6 +176,7 @@ private async Task ProcessMessages()
163176
return;
164177
}
165178

179+
_metricsReporter.ReportMessageDeliverFailure(consumerContext, startTimestamp, e);
166180
Trace.WriteLine(TraceLevel.Error, $"{ToString()} Failed to process messages, {e}");
167181
// TODO this is where a Listener should get a closed event
168182
// See the ConsumerShouldBeClosedWhenQueueIsDeleted test

RabbitMQ.AMQP.Client/Impl/AmqpConsumerBuilder.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,13 @@ public async Task<IConsumer> BuildAndStartAsync(CancellationToken cancellationTo
7777
{
7878
throw new ConsumerException("Message handler is not set");
7979
}
80+
#if NET6_0_OR_GREATER
81+
IMetricsReporter metricsReporter = new MetricsReporter();
82+
#else
83+
IMetricsReporter metricsReporter = new NoOpMetricsReporter();
84+
#endif
8085

81-
AmqpConsumer consumer = new(_configuration);
86+
AmqpConsumer consumer = new(_configuration, metricsReporter);
8287

8388
// TODO pass cancellationToken
8489
await consumer.OpenAsync()

RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
44

55
using System;
6+
using System.Diagnostics;
67
using System.Threading;
78
using System.Threading.Tasks;
89
using Amqp;
@@ -127,6 +128,7 @@ public async Task<PublishResult> PublishAsync(IMessage message, CancellationToke
127128
IMetricsReporter.PublisherContext context =
128129
new(_address, _connection._connectionSettings.Host,
129130
_connection._connectionSettings.Port);
131+
long startTimestamp = Stopwatch.GetTimestamp();
130132
try
131133
{
132134
TaskCompletionSource<PublishOutcome> messagePublishedTcs =
@@ -182,12 +184,12 @@ void OutcomeCallback(ILink sender, Message inMessage, Outcome outcome, object st
182184
// PublishOutcome publishOutcome = await messagePublishedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5), cancellationToken)
183185
PublishOutcome publishOutcome = await messagePublishedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5))
184186
.ConfigureAwait(false);
185-
_metricsReporter.ReportMessageSendSuccess(context);
187+
_metricsReporter.ReportMessageSendSuccess(context, startTimestamp);
186188
return new PublishResult(message, publishOutcome);
187189
}
188190
catch (AmqpException ex)
189191
{
190-
_metricsReporter.ReportMessageSendFailure(context, ex);
192+
_metricsReporter.ReportMessageSendFailure(context, startTimestamp, ex);
191193
var publishOutcome = new PublishOutcome(OutcomeState.Rejected, Utils.ConvertError(ex.Error));
192194
return new PublishResult(message, publishOutcome);
193195
}

RabbitMQ.AMQP.Client/Impl/AmqpPublisherBuilder.cs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,10 @@ public class AmqpPublisherBuilder : IPublisherBuilder
1515
private string? _key = null;
1616
private string? _queue = null;
1717
private TimeSpan _timeout = TimeSpan.FromSeconds(10);
18-
private IMetricsReporter _metricsReporter;
1918

20-
public AmqpPublisherBuilder(AmqpConnection connection, IMetricsReporter metricsReporter)
19+
public AmqpPublisherBuilder(AmqpConnection connection)
2120
{
2221
_connection = connection;
23-
_metricsReporter = metricsReporter;
2422
}
2523

2624
public IPublisherBuilder Exchange(IExchangeSpecification exchangeSpec)
@@ -69,8 +67,12 @@ public async Task<IPublisher> BuildAsync(CancellationToken cancellationToken = d
6967
{
7068
address = AddressBuilderHelper.AddressBuilder().Exchange(_exchange).Queue(_queue).Key(_key).Address();
7169
}
72-
73-
AmqpPublisher publisher = new(_connection, address, _metricsReporter, _timeout);
70+
#if NET6_0_OR_GREATER
71+
IMetricsReporter metricsReporter = new MetricsReporter();
72+
#else
73+
IMetricsReporter metricsReporter = new NoOpMetricsReporter();
74+
#endif
75+
AmqpPublisher publisher = new(_connection, address, metricsReporter, _timeout);
7476

7577
// TODO pass cancellationToken
7678
await publisher.OpenAsync()

RabbitMQ.AMQP.Client/Impl/IMetricsReporter.cs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,28 @@
1+
using System;
12
using Amqp;
23

34
namespace RabbitMQ.AMQP.Client.Impl
45
{
56
public interface IMetricsReporter
67
{
7-
void ReportMessageSendSuccess(PublisherContext context);
8-
void ReportMessageSendFailure(PublisherContext context, AmqpException amqpException);
8+
void ReportMessageSendSuccess(PublisherContext context, long startTimestamp);
9+
void ReportMessageSendFailure(PublisherContext context, long startTimestamp, AmqpException amqpException);
10+
public void ReportMessageDeliverSuccess(ConsumerContext context, long startTimestamp);
11+
void ReportMessageDeliverFailure(ConsumerContext consumerContext, long startTimestamp, Exception exception);
12+
sealed class ConsumerContext
13+
{
14+
public ConsumerContext(string? destination, string serverAddress, int serverPort)
15+
{
16+
Destination = destination;
17+
ServerAddress = serverAddress;
18+
ServerPort = serverPort;
19+
}
920

21+
public string? Destination { get; }
22+
public string ServerAddress { get; }
23+
public int ServerPort { get; }
24+
}
25+
1026
sealed class PublisherContext
1127
{
1228
public PublisherContext(string? destination, string serverAddress, int serverPort)
Lines changed: 127 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,52 @@
1+
#if NET6_0_OR_GREATER
2+
using System;
13
using System.Collections.Generic;
24
using System.Diagnostics;
35
using System.Diagnostics.Metrics;
46
using Amqp;
7+
#endif
58

69
namespace RabbitMQ.AMQP.Client.Impl
710
{
11+
#if NET6_0_OR_GREATER
812
// .NET docs on metric instrumentation: https://learn.microsoft.com/en-us/dotnet/core/diagnostics/metrics-instrumentation
913
// OpenTelemetry semantic conventions for messaging metric: https://opentelemetry.io/docs/specs/semconv/messaging/messaging-metrics
10-
internal sealed class SystemDiagnosticMetricsReporter : IMetricsReporter
14+
internal sealed class MetricsReporter : IMetricsReporter
1115
{
1216
const string Version = "0.1.0";
1317

1418
static readonly Meter Meter;
1519

1620
static readonly Counter<int> MessagingClientSentMessages;
21+
static readonly Histogram<double> MessagingClientOperationDuration;
1722

18-
readonly KeyValuePair<string, object?> _messagingOperationSystemTag = new(MessagingSystem, "rabbitmq");
23+
static readonly Counter<int> MessagingClientConsumedMessages;
24+
static readonly Histogram<double> MessagingProcessDuration;
1925

26+
readonly KeyValuePair<string, object?>
27+
_messagingOperationSystemTag = new(MessagingSystem, MessagingSystemValue);
28+
29+
readonly KeyValuePair<string, object?> _publishOperationName = new(MessagingOperationName, PublishOperation);
30+
readonly KeyValuePair<string, object?> _sendOperationType = new(MessagingOperationType, SendOperation);
31+
32+
readonly KeyValuePair<string, object?> _deliverOperationName = new(MessagingOperationName, DeliverOperation);
33+
readonly KeyValuePair<string, object?> _processOperationType = new(MessagingOperationType, ProcessOperation);
34+
35+
private const string MessagingOperationName = "messaging.operation.name";
36+
private const string MessagingOperationType = "messaging.operation.type";
2037
private const string MessagingSystem = "messaging.system";
2138
private const string ErrorType = "error.type";
2239
private const string MessageDestinationName = "messaging.destination.name";
2340
private const string ServerAddress = "server.adress";
2441
private const string ServerPort = "server.port";
2542

43+
private const string ProcessOperation = "process";
44+
private const string DeliverOperation = "deliver";
45+
private const string PublishOperation = "publish";
46+
private const string SendOperation = "send";
47+
private const string MessagingSystemValue = "rabbitmq";
2648

27-
static SystemDiagnosticMetricsReporter()
49+
static MetricsReporter()
2850
{
2951
Meter = new("RabbitMQ.Amqp", Version);
3052

@@ -33,26 +55,124 @@ static SystemDiagnosticMetricsReporter()
3355
unit: "{message}",
3456
description:
3557
"Number of messages producer attempted to send to the broker.");
58+
59+
MessagingClientOperationDuration = Meter.CreateHistogram<double>(
60+
"messaging.client.operation.duration",
61+
unit: "s",
62+
description:
63+
"Duration of messaging operation initiated by a producer or consumer client.");
64+
65+
MessagingClientConsumedMessages = Meter.CreateCounter<int>(
66+
"messaging.client.consumed.messages",
67+
unit: "{message}",
68+
description:
69+
"Number of messages that were delivered to the application. ");
70+
71+
MessagingProcessDuration = Meter.CreateHistogram<double>(
72+
"messaging.process.duration",
73+
unit: "s",
74+
description:
75+
"Duration of processing operation. ");
3676
}
3777

3878

39-
public void ReportMessageSendSuccess(IMetricsReporter.PublisherContext context)
79+
public void ReportMessageSendSuccess(IMetricsReporter.PublisherContext context, long startTimestamp)
4080
{
4181
var serverAddress = new KeyValuePair<string, object?>(ServerAddress, context.ServerAddress);
4282
var serverPort = new KeyValuePair<string, object?>(ServerPort, context.ServerPort);
4383
var destination = new KeyValuePair<string, object?>(MessageDestinationName, context.Destination);
44-
MessagingClientSentMessages.Add(1, serverAddress, serverPort, destination, _messagingOperationSystemTag);
84+
85+
MessagingClientSentMessages.Add(1, serverAddress, serverPort, destination, _messagingOperationSystemTag,
86+
_sendOperationType, _publishOperationName);
87+
if (startTimestamp > 0)
88+
{
89+
#if NET7_0_OR_GREATER
90+
var duration = Stopwatch.GetElapsedTime(startTimestamp);
91+
#else
92+
var duration =
93+
new TimeSpan((long)((Stopwatch.GetTimestamp() - startTimestamp) * StopWatchTickFrequency));
94+
#endif
95+
MessagingClientOperationDuration.Record(duration.TotalSeconds, serverAddress, serverPort, destination,
96+
_messagingOperationSystemTag, _sendOperationType, _publishOperationName);
97+
}
4598
}
4699

47100

48-
public void ReportMessageSendFailure(IMetricsReporter.PublisherContext context, AmqpException amqpException)
101+
public void ReportMessageSendFailure(IMetricsReporter.PublisherContext context, long startTimestamp,
102+
AmqpException amqpException)
49103
{
50104
var errorType = new KeyValuePair<string, object?>(ErrorType, amqpException.GetType().Name);
51105
var serverAddress = new KeyValuePair<string, object?>(ServerAddress, context.ServerAddress);
52106
var serverPort = new KeyValuePair<string, object?>(ServerPort, context.ServerPort);
53107
var destination = new KeyValuePair<string, object?>(MessageDestinationName, context.Destination);
54108
MessagingClientSentMessages.Add(1, errorType, serverAddress, serverPort, destination,
55-
_messagingOperationSystemTag);
109+
_messagingOperationSystemTag, _sendOperationType, _publishOperationName);
110+
111+
if (startTimestamp > 0)
112+
{
113+
#if NET7_0_OR_GREATER
114+
var duration = Stopwatch.GetElapsedTime(startTimestamp);
115+
#else
116+
var duration =
117+
new TimeSpan((long)((Stopwatch.GetTimestamp() - startTimestamp) * StopWatchTickFrequency));
118+
#endif
119+
MessagingClientOperationDuration.Record(duration.TotalSeconds, errorType, serverAddress, serverPort,
120+
destination,
121+
_messagingOperationSystemTag, _sendOperationType, _publishOperationName);
122+
}
123+
}
124+
125+
public void ReportMessageDeliverSuccess(IMetricsReporter.ConsumerContext context, long startTimestamp)
126+
{
127+
var serverAddress = new KeyValuePair<string, object?>(ServerAddress, context.ServerAddress);
128+
var serverPort = new KeyValuePair<string, object?>(ServerPort, context.ServerPort);
129+
var destination = new KeyValuePair<string, object?>(MessageDestinationName, context.Destination);
130+
MessagingClientConsumedMessages.Add(1, serverAddress, serverPort, destination, _messagingOperationSystemTag,
131+
_processOperationType, _deliverOperationName);
132+
if (startTimestamp > 0)
133+
{
134+
#if NET7_0_OR_GREATER
135+
var duration = Stopwatch.GetElapsedTime(startTimestamp);
136+
#else
137+
var duration =
138+
new TimeSpan((long)((Stopwatch.GetTimestamp() - startTimestamp) * StopWatchTickFrequency));
139+
#endif
140+
MessagingProcessDuration.Record(duration.TotalSeconds, serverAddress, serverPort,
141+
destination,
142+
_messagingOperationSystemTag, _sendOperationType, _publishOperationName);
143+
}
144+
}
145+
146+
public void ReportMessageDeliverFailure(IMetricsReporter.ConsumerContext context, long startTimestamp,
147+
Exception exception)
148+
{
149+
var errorType = new KeyValuePair<string, object?>(ErrorType, exception.GetType().Name);
150+
var serverAddress = new KeyValuePair<string, object?>(ServerAddress, context.ServerAddress);
151+
var serverPort = new KeyValuePair<string, object?>(ServerPort, context.ServerPort);
152+
var destination = new KeyValuePair<string, object?>(MessageDestinationName, context.Destination);
153+
MessagingClientConsumedMessages.Add(1, errorType, serverAddress, serverPort, destination,
154+
_messagingOperationSystemTag,
155+
_processOperationType, _deliverOperationName);
156+
if (startTimestamp > 0)
157+
{
158+
#if NET7_0_OR_GREATER
159+
var duration = Stopwatch.GetElapsedTime(startTimestamp);
160+
#else
161+
var duration =
162+
new TimeSpan((long)((Stopwatch.GetTimestamp() - startTimestamp) * StopWatchTickFrequency));
163+
#endif
164+
MessagingProcessDuration.Record(duration.TotalSeconds, errorType, serverAddress, serverPort,
165+
destination,
166+
_messagingOperationSystemTag, _sendOperationType, _publishOperationName);
167+
}
56168
}
169+
#if !NET7_0_OR_GREATER
170+
const long TicksPerMicrosecond = 10;
171+
const long TicksPerMillisecond = TicksPerMicrosecond * 1000;
172+
const long TicksPerSecond = TicksPerMillisecond * 1000; // 10,000,000
173+
static readonly double StopWatchTickFrequency = (double)TicksPerSecond / Stopwatch.Frequency;
174+
#endif
57175
}
176+
#else
177+
#endif
58178
}

0 commit comments

Comments
 (0)