Skip to content

Commit bddf349

Browse files
RotvigPliner
andauthored
Fix deadlock when disposing internal consumers (#1021)
* Fix deadlock when disposing internal consumers * fixing some tests * Fixing tests * improved tests * review fixes * Revert BasicConsumer cleanup * review changes * review changes * Review Changes * Rewrite code with SemaphoreSlim and CancellationToken * One more attempt to rewrite ConsumerDispatcher * Fix review comments Co-authored-by: Yury Pliner <[email protected]>
1 parent fdc0737 commit bddf349

File tree

5 files changed

+140
-69
lines changed

5 files changed

+140
-69
lines changed

Source/EasyNetQ.Tests/ConsumerDispatcherFactoryTests.cs

Lines changed: 38 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -34,20 +34,12 @@ public void Should_only_create_a_single_IConsumerDispatcher_instance()
3434
dispatcher1.Should().BeSameAs(dispatcher2);
3535
}
3636

37-
[Fact]
38-
public void Should_dispose_dispatcher_when_factory_is_disposed()
39-
{
40-
var dispatcher = dispatcherFactory.GetConsumerDispatcher();
41-
dispatcherFactory.Dispose();
42-
((ConsumerDispatcher)dispatcher).IsDisposed.Should().BeTrue();
43-
}
44-
4537
[Fact]
4638
public void Should_run_actions_on_the_consumer_thread()
4739
{
4840
var dispatcher = dispatcherFactory.GetConsumerDispatcher();
4941
var autoResetEvent = new AutoResetEvent(false);
50-
var threadName = Thread.CurrentThread.Name;
42+
var threadName = "";
5143

5244
dispatcher.QueueAction(() =>
5345
{
@@ -61,7 +53,7 @@ public void Should_run_actions_on_the_consumer_thread()
6153
}
6254

6355
[Fact]
64-
public void Should_clear_queue_on_disconnect()
56+
public void Should_clear_transient_queue_on_disconnect()
6557
{
6658
var dispatcher = dispatcherFactory.GetConsumerDispatcher();
6759
var autoResetEvent1 = new AutoResetEvent(false);
@@ -71,7 +63,7 @@ public void Should_clear_queue_on_disconnect()
7163
// queue first action, we're going to block on this one
7264
dispatcher.QueueAction(() => autoResetEvent1.WaitOne(100));
7365

74-
// queue second action, this should be cleared when
66+
// queue second action, this should be cleared when
7567
// the dispatcher factory's OnDisconnected method is called
7668
// and never run.
7769
dispatcher.QueueAction(() =>
@@ -92,7 +84,41 @@ public void Should_clear_queue_on_disconnect()
9284
// check that the second action was never run
9385
actionExecuted.Should().BeFalse();
9486
}
87+
88+
[Fact]
89+
public void Should_not_clear_durable_queue_on_disconnect()
90+
{
91+
var dispatcher = dispatcherFactory.GetConsumerDispatcher();
92+
var blockingEvent = new AutoResetEvent(false);
93+
var blockingEvent2 = new AutoResetEvent(false);
94+
95+
var actionExecuted = false;
96+
97+
// queue first action, we're going to block on this one
98+
dispatcher.QueueAction(() => blockingEvent.WaitOne(100), true);
99+
100+
// queue second action, this should not be cleared when
101+
// the dispatcher factory's OnDisconnected method is called
102+
// and it should run.
103+
dispatcher.QueueAction(() =>
104+
{
105+
actionExecuted = true;
106+
}, true);
107+
108+
// disconnect
109+
dispatcherFactory.OnDisconnected();
110+
111+
// release the block on the first event
112+
blockingEvent.Set();
113+
114+
// now queue up a new action and wait for it to complete
115+
dispatcher.QueueAction(() => blockingEvent2.Set(), true);
116+
blockingEvent2.WaitOne(100);
117+
118+
// check that the second action was never run
119+
actionExecuted.Should().BeTrue();
120+
}
95121
}
96122
}
97123

98-
// ReSharper restore InconsistentNaming
124+
// ReSharper restore InconsistentNaming

Source/EasyNetQ.Tests/ModelCleanupTests.cs

Lines changed: 39 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -9,19 +9,28 @@ namespace EasyNetQ.Tests
99
{
1010
public class ModelCleanupTests
1111
{
12+
public ModelCleanupTests()
13+
{
14+
mockBuilder = new MockBuilder();
15+
bus = mockBuilder.Bus;
16+
waitTime = TimeSpan.FromSeconds(10);
17+
}
18+
1219
private readonly IBus bus;
1320
private readonly MockBuilder mockBuilder;
1421
private readonly TimeSpan waitTime;
1522

16-
public ModelCleanupTests()
23+
private AutoResetEvent WaitForConsumerModelDisposedMessage()
1724
{
18-
mockBuilder = new MockBuilder();
19-
bus = mockBuilder.Bus;
20-
waitTime = TimeSpan.FromMinutes(2);
25+
var are = new AutoResetEvent(false);
26+
27+
mockBuilder.EventBus.Subscribe<ConsumerModelDisposedEvent>(x => are.Set());
28+
29+
return are;
2130
}
2231

2332
[Fact]
24-
public void Should_not_cleanup_publish_model()
33+
public void Should_cleanup_publish_model()
2534
{
2635
bus.Publish(new TestMessage());
2736
bus.Dispose();
@@ -30,68 +39,71 @@ public void Should_not_cleanup_publish_model()
3039
}
3140

3241
[Fact]
33-
public void Should_cleanup_subscribe_model()
42+
public void Should_cleanup_request_response_model()
3443
{
35-
bus.Subscribe<TestMessage>("abc", mgs => {});
44+
var waiter = new CountdownEvent(2);
45+
46+
mockBuilder.EventBus.Subscribe<PublishedMessageEvent>(_ => waiter.Signal());
47+
mockBuilder.EventBus.Subscribe<StartConsumingSucceededEvent>(_ => waiter.Signal());
48+
49+
bus.RequestAsync<TestRequestMessage, TestResponseMessage>(new TestRequestMessage());
50+
if (!waiter.Wait(5000))
51+
throw new TimeoutException();
52+
3653
var are = WaitForConsumerModelDisposedMessage();
3754

3855
bus.Dispose();
3956

40-
bool signalReceived = are.WaitOne(waitTime);
57+
var signalReceived = are.WaitOne(waitTime);
4158
Assert.True(signalReceived, $"Set event was not received within {waitTime.TotalSeconds} seconds");
4259

60+
mockBuilder.Channels[0].DidNotReceive().Dispose();
4361
mockBuilder.Channels[1].Received().Dispose();
4462
}
4563

4664
[Fact]
47-
public void Should_cleanup_subscribe_async_model()
65+
public void Should_cleanup_respond_model()
4866
{
49-
bus.SubscribeAsync<TestMessage>("abc", msg => null);
67+
bus.Respond<TestRequestMessage, TestResponseMessage>(x => (TestResponseMessage) null);
5068
var are = WaitForConsumerModelDisposedMessage();
5169

5270
bus.Dispose();
5371

54-
bool signalReceived = are.WaitOne(waitTime);
72+
var signalReceived = are.WaitOne(waitTime);
5573
Assert.True(signalReceived, $"Set event was not received within {waitTime.TotalSeconds} seconds");
5674

75+
mockBuilder.Channels[0].DidNotReceive().Dispose();
5776
mockBuilder.Channels[1].Received().Dispose();
5877
}
5978

6079
[Fact]
61-
public void Should_cleanup_request_response_model()
80+
public void Should_cleanup_subscribe_async_model()
6281
{
63-
bus.RequestAsync<TestRequestMessage, TestResponseMessage>(new TestRequestMessage());
82+
bus.Subscribe<TestMessage>("abc", msg => { });
6483
var are = WaitForConsumerModelDisposedMessage();
6584

6685
bus.Dispose();
6786

68-
bool signalReceived = are.WaitOne(waitTime);
87+
var signalReceived = are.WaitOne(waitTime);
6988
Assert.True(signalReceived, $"Set event was not received within {waitTime.TotalSeconds} seconds");
7089

90+
mockBuilder.Channels[0].DidNotReceive().Dispose();
7191
mockBuilder.Channels[1].Received().Dispose();
7292
}
7393

7494
[Fact]
75-
public void Should_cleanup_respond_model()
95+
public void Should_cleanup_subscribe_model()
7696
{
77-
bus.Respond<TestRequestMessage, TestResponseMessage>(x => null);
97+
bus.Subscribe<TestMessage>("abc", mgs => { });
7898
var are = WaitForConsumerModelDisposedMessage();
7999

80100
bus.Dispose();
81101

82-
bool signalReceived = are.WaitOne(waitTime);
102+
var signalReceived = are.WaitOne(waitTime);
83103
Assert.True(signalReceived, $"Set event was not received within {waitTime.TotalSeconds} seconds");
84104

105+
mockBuilder.Channels[0].DidNotReceive().Dispose();
85106
mockBuilder.Channels[1].Received().Dispose();
86107
}
87-
88-
private AutoResetEvent WaitForConsumerModelDisposedMessage()
89-
{
90-
var are = new AutoResetEvent(false);
91-
92-
mockBuilder.EventBus.Subscribe<ConsumerModelDisposedEvent>(x => are.Set());
93-
94-
return are;
95-
}
96108
}
97-
}
109+
}

