Skip to content
This repository was archived by the owner on Apr 29, 2022. It is now read-only.

Commit df383a5

Browse files
Merge pull request #43 from AntonyVorontsov/feature/connection-retries
Added a retry mechanism for creating an initial connection.
2 parents cd75e77 + 42654d3 commit df383a5

File tree

6 files changed

+266
-11
lines changed

6 files changed

+266
-11
lines changed

docs/rabbit-configuration.md

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@ A RabbitMQ client can be configured via a configuration section located in the `
4040
"AutomaticRecoveryEnabled": true,
4141
"TopologyRecoveryEnabled": true,
4242
"RequestedConnectionTimeout": 60000,
43-
"RequestedHeartbeat": 60
43+
"RequestedHeartbeat": 60,
44+
"InitialConnectionRetries": 5,
45+
"InitialConnectionRetryTimeoutMilliseconds": 200
4446
}
4547
}
4648
```
@@ -53,13 +55,15 @@ A RabbitMQ connection can be configured with properties:
5355
- `UserName` - user that connects to the server,
5456
- `Password` - password of the user that connects to the server,
5557
- `ClientProvidedName` - application-specific connection name that will be displayed in the management UI if RabbitMQ server supports it,
56-
- `VirtualHost` - default virtual host,
58+
- `VirtualHost` - the default virtual host,
5759
- `AutomaticRecoveryEnabled` - automatic connection recovery option,
5860
- `TopologyRecoveryEnabled` - topology recovery option,
5961
- `RequestedConnectionTimeout` - timeout for connection attempts,
60-
- `RequestedHeartbeat` - heartbeat timeout.
62+
- `RequestedHeartbeat` - heartbeat timeout,
63+
- `InitialConnectionRetries` - a number of retries which could be attempted while trying to make an initial connection,
64+
- `InitialConnectionRetryTimeoutMilliseconds` - timeout in milliseconds which could be used while trying to make an initial connection.
6165

62-
`ClientProvidedName` is optional and can be null. Options `VirtualHost`, `AutomaticRecoveryEnabled`, `TopologyRecoveryEnabled`, `RequestedConnectionTimeout`, `RequestedHeartbeat` are set with default values, so you can leave them.
66+
`ClientProvidedName` is optional and can be null. Options `VirtualHost`, `AutomaticRecoveryEnabled`, `TopologyRecoveryEnabled`, `RequestedConnectionTimeout`, `RequestedHeartbeat`, `InitialConnectionRetries`, `InitialConnectionRetryTimeoutMilliseconds` are set with default values, so you can leave them.
6367

