Skip to content

Commit dfe74b9

Browse files
committed
RabbitMq health checks
Signed-off-by: Tomasz Maruszak <maruszaktomasz@gmail.com>
1 parent 883a86d commit dfe74b9

File tree

1 file changed

+156
-28
lines changed

1 file changed

+156
-28
lines changed

src/SlimMessageBus.Host.RabbitMQ/RabbitMqChannelManager.cs

Lines changed: 156 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,25 @@
11
namespace SlimMessageBus.Host.RabbitMQ;
22

3-
using Microsoft.Extensions.Logging;
43
using global::RabbitMQ.Client;
54

5+
using Microsoft.Extensions.Logging;
6+
67
/// <summary>
78
/// Manages RabbitMQ channel connection, reconnection, retry logic, and background timer.
89
/// Acts as an IModel wrapper with automatic connection recovery.
910
/// </summary>
10-
internal class RabbitMqChannelManager : IRabbitMqChannel, IDisposable
11+
internal partial class RabbitMqChannelManager : IRabbitMqChannel, IDisposable
1112
{
13+
private readonly ILoggerFactory _loggerFactory;
1214
private readonly ILogger _logger;
1315
private readonly RabbitMqMessageBusSettings _providerSettings;
1416
private readonly MessageBusSettings _settings;
15-
private readonly ILoggerFactory _loggerFactory;
1617
private readonly Func<bool> _isDisposingOrDisposed;
1718

1819
private IConnection _connection;
1920
private IModel _channel;
2021
private readonly object _channelLock = new();
21-
22+
2223
private readonly Timer _connectionRetryTimer;
2324
private readonly TimeSpan _connectionRetryInterval;
2425
private volatile bool _isConnecting;
@@ -33,16 +34,16 @@ public RabbitMqChannelManager(
3334
Func<bool> isDisposingOrDisposed)
3435
{
3536
_loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
37+
_logger = loggerFactory.CreateLogger<RabbitMqChannelManager>();
3638
_providerSettings = providerSettings ?? throw new ArgumentNullException(nameof(providerSettings));
3739
_settings = settings ?? throw new ArgumentNullException(nameof(settings));
3840
_isDisposingOrDisposed = isDisposingOrDisposed ?? throw new ArgumentNullException(nameof(isDisposingOrDisposed));
3941

40-
_logger = _loggerFactory.CreateLogger<RabbitMqChannelManager>();
4142

4243
// Set up connection retry interval (default to NetworkRecoveryInterval or 10 seconds)
4344
var networkRecoveryInterval = _providerSettings.ConnectionFactory.NetworkRecoveryInterval;
4445
_connectionRetryInterval = networkRecoveryInterval == TimeSpan.Zero ? TimeSpan.FromSeconds(10) : networkRecoveryInterval;
45-
46+
4647
// Initialize the retry timer (but don't start it yet)
4748
_connectionRetryTimer = new Timer(OnConnectionRetryTimer, null, Timeout.Infinite, Timeout.Infinite);
4849
}
@@ -67,7 +68,7 @@ await Retry.WithDelay(
6768
{
6869
if (ex is global::RabbitMQ.Client.Exceptions.BrokerUnreachableException && attempt < initialRetryCount)
6970
{
70-
_logger.LogInformation(ex, "Retrying {Retry} of {RetryCount} connection to RabbitMQ...", attempt, initialRetryCount);
71+
LogRetryingConnection(attempt, initialRetryCount, ex);
7172
return true;
7273
}
7374
return false;
@@ -78,15 +79,15 @@ await Retry.WithDelay(
7879

7980
if (connectionEstablished)
8081
{
81-
_logger.LogInformation("RabbitMQ connection established successfully");
82+
LogConnectionEstablishedSuccessfully();
8283
// Stop retry timer if connection was successful
8384
_connectionRetryTimer.Change(Timeout.Infinite, Timeout.Infinite);
8485
}
8586
}
8687
catch (Exception e)
8788
{
88-
_logger.LogError(e, "Could not initialize RabbitMQ connection after {RetryCount} attempts: {ErrorMessage}", initialRetryCount, e.Message);
89-
89+
LogCouldNotInitializeConnection(initialRetryCount, e.Message, e);
90+
9091
// Start continuous retry mechanism
9192
StartConnectionRetryTimer();
9293
}
@@ -102,10 +103,10 @@ public void EnsureChannel()
102103
// If channel is null, trigger immediate reconnection attempt
103104
if (!_isConnecting)
104105
{
105-
_logger.LogWarning("Channel is not available, attempting immediate reconnection");
106+
LogChannelNotAvailableAttemptingReconnection();
106107
Task.Run(() => OnConnectionRetryTimer(null));
107108
}
108-
109+
109110
throw new ProducerMessageBusException("The Channel is not available at this time");
110111
}
111112
}
@@ -116,7 +117,7 @@ public void EnsureChannel()
116117
public void ExecuteWithChannel(Action<IModel> action)
117118
{
118119
EnsureChannel();
119-
120+
120121
lock (_channelLock)
121122
{
122123
action(_channel);
@@ -129,7 +130,7 @@ public void ExecuteWithChannel(Action<IModel> action)
129130
public T ExecuteWithChannel<T>(Func<IModel, T> func)
130131
{
131132
EnsureChannel();
132-
133+
133134
lock (_channelLock)
134135
{
135136
return func(_channel);
@@ -178,7 +179,7 @@ private bool EstablishConnection()
178179
}
179180
catch (Exception ex)
180181
{
181-
_logger.LogWarning(ex, "Failed to establish RabbitMQ connection: {ErrorMessage}", ex.Message);
182+
LogFailedToEstablishConnection(ex.Message, ex);
182183
return false;
183184
}
184185
}
@@ -205,7 +206,7 @@ private void ProvisionTopology()
205206
}
206207
catch (Exception ex)
207208
{
208-
_logger.LogError(ex, "Failed to provision RabbitMQ topology: {ErrorMessage}", ex.Message);
209+
LogFailedToProvisionTopology(ex.Message, ex);
209210
throw;
210211
}
211212
}
@@ -214,9 +215,8 @@ private void OnConnectionShutdown(object sender, ShutdownEventArgs e)
214215
{
215216
if (!_isDisposingOrDisposed())
216217
{
217-
_logger.LogWarning("RabbitMQ connection shutdown detected. Reason: {Reason}, Initiator: {Initiator}",
218-
e.ReplyText, e.Initiator);
219-
218+
LogConnectionShutdownDetected(e.ReplyText, e.Initiator);
219+
220220
// Start retry timer if not already running
221221
StartConnectionRetryTimer();
222222
}
@@ -226,7 +226,7 @@ private void StartConnectionRetryTimer()
226226
{
227227
if (!_isConnecting && !_isDisposingOrDisposed())
228228
{
229-
_logger.LogInformation("Starting RabbitMQ connection retry timer with interval: {Interval}", _connectionRetryInterval);
229+
LogStartingConnectionRetryTimer(_connectionRetryInterval);
230230
_connectionRetryTimer.Change(_connectionRetryInterval, _connectionRetryInterval);
231231
}
232232
}
@@ -241,7 +241,7 @@ private void OnConnectionRetryTimer(object state)
241241
{
242242
if (_connection?.IsOpen == true && _channel?.IsOpen == true)
243243
{
244-
_logger.LogDebug("RabbitMQ connection is healthy, stopping retry timer");
244+
LogConnectionIsHealthy();
245245
_connectionRetryTimer.Change(Timeout.Infinite, Timeout.Infinite);
246246
return;
247247
}
@@ -250,22 +250,22 @@ private void OnConnectionRetryTimer(object state)
250250
_isConnecting = true;
251251
try
252252
{
253-
_logger.LogInformation("Attempting to reconnect to RabbitMQ...");
254-
253+
LogAttemptingToReconnect();
254+
255255
var success = EstablishConnection();
256256
if (success)
257257
{
258-
_logger.LogInformation("RabbitMQ reconnection successful");
258+
LogReconnectionSuccessful();
259259
_connectionRetryTimer.Change(Timeout.Infinite, Timeout.Infinite);
260260
}
261261
else
262262
{
263-
_logger.LogWarning("RabbitMQ reconnection failed, will retry in {Interval}", _connectionRetryInterval);
263+
LogReconnectionFailed(_connectionRetryInterval);
264264
}
265265
}
266266
catch (Exception ex)
267267
{
268-
_logger.LogError(ex, "Error during RabbitMQ reconnection attempt: {ErrorMessage}", ex.Message);
268+
LogErrorDuringReconnectionAttempt(ex.Message, ex);
269269
}
270270
finally
271271
{
@@ -315,11 +315,139 @@ public void Dispose()
315315
{
316316
// Unsubscribe from events before disposing
317317
_connection.ConnectionShutdown -= OnConnectionShutdown;
318-
318+
319319
CloseAndDisposeConnection(_connection);
320320
_connection = null;
321321
}
322322

323323
GC.SuppressFinalize(this);
324324
}
325-
}
325+
326+
#region Logging
327+
328+
[LoggerMessage(
329+
EventId = 0,
330+
Level = LogLevel.Information,
331+
Message = "Retrying {Retry} of {RetryCount} connection to RabbitMQ...")]
332+
private partial void LogRetryingConnection(int retry, int retryCount, Exception ex);
333+
334+
[LoggerMessage(
335+
EventId = 1,
336+
Level = LogLevel.Information,
337+
Message = "RabbitMQ connection established successfully")]
338+
private partial void LogConnectionEstablishedSuccessfully();
339+
340+
[LoggerMessage(
341+
EventId = 2,
342+
Level = LogLevel.Error,
343+
Message = "Could not initialize RabbitMQ connection after {RetryCount} attempts: {ErrorMessage}")]
344+
private partial void LogCouldNotInitializeConnection(int retryCount, string errorMessage, Exception ex);
345+
346+
[LoggerMessage(
347+
EventId = 3,
348+
Level = LogLevel.Warning,
349+
Message = "Channel is not available, attempting immediate reconnection")]
350+
private partial void LogChannelNotAvailableAttemptingReconnection();
351+
352+
[LoggerMessage(
353+
EventId = 4,
354+
Level = LogLevel.Warning,
355+
Message = "Failed to establish RabbitMQ connection: {ErrorMessage}")]
356+
private partial void LogFailedToEstablishConnection(string errorMessage, Exception ex);
357+
358+
[LoggerMessage(
359+
EventId = 5,
360+
Level = LogLevel.Error,
361+
Message = "Failed to provision RabbitMQ topology: {ErrorMessage}")]
362+
private partial void LogFailedToProvisionTopology(string errorMessage, Exception ex);
363+
364+
[LoggerMessage(
365+
EventId = 6,
366+
Level = LogLevel.Warning,
367+
Message = "RabbitMQ connection shutdown detected. Reason: {Reason}, Initiator: {Initiator}")]
368+
private partial void LogConnectionShutdownDetected(string reason, object initiator);
369+
370+
[LoggerMessage(
371+
EventId = 7,
372+
Level = LogLevel.Information,
373+
Message = "Starting RabbitMQ connection retry timer with interval: {Interval}")]
374+
private partial void LogStartingConnectionRetryTimer(TimeSpan interval);
375+
376+
[LoggerMessage(
377+
EventId = 8,
378+
Level = LogLevel.Debug,
379+
Message = "RabbitMQ connection is healthy, stopping retry timer")]
380+
private partial void LogConnectionIsHealthy();
381+
382+
[LoggerMessage(
383+
EventId = 9,
384+
Level = LogLevel.Information,
385+
Message = "Attempting to reconnect to RabbitMQ...")]
386+
private partial void LogAttemptingToReconnect();
387+
388+
[LoggerMessage(
389+
EventId = 10,
390+
Level = LogLevel.Information,
391+
Message = "RabbitMQ reconnection successful")]
392+
private partial void LogReconnectionSuccessful();
393+
394+
[LoggerMessage(
395+
EventId = 11,
396+
Level = LogLevel.Warning,
397+
Message = "RabbitMQ reconnection failed, will retry in {Interval}")]
398+
private partial void LogReconnectionFailed(TimeSpan interval);
399+
400+
[LoggerMessage(
401+
EventId = 12,
402+
Level = LogLevel.Error,
403+
Message = "Error during RabbitMQ reconnection attempt: {ErrorMessage}")]
404+
private partial void LogErrorDuringReconnectionAttempt(string errorMessage, Exception ex);
405+
406+
#endregion
407+
}
408+
409+
#if NETSTANDARD2_0
410+
411+
internal partial class RabbitMqChannelManager
412+
{
413+
private partial void LogRetryingConnection(int retry, int retryCount, Exception ex)
414+
=> _logger.LogInformation(ex, "Retrying {Retry} of {RetryCount} connection to RabbitMQ...", retry, retryCount);
415+
416+
private partial void LogConnectionEstablishedSuccessfully()
417+
=> _logger.LogInformation("RabbitMQ connection established successfully");
418+
419+
private partial void LogCouldNotInitializeConnection(int retryCount, string errorMessage, Exception ex)
420+
=> _logger.LogError(ex, "Could not initialize RabbitMQ connection after {RetryCount} attempts: {ErrorMessage}", retryCount, errorMessage);
421+
422+
private partial void LogChannelNotAvailableAttemptingReconnection()
423+
=> _logger.LogWarning("Channel is not available, attempting immediate reconnection");
424+
425+
private partial void LogFailedToEstablishConnection(string errorMessage, Exception ex)
426+
=> _logger.LogWarning(ex, "Failed to establish RabbitMQ connection: {ErrorMessage}", errorMessage);
427+
428+
private partial void LogFailedToProvisionTopology(string errorMessage, Exception ex)
429+
=> _logger.LogError(ex, "Failed to provision RabbitMQ topology: {ErrorMessage}", errorMessage);
430+
431+
private partial void LogConnectionShutdownDetected(string reason, object initiator)
432+
=> _logger.LogWarning("RabbitMQ connection shutdown detected. Reason: {Reason}, Initiator: {Initiator}", reason, initiator);
433+
434+
private partial void LogStartingConnectionRetryTimer(TimeSpan interval)
435+
=> _logger.LogInformation("Starting RabbitMQ connection retry timer with interval: {Interval}", interval);
436+
437+
private partial void LogConnectionIsHealthy()
438+
=> _logger.LogDebug("RabbitMQ connection is healthy, stopping retry timer");
439+
440+
private partial void LogAttemptingToReconnect()
441+
=> _logger.LogInformation("Attempting to reconnect to RabbitMQ...");
442+
443+
private partial void LogReconnectionSuccessful()
444+
=> _logger.LogInformation("RabbitMQ reconnection successful");
445+
446+
private partial void LogReconnectionFailed(TimeSpan interval)
447+
=> _logger.LogWarning("RabbitMQ reconnection failed, will retry in {Interval}", interval);
448+
449+
private partial void LogErrorDuringReconnectionAttempt(string errorMessage, Exception ex)
450+
=> _logger.LogError(ex, "Error during RabbitMQ reconnection attempt: {ErrorMessage}", errorMessage);
451+
}
452+
453+
#endif

0 commit comments

Comments
 (0)