Source/EasyNetQ/Consumer/ConsumerDispatcher.cs

Lines changed: 40 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,36 @@ namespace EasyNetQ.Consumer
88
public class ConsumerDispatcher : IConsumerDispatcher
99
{
1010
private readonly ILog logger = LogProvider.For<ConsumerDispatcher>();
11-
private readonly BlockingCollection<Action> queue;
12-
private bool disposed;
11+
private readonly CancellationTokenSource cancellation = new CancellationTokenSource();
12+
private readonly BlockingCollection<Action> durableActions = new BlockingCollection<Action>();
13+
private readonly BlockingCollection<Action> transientActions = new BlockingCollection<Action>();
1314

1415
public ConsumerDispatcher(ConnectionConfiguration configuration)
1516
{
1617
Preconditions.CheckNotNull(configuration, "configuration");
1718

18-
queue = new BlockingCollection<Action>();
1919

2020
var thread = new Thread(_ =>
2121
{
22-
while (!disposed && queue.TryTake(out var action, -1))
22+
var blockingCollections = new[] {durableActions, transientActions};
23+
while (!cancellation.IsCancellationRequested)
24+
try
25+
{
26+
BlockingCollection<Action>.TakeFromAny(
27+
blockingCollections, out var action, cancellation.Token
28+
);
29+
action();
30+
}
31+
catch (OperationCanceledException)
32+
{
33+
break;
34+
}
35+
catch (Exception exception)
36+
{
37+
logger.ErrorException(string.Empty, exception);
38+
}
39+
40+
while (BlockingCollection<Action>.TryTakeFromAny(blockingCollections, out var action) >= 0)
2341
{
2442
try
2543
{
@@ -35,26 +53,37 @@ public ConsumerDispatcher(ConnectionConfiguration configuration)
3553
}
3654

3755
public void QueueAction(Action action)
56+
{
57+
QueueAction(action, false);
58+
}
59+
60+
public void QueueAction(Action action, bool surviveDisconnect)
3861
{
3962
Preconditions.CheckNotNull(action, "action");
40-
queue.Add(action);
63+
64+
if (cancellation.IsCancellationRequested)
65+
throw new InvalidOperationException("Consumer dispatcher is stopping or already stopped");
66+
67+
if (surviveDisconnect)
68+
durableActions.Add(action);
69+
else
70+
transientActions.Add(action);
4171
}
4272

4373
public void OnDisconnected()
4474
{
4575
// throw away any queued actions. RabbitMQ will redeliver any in-flight
4676
// messages that have not been acked when the connection is lost.
47-
while (queue.TryTake(out _))
77+
while (transientActions.TryTake(out _))
4878
{
4979
}
5080
}
5181

5282
public void Dispose()
5383
{
54-
queue.CompleteAdding();
55-
disposed = true;
84+
durableActions.CompleteAdding();
85+
transientActions.CompleteAdding();
86+
cancellation.Cancel();
5687
}
57-
58-
public bool IsDisposed => disposed;
5988
}
60-
}
89+
}

Source/EasyNetQ/Consumer/IConsumerDispatcher.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ namespace EasyNetQ.Consumer
55
public interface IConsumerDispatcher : IDisposable
66
{
77
void QueueAction(Action action);
8+
void QueueAction(Action action, bool surviveDisconnect);
89
void OnDisconnected();
910
}
10-
}
11+
}

