Commit 2cae9e9

RabbitMq health checks
Signed-off-by: Tomasz Maruszak <maruszaktomasz@gmail.com>
1 parent 987f95d commit 2cae9e9

9 files changed

+870
-164
lines changed

src/SlimMessageBus.Host.RabbitMQ/HealthChecks/README.md

Lines changed: 148 additions & 11 deletions
@@ -1,13 +1,35 @@
11
# RabbitMQ Health Check
22

3-
The RabbitMQ health check provides a way to monitor the health of your RabbitMQ message bus connection and channel.
3+
The RabbitMQ health check monitors the health of your RabbitMQ message bus connection and channel. It is especially useful alongside the enhanced connection resilience features.
4+
5+
## Architecture
6+
7+
The RabbitMQ message bus now uses a modular architecture with clear separation of concerns:
8+
9+
- **`RabbitMqMessageBus`**: Main message bus implementation that handles message production and consumption
10+
- **`RabbitMqChannelManager`**: Dedicated connection and channel management with automatic recovery (implements `IRabbitMqChannel`)
11+
- **`RabbitMqHealthCheck`**: Health monitoring for connection status (depends on `IRabbitMqChannel`)
12+
13+
This separation improves maintainability and testability, and allows for better error handling and recovery. The health check now depends on the `IRabbitMqChannel` interface instead of the concrete `RabbitMqMessageBus` class, which decouples it from the message bus implementation.
414

515
## Features
616

717
- Checks if the RabbitMQ channel is available and open
818
- Provides detailed diagnostics information
919
- Integrates with ASP.NET Core Health Checks
1020
- Returns appropriate health status based on connection state
21+
- Works with the enhanced connection retry mechanism
22+
- Uses `IRabbitMqChannel` interface for better separation of concerns
23+
24+
## Connection Resilience
25+
26+
The `RabbitMqChannelManager` provides enhanced connection resilience:
27+
28+
- **Continuous Retry**: Unlike the previous 3-attempt limit, the connection now retries indefinitely in the background
29+
- **Automatic Recovery**: Integrates with RabbitMQ's built-in automatic recovery features
30+
- **Connection Monitoring**: Monitors connection shutdown events and triggers reconnection
31+
- **Health Integration**: The health check provides visibility into connection retry status
32+
- **Thread-Safe Operations**: All channel operations are properly synchronized
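
The continuous-retry behavior listed above can be illustrated with a minimal sketch. This is not the actual `RabbitMqChannelManager` code: `RetryingConnector`, its `tryConnect` delegate, and `NotifyConnectionLost` are hypothetical names chosen for illustration, and the real class may be structured differently.

```csharp
using System;
using System.Threading;

// Hypothetical sketch: retries a connect delegate on a fixed interval, with no attempt limit.
public sealed class RetryingConnector : IDisposable
{
    private readonly Func<bool> _tryConnect; // assumed to return true once connected
    private readonly TimeSpan _interval;
    private readonly object _lock = new();
    private Timer _timer;
    private bool _disposed;

    public RetryingConnector(Func<bool> tryConnect, TimeSpan interval)
    {
        _tryConnect = tryConnect;
        _interval = interval;
    }

    public bool IsConnected { get; private set; }

    // Tries once immediately; on failure, schedules background retries.
    public void Start()
    {
        lock (_lock)
        {
            if (_disposed) return;
            if (IsConnected || Attempt()) { StopTimer(); return; }
            _timer ??= new Timer(_ => OnTimer(), null, _interval, _interval);
        }
    }

    // Intended to be called from a connection-shutdown handler.
    public void NotifyConnectionLost()
    {
        lock (_lock) IsConnected = false;
        Start();
    }

    private void OnTimer()
    {
        lock (_lock)
        {
            if (_disposed) return;
            if (IsConnected || Attempt()) StopTimer(); // connected: stop retrying
        }
    }

    private bool Attempt()
    {
        try { IsConnected = _tryConnect(); }
        catch (Exception) { IsConnected = false; } // any failure means "retry later"
        return IsConnected;
    }

    private void StopTimer()
    {
        _timer?.Dispose();
        _timer = null;
    }

    public void Dispose()
    {
        lock (_lock)
        {
            _disposed = true;
            StopTimer();
        }
    }
}
```

In this sketch the lock serializes overlapping timer callbacks, so a slow connection attempt cannot run concurrently with the next tick.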
1133

1234
## Usage
1335

@@ -32,32 +54,36 @@ services.AddHealthChecks()
3254
timeout: TimeSpan.FromSeconds(10));
3355
```
3456

35-
### 3. With Specific Message Bus Instance
57+
### 3. With Specific IRabbitMqChannel Instance
3658

3759
```csharp
3860
services.AddHealthChecks()
3961
.AddRabbitMq(
40-
serviceProvider => serviceProvider.GetRequiredService<RabbitMqMessageBus>(),
62+
serviceProvider => serviceProvider.GetRequiredService<IRabbitMqChannel>(),
4163
name: "rabbitmq-primary");
4264
```
4365

44-
### 4. Complete Example
66+
### 4. Complete Example with Enhanced Resilience
4567

4668
```csharp
4769
var builder = WebApplication.CreateBuilder(args);
4870

49-
// Configure RabbitMQ message bus
71+
// Configure RabbitMQ message bus with enhanced resilience
5072
builder.Services.AddSlimMessageBus(mbb =>
5173
{
5274
mbb.WithProvider<RabbitMqMessageBusSettings>(settings =>
5375
{
5476
settings.ConnectionString = "amqp://localhost";
77+
// The connection factory's NetworkRecoveryInterval will be used for retry timing
78+
settings.ConnectionFactory.NetworkRecoveryInterval = TimeSpan.FromSeconds(10);
79+
settings.ConnectionFactory.AutomaticRecoveryEnabled = true; // This is set automatically
5580
})
5681
.AddHealthChecks()
5782
.AddRabbitMq();
5883
});
5984

60-
// Register health check service
85+
// Register health check service - this automatically resolves the IRabbitMqChannel
86+
// from the registered RabbitMqMessageBus (which implements IRabbitMqChannel)
6187
builder.Services.AddRabbitMqHealthCheck();
6288

6389
var app = builder.Build();
@@ -72,22 +98,68 @@ app.Run();
7298

7399
### Healthy
74100
- Channel is available and open
75-
- Returns diagnostic data including channel number
101+
- Returns diagnostic data including channel number and connection status
76102

77103
### Unhealthy
78-
- Channel is null (connection failed during initialization)
79-
- Channel is closed
104+
- Channel is null (connection failed and retry is in progress)
105+
- Channel is closed (connection retry may be in progress)
80106
- Exception occurred during health check
81107

82108
## Diagnostic Data
83109

84110
The health check provides the following diagnostic information:
85111

86-
- `ChannelNumber`: The RabbitMQ channel number
112+
- `ChannelAvailable`: Whether the channel object exists
113+
- `ChannelNumber`: The RabbitMQ channel number (when available)
87114
- `ChannelIsOpen`: Whether the channel is currently open
115+
- `ConnectionStatus`: Overall connection status description
88116
- `CloseReason`: Reason for channel closure (when applicable)
89117
- `Exception`: Exception message (when applicable)
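
One way to surface these diagnostic entries to callers is a custom response writer on the health endpoint. The sketch below relies only on the standard ASP.NET Core health check types; `HealthReportWriter` is a hypothetical helper and is not part of SlimMessageBus.

```csharp
using System.Linq;
using System.Text.Json;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Diagnostics.HealthChecks;

// Hypothetical helper: writes each check's status, description, and Data dictionary as JSON.
public static class HealthReportWriter
{
    public static string ToJson(HealthReport report) =>
        JsonSerializer.Serialize(new
        {
            status = report.Status.ToString(),
            checks = report.Entries.ToDictionary(
                entry => entry.Key,
                entry => new
                {
                    status = entry.Value.Status.ToString(),
                    description = entry.Value.Description,
                    data = entry.Value.Data // e.g. ChannelAvailable, ChannelIsOpen
                })
        });

    // Signature matches HealthCheckOptions.ResponseWriter.
    public static Task WriteAsync(HttpContext httpContext, HealthReport report)
    {
        httpContext.Response.ContentType = "application/json";
        return httpContext.Response.WriteAsync(ToJson(report));
    }
}
```

It could be wired up with `app.MapHealthChecks("/health", new HealthCheckOptions { ResponseWriter = HealthReportWriter.WriteAsync });`, where the `/health` path is an assumption.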
90118

119+
## Enhanced Resilience Features
120+
121+
### RabbitMqChannelManager
122+
123+
The new `RabbitMqChannelManager` class provides:
124+
125+
#### **Continuous Connection Retry**
126+
- **No Retry Limit**: Unlike the previous 3-attempt limit during startup, connection retries now continue indefinitely
127+
- **Background Retry**: Failed connections trigger a background timer that continuously attempts reconnection
128+
- **Configurable Interval**: Retry interval uses the `NetworkRecoveryInterval` from the connection factory (default 10 seconds)
129+
130+
#### **Automatic Recovery Integration**
131+
- **Built-in Recovery**: Leverages RabbitMQ client's automatic recovery features
132+
- **Event Monitoring**: Monitors connection shutdown events to trigger custom retry logic
133+
- **Graceful Handling**: Distinguishes between expected shutdowns (during disposal) and unexpected failures
134+
135+
#### **Thread-Safe Channel Operations**
136+
- **ExecuteWithChannel**: Provides safe access to channel operations with automatic locking
137+
- **EnsureChannel**: Validates channel availability and triggers reconnection if needed
138+
- **Proper Synchronization**: All channel access is synchronized to prevent race conditions
139+
140+
#### **Improved Error Handling**
141+
- **Immediate Retry Trigger**: When `EnsureChannel()` detects a null channel, it triggers immediate reconnection
142+
- **Better Logging**: Enhanced logging provides clear visibility into connection state and retry attempts
143+
- **Health Check Integration**: Health checks provide real-time visibility into connection status
144+
145+
### Example Usage in Message Bus
146+
147+
The message bus now uses the channel manager for all operations:
148+
149+
```csharp
150+
// Safe channel execution with automatic retry
151+
_channelManager.ExecuteWithChannel(channel =>
152+
{
153+
var batch = channel.CreateBasicPublishBatch();
154+
// ... batch operations
155+
batch.Publish();
156+
});
157+
158+
// Function execution with return value
159+
var properties = _channelManager.ExecuteWithChannel(channel =>
160+
channel.CreateBasicProperties());
161+
```
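
For readers wondering what the two `ExecuteWithChannel` overloads might look like internally, here is a minimal sketch of the lock-then-validate pattern described above. `ChannelGate<TChannel>`, `SetChannel`, and the `requestReconnect` callback are hypothetical; the actual `RabbitMqChannelManager` is not shown in this diff and may differ.

```csharp
using System;

// Hypothetical stand-in for a channel holder; TChannel would be the RabbitMQ channel type.
public sealed class ChannelGate<TChannel> where TChannel : class
{
    private readonly object _lock = new();
    private readonly Action _requestReconnect;
    private TChannel _channel;

    public ChannelGate(Action requestReconnect) => _requestReconnect = requestReconnect;

    // Called by the connection logic when a channel is created, recreated, or lost.
    public void SetChannel(TChannel channel)
    {
        lock (_lock) _channel = channel;
    }

    // Runs an action against the channel while holding the lock.
    public void ExecuteWithChannel(Action<TChannel> action)
    {
        lock (_lock) action(EnsureChannel());
    }

    // Same as above, but returns the function's result.
    public TResult ExecuteWithChannel<TResult>(Func<TChannel, TResult> func)
    {
        lock (_lock) return func(EnsureChannel());
    }

    // Returns the channel, or requests a reconnect and fails fast when none is available.
    private TChannel EnsureChannel()
    {
        if (_channel is not null) return _channel;
        _requestReconnect();
        throw new InvalidOperationException("Channel is not available; reconnection requested.");
    }
}
```

Because `lock` is reentrant on the same thread, a reconnect callback that calls `SetChannel` synchronously would not deadlock in this sketch.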
162+
91163
## Integration with Circuit Breaker
92164

93165
The RabbitMQ health check can be used with the circuit breaker pattern to automatically disable consumers when the connection is unhealthy:
@@ -106,4 +178,69 @@ services.AddSlimMessageBus(mbb =>
106178
});
107179
```
108180

109-
This ensures that consumers are automatically paused when RabbitMQ is unhealthy, preventing message loss and improving system resilience.
181+
This ensures that consumers are automatically paused when RabbitMQ is unhealthy, preventing message loss and improving system resilience.
182+
183+
## Configuration Options
184+
185+
### Retry Timing
186+
The retry interval is controlled by the RabbitMQ connection factory's `NetworkRecoveryInterval`:
187+
188+
```csharp
189+
settings.ConnectionFactory.NetworkRecoveryInterval = TimeSpan.FromSeconds(30); // 30-second retry interval
190+
```
191+
192+
### Monitoring and Logging
193+
Enhanced logging provides visibility into:
194+
- Initial connection attempts and failures
195+
- Background retry attempts
196+
- Connection recovery events
197+
- Health check status changes
198+
199+
Example log messages:
200+
```
201+
[INFO] RabbitMQ connection established successfully
202+
[WARN] RabbitMQ connection shutdown detected. Reason: Connection forced, Initiator: Library
203+
[INFO] Starting RabbitMQ connection retry timer with interval: 00:00:10
204+
[INFO] Attempting to reconnect to RabbitMQ...
205+
[INFO] RabbitMQ reconnection successful
206+
```
207+
208+
## Testing
209+
210+
The modular architecture improves testability:
211+
212+
- **RabbitMqChannelManager**: Can be tested independently with mocked connections
213+
- **RabbitMqMessageBus**: Simpler to test with the channel management logic extracted
214+
- **RabbitMqHealthCheck**: Now uses `IRabbitMqChannel` interface, making it easier to mock and test
215+
216+
Example unit test for health check:
217+
```csharp
218+
[Fact]
219+
public async Task CheckHealthAsync_Should_Return_Healthy_When_Channel_Is_Open()
220+
{
221+
// Arrange
222+
var channelMock = new Mock<IModel>();
223+
channelMock.SetupGet(x => x.IsOpen).Returns(true);
224+
channelMock.SetupGet(x => x.ChannelNumber).Returns(456);
225+
226+
var rabbitMqChannelMock = new Mock<IRabbitMqChannel>();
227+
rabbitMqChannelMock.Setup(x => x.Channel).Returns(channelMock.Object);
228+
229+
var loggerMock = new Mock<ILogger<RabbitMqHealthCheck>>();
var context = new HealthCheckContext();
var healthCheck = new RabbitMqHealthCheck(rabbitMqChannelMock.Object, loggerMock.Object);
230+
231+
// Act
232+
var result = await healthCheck.CheckHealthAsync(context);
233+
234+
// Assert
235+
result.Status.Should().Be(HealthStatus.Healthy);
236+
}
237+
```
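
The unhealthy path can be covered in the same style. The sketch below assumes xUnit, Moq, and RabbitMQ.Client 6.x (`IModel`), and assumes `IRabbitMqChannel` lives in the `SlimMessageBus.Host.RabbitMQ` namespace; it checks the `ChannelAvailable` entry that the updated health check sets when the channel is null.

```csharp
using System.Threading.Tasks;
using Microsoft.Extensions.Diagnostics.HealthChecks;
using Microsoft.Extensions.Logging.Abstractions;
using Moq;
using RabbitMQ.Client;
using SlimMessageBus.Host.RabbitMQ;              // assumed namespace of IRabbitMqChannel
using SlimMessageBus.Host.RabbitMQ.HealthChecks;
using Xunit;

public class RabbitMqHealthCheckNullChannelTests
{
    [Fact]
    public async Task CheckHealthAsync_Should_Return_Unhealthy_When_Channel_Is_Null()
    {
        // Arrange: no channel is available (for example, while a retry is pending)
        var rabbitMqChannelMock = new Mock<IRabbitMqChannel>();
        rabbitMqChannelMock.SetupGet(x => x.Channel).Returns((IModel)null);

        var healthCheck = new RabbitMqHealthCheck(
            rabbitMqChannelMock.Object,
            NullLogger<RabbitMqHealthCheck>.Instance);

        // Act
        var result = await healthCheck.CheckHealthAsync(new HealthCheckContext());

        // Assert
        Assert.Equal(HealthStatus.Unhealthy, result.Status);
        Assert.False((bool)result.Data["ChannelAvailable"]);
    }
}
```

Using `NullLogger<T>.Instance` avoids a logger mock when the test does not assert on log output.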
238+
239+
## Interface-Based Design
240+
241+
The health check now depends on `IRabbitMqChannel` instead of the concrete `RabbitMqMessageBus`:
242+
243+
- **Better Decoupling**: Health check doesn't need to know about message bus internals
244+
- **Easier Testing**: Can mock `IRabbitMqChannel` interface instead of complex message bus
245+
- **Flexible Implementation**: Any class implementing `IRabbitMqChannel` can be health-checked
246+
- **Single Responsibility**: Health check focuses solely on channel health, not message bus operations

src/SlimMessageBus.Host.RabbitMQ/HealthChecks/RabbitMqHealthCheck.cs

Lines changed: 16 additions & 9 deletions
@@ -5,7 +5,7 @@ namespace SlimMessageBus.Host.RabbitMQ.HealthChecks;
55
/// <summary>
66
/// Health check for RabbitMQ message bus to verify connection and channel availability.
77
/// </summary>
8-
public class RabbitMqHealthCheck(RabbitMqMessageBus messageBus, ILogger<RabbitMqHealthCheck> logger) : IHealthCheck
8+
public class RabbitMqHealthCheck(IRabbitMqChannel rabbitMqChannel, ILogger<RabbitMqHealthCheck> logger) : IHealthCheck
99
{
1010
public Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context, CancellationToken cancellationToken = default)
1111
{
@@ -14,29 +14,36 @@ public Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context, Canc
1414
var data = new Dictionary<string, object>();
1515

1616
// Check if the channel is available
17-
if (messageBus.Channel == null)
17+
if (rabbitMqChannel.Channel == null)
1818
{
1919
logger.LogWarning("RabbitMQ health check failed: Channel is not available");
20+
data["ChannelAvailable"] = false;
21+
data["ConnectionStatus"] = "Unknown - Channel is null";
22+
2023
return Task.FromResult(HealthCheckResult.Unhealthy(
21-
"RabbitMQ channel is not available. Connection may have failed during initialization.",
24+
"RabbitMQ channel is not available. Connection may have failed during initialization or is being retried.",
2225
data: data));
2326
}
2427

28+
// Channel is available, gather diagnostic information
29+
data["ChannelAvailable"] = true;
30+
data["ChannelNumber"] = rabbitMqChannel.Channel.ChannelNumber;
31+
2532
// Check if the channel is open
26-
if (!messageBus.Channel.IsOpen)
33+
if (!rabbitMqChannel.Channel.IsOpen)
2734
{
2835
logger.LogWarning("RabbitMQ health check failed: Channel is closed");
29-
data["ChannelNumber"] = messageBus.Channel.ChannelNumber;
30-
data["CloseReason"] = messageBus.Channel.CloseReason?.ToString() ?? "Unknown";
36+
data["ChannelIsOpen"] = false;
37+
data["CloseReason"] = rabbitMqChannel.Channel.CloseReason?.ToString() ?? "Unknown";
3138

3239
return Task.FromResult(HealthCheckResult.Unhealthy(
33-
"RabbitMQ channel is closed",
40+
"RabbitMQ channel is closed. Connection retry may be in progress.",
3441
data: data));
3542
}
3643

3744
// All checks passed - channel is available and open
38-
data["ChannelNumber"] = messageBus.Channel.ChannelNumber;
39-
data["ChannelIsOpen"] = messageBus.Channel.IsOpen;
45+
data["ChannelIsOpen"] = true;
46+
data["ConnectionStatus"] = "Connected";
4047

4148
logger.LogDebug("RabbitMQ health check passed");
4249
return Task.FromResult(HealthCheckResult.Healthy(

src/SlimMessageBus.Host.RabbitMQ/HealthChecks/RabbitMqHealthCheckExtensions.cs

Lines changed: 9 additions & 9 deletions
@@ -33,18 +33,18 @@ public static IHealthChecksBuilder AddRabbitMq(
3333
}
3434

3535
/// <summary>
36-
/// Adds a health check for a specific RabbitMQ message bus instance.
36+
/// Adds a health check for a specific RabbitMQ channel instance.
3737
/// </summary>
3838
/// <param name="builder">The health checks builder.</param>
39-
/// <param name="messageBusFactory">A factory function to resolve the specific RabbitMQ message bus instance.</param>
39+
/// <param name="channelFactory">A factory function to resolve the specific RabbitMQ channel instance.</param>
4040
/// <param name="name">The health check name. If null, "rabbitmq" will be used.</param>
4141
/// <param name="failureStatus">The health status that should be reported when the health check fails. If null, <see cref="HealthStatus.Unhealthy"/> will be reported.</param>
4242
/// <param name="tags">A list of tags that can be used to filter sets of health checks.</param>
4343
/// <param name="timeout">An optional <see cref="TimeSpan"/> representing the timeout of the check.</param>
4444
/// <returns>The health checks builder.</returns>
4545
public static IHealthChecksBuilder AddRabbitMq(
4646
this IHealthChecksBuilder builder,
47-
Func<IServiceProvider, RabbitMqMessageBus> messageBusFactory,
47+
Func<IServiceProvider, IRabbitMqChannel> channelFactory,
4848
string name = null,
4949
HealthStatus? failureStatus = null,
5050
IEnumerable<string> tags = null,
@@ -54,9 +54,9 @@ public static IHealthChecksBuilder AddRabbitMq(
5454
name ?? "rabbitmq",
5555
serviceProvider =>
5656
{
57-
var messageBus = messageBusFactory(serviceProvider);
57+
var channel = channelFactory(serviceProvider);
5858
var logger = serviceProvider.GetRequiredService<ILogger<RabbitMqHealthCheck>>();
59-
return new RabbitMqHealthCheck(messageBus, logger);
59+
return new RabbitMqHealthCheck(channel, logger);
6060
},
6161
failureStatus,
6262
tags,
@@ -79,17 +79,17 @@ public static IServiceCollection AddRabbitMqHealthCheck(this IServiceCollection
7979
/// Registers a specific RabbitMQ health check service in the dependency injection container.
8080
/// </summary>
8181
/// <param name="services">The service collection.</param>
82-
/// <param name="messageBusFactory">A factory function to resolve the specific RabbitMQ message bus instance.</param>
82+
/// <param name="channelFactory">A factory function to resolve the specific RabbitMQ channel instance.</param>
8383
/// <returns>The service collection.</returns>
8484
public static IServiceCollection AddRabbitMqHealthCheck(
8585
this IServiceCollection services,
86-
Func<IServiceProvider, RabbitMqMessageBus> messageBusFactory)
86+
Func<IServiceProvider, IRabbitMqChannel> channelFactory)
8787
{
8888
services.AddTransient(serviceProvider =>
8989
{
90-
var messageBus = messageBusFactory(serviceProvider);
90+
var channel = channelFactory(serviceProvider);
9191
var logger = serviceProvider.GetRequiredService<ILogger<RabbitMqHealthCheck>>();
92-
return new RabbitMqHealthCheck(messageBus, logger);
92+
return new RabbitMqHealthCheck(channel, logger);
9393
});
9494
return services;
9595
}
