Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
ea97eb6
feat: add message publishing metrics
aygalinc Oct 31, 2024
a4638b5
chore: add more metrics
aygalinc Nov 3, 2024
8f51146
formatting
Gsantomaggio Nov 4, 2024
bb4e334
fix: use only dotnet 8 ImeterFactory for implementation
aygalinc Nov 4, 2024
ea925ab
chore: format
aygalinc Nov 7, 2024
40e0ae6
use rabbitmq:4.0.2-management
Gsantomaggio Nov 7, 2024
653b1b2
use rabbitmq:4.0.2-management
Gsantomaggio Nov 7, 2024
136aa04
merge
Gsantomaggio Nov 8, 2024
0b4b623
merge
Gsantomaggio Nov 8, 2024
19e834f
Modify Tests.csproj to use net462 since (supposedly) `Microsoft.Exten…
lukebakken Nov 11, 2024
2e91b4f
* `dotnet format` fixes
lukebakken Nov 11, 2024
3723713
* Suppress TFM support build warnings in Tests.csproj
lukebakken Nov 11, 2024
9dc565a
* Use `Stopwatch` in a netstandard2.0 compatible way.
lukebakken Nov 12, 2024
ecef22c
* Change `AmqpEnvironment` to non-async `Create` method.
lukebakken Nov 12, 2024
3270d7a
* Only create `Stopwatch` instance if `IMetricsReporter` is not null.
lukebakken Nov 12, 2024
01a24a2
fixup
lukebakken Nov 12, 2024
db3cabe
chore: add example projet to integrate metric in Open telemetry sdk
aygalinc Nov 13, 2024
3ddf4ab
chore: apply format
aygalinc Nov 13, 2024
a3c073c
chore: update example readme
aygalinc Nov 13, 2024
aa4f348
chore: remove unecessary code
aygalinc Nov 14, 2024
e24ca87
* Move some stuff around
lukebakken Nov 14, 2024
abec929
* Combine metrics context data classes into the same class.
lukebakken Nov 14, 2024
330ce0c
* Start bringing the metrics in-line with the Java AMQP 1.0 client.
lukebakken Nov 16, 2024
c39559a
*Combine metrics tests into a single test suite
lukebakken Nov 18, 2024
7fdf08a
* Add elapsed timespan to publish measurements
lukebakken Nov 18, 2024
1206875
fixup
lukebakken Nov 18, 2024
925c6ae
Thanks @aygalinc for noticing that a call to `Pause` is missing in a …
lukebakken Nov 18, 2024
94edb0d
* Collect consume elapsed time duration.
lukebakken Nov 18, 2024
1e566fd
* Use seconds instead of milliseconds.
lukebakken Nov 18, 2024
c594cff
* Ensure all of the new metrics are tested.
lukebakken Nov 18, 2024
75ba9a8
* Misc fixes, add `InternalBugException`
lukebakken Nov 18, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 3 additions & 7 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
<PropertyGroup>
<ManagePackageVersionsCentrally>true</ManagePackageVersionsCentrally>
</PropertyGroup>

<ItemGroup>
<!-- RabbitMQ.Amqp.Client -->
<PackageVersion Include="AMQPNetLite.Core" Version="2.4.11" />
<PackageVersion Include="Microsoft.Extensions.Diagnostics.Testing" Version="8.10.0" />
<!-- Tests -->
<PackageVersion Include="xunit" Version="2.9.0" />
<PackageVersion Include="xunit.runner.visualstudio" Version="2.8.2" />
Expand All @@ -14,7 +14,6 @@
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.11.0" />
<PackageVersion Include="EasyNetQ.Management.Client" Version="3.0.0" />
</ItemGroup>

<ItemGroup Condition="$(TargetFramework)=='netstandard2.0'">
<!--
Note: do NOT upgrade these dependencies unless necessary
Expand All @@ -25,22 +24,19 @@
<PackageVersion Include="System.Diagnostics.DiagnosticSource" Version="6.0.0" />
<PackageVersion Include="System.Runtime.CompilerServices.Unsafe" Version="6.0.0" />
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)'=='net6.0'">
<PackageVersion Include="System.Diagnostics.DiagnosticSource" Version="6.0.0" />
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)'=='net8.0'">
<PackageVersion Include="System.Diagnostics.DiagnosticSource" Version="8.0.1" />
<PackageVersion Include="Microsoft.Extensions.Diagnostics" Version="8.0.1" />
</ItemGroup>

<ItemGroup Condition="'$(TargetFrameworkIdentifier)'=='.NETFramework'">
<GlobalPackageReference Include="Microsoft.NETFramework.ReferenceAssemblies" Version="1.0.3" />
</ItemGroup>

<ItemGroup Condition="'$(IsPackable)'=='true'">
<GlobalPackageReference Include="Microsoft.CodeAnalysis.PublicApiAnalyzers" Version="3.3.4" />
<GlobalPackageReference Include="Microsoft.SourceLink.GitHub" Version="8.0.0" />
<GlobalPackageReference Include="MinVer" Version="5.0.0" />
</ItemGroup>
</Project>
</Project>
38 changes: 38 additions & 0 deletions RabbitMQ.AMQP.Client/IMetricsReporter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
using Amqp;

namespace RabbitMQ.AMQP.Client
{
public interface IMetricsReporter
{
void ReportMessageSendSuccess(PublisherContext context, long startTimestamp);
void ReportMessageSendFailure(PublisherContext context, long startTimestamp, AmqpException amqpException);
public void ReportMessageDeliverSuccess(ConsumerContext context, long startTimestamp);
sealed class ConsumerContext
{
public ConsumerContext(string? destination, string serverAddress, int serverPort)
{
Destination = destination;
ServerAddress = serverAddress;
ServerPort = serverPort;
}

public string? Destination { get; }
public string ServerAddress { get; }
public int ServerPort { get; }
}

sealed class PublisherContext
{
public PublisherContext(string? destination, string serverAddress, int serverPort)
{
Destination = destination;
ServerAddress = serverAddress;
ServerPort = serverPort;
}

public string? Destination { get; }
public string ServerAddress { get; }
public int ServerPort { get; }
}
}
}
34 changes: 19 additions & 15 deletions RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,18 @@ public class AmqpConnection : AbstractLifeCycle, IConnection
private readonly AmqpManagement _management;
private readonly RecordingTopologyListener _recordingTopologyListener = new();

private readonly IConnectionSettings _connectionSettings;
internal readonly IConnectionSettings _connectionSettings;
private readonly IMetricsReporter _metricsReporter;
internal readonly AmqpSessionManagement _nativePubSubSessions;

/// <summary>
/// Publishers contains all the publishers created by the connection.
/// Each connection can have multiple publishers.
/// They key is the publisher Id ( a Guid)
/// See <see cref="AmqpPublisher"/>
/// </summary>
internal ConcurrentDictionary<Guid, IPublisher> Publishers { get; } = new();

internal ConcurrentDictionary<Guid, IConsumer> Consumers { get; } = new();

private readonly TaskCompletionSource<bool> _connectionClosedTcs =
Expand All @@ -57,7 +59,6 @@ public IRpcServerBuilder RpcServerBuilder()

public IRpcClientBuilder RpcClientBuilder()
{

return new AmqpRpcClientBuilder(this);
}

Expand All @@ -82,17 +83,20 @@ public ReadOnlyCollection<IConsumer> GetConsumers()
}

public long Id { get; set; }

/// <summary>
/// Creates a new instance of <see cref="AmqpConnection"/>
/// Through the Connection is possible to create:
/// - Management. See <see cref="AmqpManagement"/>
/// - Publishers and Consumers: See <see cref="AmqpPublisherBuilder"/> and <see cref="AmqpConsumerBuilder"/>
/// </summary>
/// <param name="connectionSettings"></param>
/// <param name="metricsReporter"></param>
/// <returns></returns>
public static async Task<IConnection> CreateAsync(IConnectionSettings connectionSettings)
public static async Task<IConnection> CreateAsync(IConnectionSettings connectionSettings,
IMetricsReporter? metricsReporter = default)
{
var connection = new AmqpConnection(connectionSettings);
var connection = new AmqpConnection(connectionSettings, metricsReporter ?? new NoOpMetricsReporter());
await connection.OpenAsync()
.ConfigureAwait(false);
return connection;
Expand All @@ -105,7 +109,7 @@ public IManagement Management()

public IConsumerBuilder ConsumerBuilder()
{
return new AmqpConsumerBuilder(this);
return new AmqpConsumerBuilder(this, _metricsReporter);
}

// TODO cancellation token
Expand All @@ -121,7 +125,7 @@ await base.OpenAsync()
public IPublisherBuilder PublisherBuilder()
{
ThrowIfClosed();
var publisherBuilder = new AmqpPublisherBuilder(this);
var publisherBuilder = new AmqpPublisherBuilder(this, _metricsReporter);
return publisherBuilder;
}

Expand Down Expand Up @@ -184,6 +188,7 @@ protected override void Dispose(bool disposing)
{
_nativeConnection.Closed -= _closedCallback;
}

_semaphoreOpen.Dispose();
_semaphoreClose.Dispose();
}
Expand Down Expand Up @@ -216,9 +221,10 @@ await consumer.CloseAsync()
}
}

private AmqpConnection(IConnectionSettings connectionSettings)
private AmqpConnection(IConnectionSettings connectionSettings, IMetricsReporter metricsReporter)
{
_connectionSettings = connectionSettings;
_metricsReporter = metricsReporter;
_nativePubSubSessions = new AmqpSessionManagement(this, 1);
_management =
new AmqpManagement(new AmqpManagementParameters(this).TopologyListener(_recordingTopologyListener));
Expand All @@ -242,10 +248,7 @@ await _semaphoreOpen.WaitAsync()
HostName = $"vhost:{_connectionSettings.VirtualHost}",
// Note: no need to set cf.AMQP.ContainerId
ContainerId = _connectionSettings.ContainerId,
Properties = new Fields()
{
[new Symbol("connection_name")] = _connectionSettings.ContainerId,
}
Properties = new Fields() { [new Symbol("connection_name")] = _connectionSettings.ContainerId, }
};

if (_connectionSettings.MaxFrameSize > uint.MinValue)
Expand Down Expand Up @@ -300,7 +303,8 @@ void OnOpened(Amqp.IConnection connection, Open open1)
if (_connectionSettings is null)
{
// TODO create "internal bug" exception type?
throw new InvalidOperationException("_connectionSettings is null, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
throw new InvalidOperationException(
"_connectionSettings is null, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
}
else
{
Expand Down Expand Up @@ -447,8 +451,8 @@ await OpenConnectionAsync(CancellationToken.None)
if (false == connected)
{
var notRecoveredError = new Error(ConnectionNotRecoveredCode,
$"{ConnectionNotRecoveredMessage}," +
$"recover status: {_connectionSettings.Recovery}");
$"{ConnectionNotRecoveredMessage}," +
$"recover status: {_connectionSettings.Recovery}");
DoClose(notRecoveredError);
return;
}
Expand Down
20 changes: 17 additions & 3 deletions RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.

using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Amqp;
using Amqp.Framing;
using Amqp.Types;
using Trace = Amqp.Trace;
using TraceLevel = Amqp.TraceLevel;

namespace RabbitMQ.AMQP.Client.Impl
{
Expand All @@ -28,10 +31,12 @@ private enum PauseStatus
private PauseStatus _pauseStatus = PauseStatus.UNPAUSED;
private readonly UnsettledMessageCounter _unsettledMessageCounter = new();
private readonly ConsumerConfiguration _configuration;
private readonly IMetricsReporter _metricsReporter;

public AmqpConsumer(ConsumerConfiguration configuration)
public AmqpConsumer(ConsumerConfiguration configuration, IMetricsReporter metricsReporter)
{
_configuration = configuration;
_metricsReporter = metricsReporter;
if (false == _configuration.Connection.Consumers.TryAdd(_id, this))
{
// TODO error?
Expand Down Expand Up @@ -120,6 +125,11 @@ await base.OpenAsync()

private async Task ProcessMessages()
{
IMetricsReporter.ConsumerContext consumerContext = new(_configuration.Address,
_configuration.Connection._connectionSettings.Host,
_configuration.Connection._connectionSettings.Port);

long startTimestamp = 0;
try
{
if (_receiverLink is null)
Expand All @@ -130,6 +140,7 @@ private async Task ProcessMessages()

while (_receiverLink is { LinkState: LinkState.Attached })
{
startTimestamp = Stopwatch.GetTimestamp();
// TODO the timeout waiting for messages should be configurable
TimeSpan timeout = TimeSpan.FromSeconds(60);
Message? nativeMessage = await _receiverLink.ReceiveAsync(timeout).ConfigureAwait(false);
Expand All @@ -143,17 +154,20 @@ private async Task ProcessMessages()
continue;
}

_metricsReporter.ReportMessageDeliverSuccess(consumerContext, startTimestamp);
_unsettledMessageCounter.Increment();

IContext context = new DeliveryContext(_receiverLink, nativeMessage, _unsettledMessageCounter);
var amqpMessage = new AmqpMessage(nativeMessage);

// TODO catch exceptions thrown by handlers,
// then call exception handler?

if (_configuration.Handler != null)
{
await _configuration.Handler(context, amqpMessage).ConfigureAwait(false);
}

}
}
catch (Exception e)
Expand All @@ -162,7 +176,7 @@ private async Task ProcessMessages()
{
return;
}

Trace.WriteLine(TraceLevel.Error, $"{ToString()} Failed to process messages, {e}");
// TODO this is where a Listener should get a closed event
// See the ConsumerShouldBeClosedWhenQueueIsDeleted test
Expand Down
6 changes: 4 additions & 2 deletions RabbitMQ.AMQP.Client/Impl/AmqpConsumerBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@ public class ConsumerConfiguration
public class AmqpConsumerBuilder : IConsumerBuilder
{
private readonly ConsumerConfiguration _configuration = new();
private readonly IMetricsReporter _metricsReporter;

public AmqpConsumerBuilder(AmqpConnection connection)
public AmqpConsumerBuilder(AmqpConnection connection, IMetricsReporter metricsReporter)
{
_metricsReporter = metricsReporter;
_configuration.Connection = connection;
}

Expand Down Expand Up @@ -78,7 +80,7 @@ public async Task<IConsumer> BuildAndStartAsync(CancellationToken cancellationTo
throw new ConsumerException("Message handler is not set");
}

AmqpConsumer consumer = new(_configuration);
AmqpConsumer consumer = new(_configuration, _metricsReporter);

// TODO pass cancellationToken
await consumer.OpenAsync()
Expand Down
14 changes: 9 additions & 5 deletions RabbitMQ.AMQP.Client/Impl/AmqpEnvironment.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,27 @@ namespace RabbitMQ.AMQP.Client.Impl
{
public class AmqpEnvironment : IEnvironment
{
private IConnectionSettings? ConnectionSettings { get; }
private IConnectionSettings ConnectionSettings { get; }
private long _sequentialId = 0;
private readonly ConcurrentDictionary<long, IConnection> _connections = new();
private readonly IMetricsReporter? _metricsReporter;

private AmqpEnvironment(IConnectionSettings connectionSettings)
private AmqpEnvironment(IConnectionSettings connectionSettings, IMetricsReporter? metricsReporter)
{
ConnectionSettings = connectionSettings;
_metricsReporter = metricsReporter;
}

public static Task<IEnvironment> CreateAsync(IConnectionSettings connectionSettings)
public static Task<IEnvironment> CreateAsync(IConnectionSettings connectionSettings,
IMetricsReporter? metricsReporter = default)
{
return Task.FromResult<IEnvironment>(new AmqpEnvironment(connectionSettings));
return Task.FromResult<IEnvironment>(new AmqpEnvironment(connectionSettings,
metricsReporter ?? new NoOpMetricsReporter()));
}

public async Task<IConnection> CreateConnectionAsync(IConnectionSettings connectionSettings)
{
IConnection c = await AmqpConnection.CreateAsync(connectionSettings).ConfigureAwait(false);
IConnection c = await AmqpConnection.CreateAsync(connectionSettings,_metricsReporter).ConfigureAwait(false);
c.Id = Interlocked.Increment(ref _sequentialId);
_connections.TryAdd(c.Id, c);
c.ChangeState += (sender, previousState, currentState, failureCause) =>
Expand Down
Loading
Loading