Source/EasyNetQ/Consumer/InternalConsumer.cs

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -107,12 +107,8 @@ public void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redel
107107
/// </summary>
108108
private void Cancel()
109109
{
110-
// copy to temp variable to be thread safe.
111-
var localCancelled = cancelled;
112-
localCancelled?.Invoke(this);
113-
114-
var consumerCancelled = ConsumerCancelled;
115-
consumerCancelled?.Invoke(this, new ConsumerEventArgs(new [] {ConsumerTag}));
110+
cancelled?.Invoke(this);
111+
ConsumerCancelled?.Invoke(this, new ConsumerEventArgs(new [] {ConsumerTag}));
116112
}
117113

118114
public void HandleBasicCancelOk(string consumerTag)
@@ -131,6 +127,7 @@ public void HandleBasicCancel(string consumerTag)
131127

132128
public void HandleModelShutdown(object model, ShutdownEventArgs reason)
133129
{
130+
Cancel();
134131
logger.InfoFormat(
135132
"Consumer with consumerTag {consumerTag} on queue {queue} has shutdown with reason {reason}",
136133
ConsumerTag,
@@ -332,22 +329,28 @@ private void SingleBasicConsumerCancelled(BasicConsumer consumer)
332329
public void Dispose()
333330
{
334331
if (disposed) return;
332+
335333
disposed = true;
336334

337335
var model = Model;
338-
if (model != null)
336+
if (model == null) return;
337+
// Queued because we may be on the RabbitMQ.Client dispatch thread.
338+
var disposedEvent = new AutoResetEvent(false);
339+
consumerDispatcher.QueueAction(() =>
339340
{
340-
// Queued because we may be on the RabbitMQ.Client dispatch thread.
341-
var disposedEvent = new AutoResetEvent(false);
342-
consumerDispatcher.QueueAction(() =>
343-
{
344-
foreach (var c in basicConsumers)
345-
c.Dispose();
346-
model.Dispose();
347-
disposedEvent.Set();
348-
});
349-
disposedEvent.WaitOne();
350-
}
341+
try
342+
{
343+
foreach (var c in basicConsumers)
344+
c.Dispose();
345+
model.Dispose();
346+
}
347+
finally
348+
{
349+
disposedEvent.Set();
350+
}
351+
}, surviveDisconnect: true);
352+
353+
disposedEvent.WaitOne();
351354
}
352355
}
353356
}

0 commit comments

Comments
 (0)