
Commit 3787d61

Merge pull request #1624 from rabbitmq/rabbitmq-dotnet-client-1623
Fix issue when recovery takes longer than recovery interval
2 parents 350a14f + 7cb2c1d
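The failure mode is timing-dependent: with automatic recovery enabled, a recovery attempt that outlives NetworkRecoveryInterval could replace _innerConnection before the new connection had actually opened, losing the defunct connection's recorded state. A minimal client configuration that exercises this code path might look like the sketch below; the host name and five-second interval are illustrative values, not taken from this commit, and connection creation assumes the 7.x-era async API used elsewhere in this diff.

    using System;
    using RabbitMQ.Client;

    var factory = new ConnectionFactory
    {
        HostName = "localhost", // illustrative endpoint
        // Opt in to the recovery machinery patched by this commit.
        AutomaticRecoveryEnabled = true,
        TopologyRecoveryEnabled = true,
        // Recovery attempts are spaced this far apart; GH-1623 reproduces when
        // an attempt takes longer than this interval to fail or complete.
        NetworkRecoveryInterval = TimeSpan.FromSeconds(5)
    };

    using IConnection conn = await factory.CreateConnectionAsync();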

File tree

2 files changed: +65 −3 lines changed

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs

Lines changed: 17 additions & 3 deletions
@@ -240,15 +240,27 @@ await _innerConnection.AbortAsync(Constants.InternalError, "FailedAutoRecovery",
 
         private async ValueTask<bool> TryRecoverConnectionDelegateAsync(CancellationToken cancellationToken)
         {
+            Connection? maybeNewInnerConnection = null;
             try
             {
                 Connection defunctConnection = _innerConnection;
+
                 IFrameHandler fh = await _endpoints.SelectOneAsync(_config.FrameHandlerFactoryAsync, cancellationToken)
                     .ConfigureAwait(false);
-                _innerConnection = new Connection(_config, fh);
-                await _innerConnection.OpenAsync(cancellationToken)
+
+                maybeNewInnerConnection = new Connection(_config, fh);
+
+                await maybeNewInnerConnection.OpenAsync(cancellationToken)
                     .ConfigureAwait(false);
-                _innerConnection.TakeOver(defunctConnection);
+                maybeNewInnerConnection.TakeOver(defunctConnection);
+
+                /*
+                 * Note: do this last in case something above throws an exception during re-connection.
+                 * We don't want to lose the old defunct connection in this case, since we have to take
+                 * over its data / event handlers / etc. when the re-connect eventually succeeds.
+                 * https://github.com/rabbitmq/rabbitmq-dotnet-client/issues/1623
+                 */
+                _innerConnection = maybeNewInnerConnection;
                 return true;
             }
             catch (Exception e)
@@ -260,6 +272,8 @@ await _innerConnection.OpenAsync(cancellationToken)
                 // Note: recordedEntities semaphore is _NOT_ held at this point
                 _connectionRecoveryErrorWrapper.Invoke(this, new ConnectionRecoveryErrorEventArgs(e));
             }
+
+            maybeNewInnerConnection?.Dispose();
         }
 
         return false;
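The shape of the fix is a standard publish-last pattern: build the replacement connection in a local, assign it to the shared _innerConnection field only after every fallible step has succeeded, and dispose the partially built instance when the attempt fails. A condensed sketch of the idiom, using a hypothetical Resource type in place of the client's Connection and IFrameHandler:

    using System;
    using System.Threading;
    using System.Threading.Tasks;

    // Hypothetical stand-in for the client's Connection type.
    public sealed class Resource : IDisposable
    {
        public Task OpenAsync(CancellationToken ct) => Task.CompletedTask; // may throw on a real network
        public void TakeOver(Resource defunct) { /* adopt handlers / recorded state */ }
        public void Dispose() { }
    }

    public sealed class Recoverer
    {
        private Resource _current = new Resource();

        public async ValueTask<bool> TryRecoverAsync(CancellationToken ct)
        {
            Resource? candidate = null;
            try
            {
                Resource defunct = _current;

                candidate = new Resource();
                await candidate.OpenAsync(ct).ConfigureAwait(false);
                candidate.TakeOver(defunct);

                // Publish last: if anything above throws, _current still points at
                // the defunct instance whose state a later attempt must take over.
                _current = candidate;
                return true;
            }
            catch (Exception)
            {
                // A failed attempt must not leak its partially opened resources.
                candidate?.Dispose();
            }

            return false;
        }
    }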

projects/Test/SequentialIntegration/TestConnectionRecovery.cs

Lines changed: 48 additions & 0 deletions
@@ -266,10 +266,58 @@ public async Task TestShutdownEventHandlersRecoveryOnConnectionAfterDelayedServe
 
             await WaitAsync(shutdownLatch, WaitSpan, "connection shutdown");
             await WaitAsync(recoveryLatch, WaitSpan, "connection recovery");
+
             Assert.True(_conn.IsOpen);
             Assert.True(counter >= 1);
         }
 
+        [Fact]
+        public async Task TestShutdownEventHandlersRecoveryOnConnectionAfterTwoDelayedServerRestarts_GH1623()
+        {
+            const int restartCount = 2;
+            int counter = 0;
+            TimeSpan delaySpan = TimeSpan.FromSeconds(_connFactory.NetworkRecoveryInterval.TotalSeconds * 2);
+
+            AutorecoveringConnection aconn = (AutorecoveringConnection)_conn;
+
+            aconn.ConnectionRecoveryError += (c, args) =>
+            {
+                // Uncomment for debugging
+                // _output.WriteLine("[INFO] ConnectionRecoveryError: {0}", args.Exception);
+            };
+
+            aconn.ConnectionShutdown += (c, args) => Interlocked.Increment(ref counter);
+
+            Assert.True(_conn.IsOpen);
+
+            TaskCompletionSource<bool> recoveryLatch = null;
+
+            for (int i = 0; i < restartCount; i++)
+            {
+                if (i == (restartCount - 1))
+                {
+                    recoveryLatch = PrepareForRecovery(aconn);
+                }
+
+                try
+                {
+                    await StopRabbitMqAsync();
+                    await Task.Delay(delaySpan);
+                }
+                finally
+                {
+                    await StartRabbitMqAsync();
+                    // Ensure recovery has a chance to connect!
+                    await Task.Delay(delaySpan);
+                }
+            }
+
+            await WaitAsync(recoveryLatch, WaitSpan, "connection recovery");
+
+            Assert.True(aconn.IsOpen);
+            Assert.Equal(restartCount, counter);
+        }
+
         [Fact]
         public async Task TestUnblockedListenersRecovery()
         {