6468
```json
6569
{

src/RabbitMQ.Client.Core.DependencyInjection/Configuration/RabbitMqClientOptions.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,5 +83,15 @@ public class RabbitMqClientOptions
8383
/// Heartbeat timeout.
8484
/// </summary>
8585
public TimeSpan RequestedHeartbeat { get; set; } = TimeSpan.FromSeconds(60);
86+
87+
/// <summary>
88+
/// The number of retries for opening an initial connection.
89+
/// </summary>
90+
public int InitialConnectionRetries { get; set; } = 5;
91+
92+
/// <summary>
93+
/// Timeout for initial connection opening retries.
94+
/// </summary>
95+
public int InitialConnectionRetryTimeoutMilliseconds { get; set; } = 200;
8696
}
8797
}

src/RabbitMQ.Client.Core.DependencyInjection/Configuration/RabbitMqTcpEndpoint.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,6 @@ public class RabbitMqTcpEndpoint
1313
/// <summary>
1414
/// Tcp connection port.
1515
/// </summary>
16-
public int Port { get; set; }
16+
public int Port { get; set; } = 5672;
1717
}
1818
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
using System;
2+
3+
namespace RabbitMQ.Client.Core.DependencyInjection.Exceptions
4+
{
5+
/// <summary>
6+
/// An exception that is thrown when an initial connection could not be established even with retry mechanism.
7+
/// </summary>
8+
public class InitialConnectionException : Exception
9+
{
10+
/// <summary>
11+
/// The number of retries which has been attempted.
12+
/// </summary>
13+
public int NumberOfRetries { get; set; }
14+
15+
public InitialConnectionException(string message, Exception innerException) : base(message, innerException)
16+
{
17+
}
18+
}
19+
}

src/RabbitMQ.Client.Core.DependencyInjection/Services/RabbitMqConnectionFactory.cs

Lines changed: 52 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
1+
using System;
12
using System.Linq;
3+
using System.Threading;
24
using RabbitMQ.Client.Core.DependencyInjection.Configuration;
5+
using RabbitMQ.Client.Core.DependencyInjection.Exceptions;
36
using RabbitMQ.Client.Events;
7+
using RabbitMQ.Client.Exceptions;
48

59
namespace RabbitMQ.Client.Core.DependencyInjection.Services
610
{
@@ -38,15 +42,14 @@ public IConnection CreateRabbitMqConnection(RabbitMqClientOptions options)
3842
if (options.TcpEndpoints?.Any() == true)
3943
{
4044
var clientEndpoints = options.TcpEndpoints.Select(x => new AmqpTcpEndpoint(x.HostName, x.Port)).ToList();
41-
return factory.CreateConnection(clientEndpoints);
45+
return TryToCreateConnection(() => factory.CreateConnection(clientEndpoints), options.InitialConnectionRetries, options.InitialConnectionRetryTimeoutMilliseconds);
4246
}
4347

4448
return string.IsNullOrEmpty(options.ClientProvidedName)
4549
? CreateConnection(options, factory)
4650
: CreateNamedConnection(options, factory);
4751
}
4852

49-
5053
/// <summary>
5154
/// Create a consumer depending on the connection channel.
5255
/// </summary>
@@ -58,22 +61,65 @@ static IConnection CreateNamedConnection(RabbitMqClientOptions options, Connecti
5861
{
5962
if (options.HostNames?.Any() == true)
6063
{
61-
return factory.CreateConnection(options.HostNames.ToList(), options.ClientProvidedName);
64+
return TryToCreateConnection(() => factory.CreateConnection(options.HostNames.ToList(), options.ClientProvidedName), options.InitialConnectionRetries, options.InitialConnectionRetryTimeoutMilliseconds);
6265
}
6366

6467
factory.HostName = options.HostName;
65-
return factory.CreateConnection(options.ClientProvidedName);
68+
return TryToCreateConnection(() => factory.CreateConnection(options.ClientProvidedName), options.InitialConnectionRetries, options.InitialConnectionRetryTimeoutMilliseconds);
6669
}
6770

6871
static IConnection CreateConnection(RabbitMqClientOptions options, ConnectionFactory factory)
6972
{
7073
if (options.HostNames?.Any() == true)
7174
{
72-
return factory.CreateConnection(options.HostNames.ToList());
75+
return TryToCreateConnection(() => factory.CreateConnection(options.HostNames.ToList()), options.InitialConnectionRetries, options.InitialConnectionRetryTimeoutMilliseconds);
7376
}
7477

7578
factory.HostName = options.HostName;
76-
return factory.CreateConnection();
79+
return TryToCreateConnection(factory.CreateConnection, options.InitialConnectionRetries, options.InitialConnectionRetryTimeoutMilliseconds);
80+
}
81+
82+
static IConnection TryToCreateConnection(Func<IConnection> connectionFunction, int numberOfRetries, int timeoutMilliseconds)
83+
{
84+
ValidateArguments(numberOfRetries, timeoutMilliseconds);
85+
86+
var attempts = 0;
87+
BrokerUnreachableException latestException = null;
88+
while (attempts < numberOfRetries)
89+
{
90+
try
91+
{
92+
if (attempts > 0)
93+
{
94+
Thread.Sleep(timeoutMilliseconds);
95+
}
96+
97+
return connectionFunction();
98+
}
99+
catch (BrokerUnreachableException exception)
100+
{
101+
attempts++;
102+
latestException = exception;
103+
}
104+
}
105+
106+
throw new InitialConnectionException($"Could not establish an initial connection in {numberOfRetries} retries", latestException)
107+
{
108+
NumberOfRetries = attempts
109+
};
110+
}
111+
112+
static void ValidateArguments(int numberOfRetries, int timeoutMilliseconds)
113+
{
114+
if (numberOfRetries < 1)
115+
{
116+
throw new ArgumentException("Number of retries should be a positive number.", nameof(numberOfRetries));
117+
}
118+
119+
if (timeoutMilliseconds < 1)
120+
{
121+
throw new ArgumentException("Initial reconnection timeout should be a positive number.", nameof(timeoutMilliseconds));
122+
}
77123
}
78124
}
79125
}
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
using System.Collections.Generic;
2+
using RabbitMQ.Client.Core.DependencyInjection.Configuration;
3+
using RabbitMQ.Client.Core.DependencyInjection.Exceptions;
4+
using RabbitMQ.Client.Core.DependencyInjection.Services;
5+
using Xunit;
6+
7+
namespace RabbitMQ.Client.Core.DependencyInjection.Tests.IntegrationTests
8+
{
9+
public class RabbitMqConnectionFactoryTests
10+
{
11+
[Theory]
12+
[InlineData(1)]
13+
[InlineData(5)]
14+
[InlineData(10)]
15+
public void ShouldProperlyRetryCreatingInitialConnection(int retries)
16+
{
17+
var connectionOptions = new RabbitMqClientOptions
18+
{
19+
HostName = "anotherHost",
20+
InitialConnectionRetries = retries,
21+
InitialConnectionRetryTimeoutMilliseconds = 20
22+
};
23+
ExecuteUnsuccessfulConnectionCreationAndAssertResults(connectionOptions);
24+
}
25+
26+
[Theory]
27+
[InlineData(1)]
28+
[InlineData(5)]
29+
[InlineData(10)]
30+
public void ShouldProperlyRetryCreatingInitialConnectionWithConnectionName(int retries)
31+
{
32+
var connectionOptions = new RabbitMqClientOptions
33+
{
34+
HostName = "anotherHost",
35+
ClientProvidedName = "connectionName",
36+
InitialConnectionRetries = retries,
37+
InitialConnectionRetryTimeoutMilliseconds = 20
38+
};
39+
ExecuteUnsuccessfulConnectionCreationAndAssertResults(connectionOptions);
40+
}
41+
42+
[Theory]
43+
[InlineData(1)]
44+
[InlineData(5)]
45+
[InlineData(10)]
46+
public void ShouldProperlyRetryCreatingInitialConnectionWithTcpEndpoints(int retries)
47+
{
48+
var connectionOptions = new RabbitMqClientOptions
49+
{
50+
TcpEndpoints = new List<RabbitMqTcpEndpoint>
51+
{
52+
new RabbitMqTcpEndpoint
53+
{
54+
HostName = "anotherHost"
55+
}
56+
},
57+
InitialConnectionRetries = retries,
58+
InitialConnectionRetryTimeoutMilliseconds = 20
59+
};
60+
ExecuteUnsuccessfulConnectionCreationAndAssertResults(connectionOptions);
61+
}
62+
63+
[Theory]
64+
[InlineData(1)]
65+
[InlineData(5)]
66+
[InlineData(10)]
67+
public void ShouldProperlyRetryCreatingInitialConnectionWithHostNames(int retries)
68+
{
69+
var connectionOptions = new RabbitMqClientOptions
70+
{
71+
HostNames = new List<string> { "anotherHost" },
72+
InitialConnectionRetries = retries,
73+
InitialConnectionRetryTimeoutMilliseconds = 20
74+
};
75+
ExecuteUnsuccessfulConnectionCreationAndAssertResults(connectionOptions);
76+
}
77+
78+
[Theory]
79+
[InlineData(1)]
80+
[InlineData(5)]
81+
[InlineData(10)]
82+
public void ShouldProperlyRetryCreatingInitialConnectionWithHostNamesAndNamedConnection(int retries)
83+
{
84+
var connectionOptions = new RabbitMqClientOptions
85+
{
86+
HostNames = new List<string> { "anotherHost" },
87+
ClientProvidedName = "connectionName",
88+
InitialConnectionRetries = retries,
89+
InitialConnectionRetryTimeoutMilliseconds = 20
90+
};
91+
ExecuteUnsuccessfulConnectionCreationAndAssertResults(connectionOptions);
92+
}
93+
94+
[Fact]
95+
public void ShouldProperlyCreateInitialConnection()
96+
{
97+
var connectionOptions = new RabbitMqClientOptions
98+
{
99+
HostName = "rabbitmq",
100+
InitialConnectionRetries = 1,
101+
InitialConnectionRetryTimeoutMilliseconds = 20
102+
};
103+
ExecuteSuccessfulConnectionCreationAndAssertResults(connectionOptions);
104+
}
105+
106+
[Fact]
107+
public void ShouldProperlyCreateInitialConnectionWithConnectionName()
108+
{
109+
var connectionOptions = new RabbitMqClientOptions
110+
{
111+
HostName = "rabbitmq",
112+
ClientProvidedName = "connectionName",
113+
InitialConnectionRetries = 3,
114+
InitialConnectionRetryTimeoutMilliseconds = 20
115+
};
116+
ExecuteSuccessfulConnectionCreationAndAssertResults(connectionOptions);
117+
}
118+
119+
[Fact]
120+
public void ShouldProperlyCreateInitialConnectionWithTcpEndpoints()
121+
{
122+
var connectionOptions = new RabbitMqClientOptions
123+
{
124+
TcpEndpoints = new List<RabbitMqTcpEndpoint>
125+
{
126+
new RabbitMqTcpEndpoint
127+
{
128+
HostName = "rabbitmq"
129+
}
130+
},
131+
InitialConnectionRetries = 3,
132+
InitialConnectionRetryTimeoutMilliseconds = 20
133+
};
134+
ExecuteSuccessfulConnectionCreationAndAssertResults(connectionOptions);
135+
}
136+
137+
[Fact]
138+
public void ShouldProperlyCreateInitialConnectionWithHostNames()
139+
{
140+
var connectionOptions = new RabbitMqClientOptions
141+
{
142+
HostNames = new List<string> { "rabbitmq" },
143+
InitialConnectionRetries = 3,
144+
InitialConnectionRetryTimeoutMilliseconds = 20
145+
};
146+
ExecuteSuccessfulConnectionCreationAndAssertResults(connectionOptions);
147+
}
148+
149+
[Fact]
150+
public void ShouldProperlyCreateInitialConnectionWithHostNamesAndNamedConnection()
151+
{
152+
var connectionOptions = new RabbitMqClientOptions
153+
{
154+
HostNames = new List<string> { "rabbitmq" },
155+
ClientProvidedName = "connectionName",
156+
InitialConnectionRetries = 3,
157+
InitialConnectionRetryTimeoutMilliseconds = 20
158+
};
159+
ExecuteSuccessfulConnectionCreationAndAssertResults(connectionOptions);
160+
}
161+
162+
static void ExecuteUnsuccessfulConnectionCreationAndAssertResults(RabbitMqClientOptions connectionOptions)
163+
{
164+
var connectionFactory = new RabbitMqConnectionFactory();
165+
var exception = Assert.Throws<InitialConnectionException>(() => connectionFactory.CreateRabbitMqConnection(connectionOptions));
166+
Assert.Equal(connectionOptions.InitialConnectionRetries, exception.NumberOfRetries);
167+
}
168+
169+
static void ExecuteSuccessfulConnectionCreationAndAssertResults(RabbitMqClientOptions connectionOptions)
170+
{
171+
var connectionFactory = new RabbitMqConnectionFactory();
172+
using var connection = connectionFactory.CreateRabbitMqConnection(connectionOptions);
173+
Assert.True(connection.IsOpen);
174+
}
175+
}
176+
}

0 commit comments

Comments
 (0)