Skip to content

Commit 2c9cbea

Browse files
authored
Suppress context of threads of ClientCommandDispatcher and ConsumerDispatcher (#1046)
* ConsumerDispatcher refactor - add logs - addUseWaitJoin option - suppress flow of execution context * remove UseWaitJoin * remove try/catch * tune ClientCommandDispatcherSingleton * Info -> Debug * add logs * fix test * indentation
1 parent 8bc743b commit 2c9cbea

File tree

5 files changed

+60
-40
lines changed

5 files changed

+60
-40
lines changed

Source/EasyNetQ.Tests/ClientCommandDispatcherTests/When_an_action_is_invoked.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,9 @@ public void Should_invoke_the_action()
6464
[Fact]
6565
public void Should_invoke_the_action_on_the_dispatcher_thread()
6666
{
67-
actionThreadName.Should().Be("Client Command Dispatcher Thread");
67+
actionThreadName.Should().Be("EasyNetQ client command dispatch thread");
6868
}
6969
}
7070
}
7171

72-
// ReSharper restore InconsistentNaming
72+
// ReSharper restore InconsistentNaming

Source/EasyNetQ/Consumer/ConsumerDispatcher.cs

Lines changed: 42 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -16,39 +16,46 @@ public ConsumerDispatcher(ConnectionConfiguration configuration)
1616
{
1717
Preconditions.CheckNotNull(configuration, "configuration");
1818

19-
var thread = new Thread(_ =>
19+
using (ExecutionContext.SuppressFlow())
2020
{
21-
var blockingCollections = new[] {durableActions, transientActions};
22-
while (!cancellation.IsCancellationRequested)
23-
try
24-
{
25-
BlockingCollection<Action>.TakeFromAny(
26-
blockingCollections, out var action, cancellation.Token
27-
);
28-
action();
29-
}
30-
catch (OperationCanceledException) when (cancellation.IsCancellationRequested)
31-
{
32-
break;
33-
}
34-
catch (Exception exception)
35-
{
36-
logger.ErrorException(string.Empty, exception);
37-
}
38-
39-
while (BlockingCollection<Action>.TryTakeFromAny(blockingCollections, out var action) >= 0)
21+
var thread = new Thread(_ =>
4022
{
41-
try
42-
{
43-
action();
44-
}
45-
catch (Exception exception)
23+
var blockingCollections = new[] { durableActions, transientActions };
24+
while (!cancellation.IsCancellationRequested)
25+
try
26+
{
27+
BlockingCollection<Action>.TakeFromAny(
28+
blockingCollections, out var action, cancellation.Token
29+
);
30+
action();
31+
}
32+
catch (OperationCanceledException) when (cancellation.IsCancellationRequested)
33+
{
34+
break;
35+
}
36+
catch (Exception exception)
37+
{
38+
logger.ErrorException(string.Empty, exception);
39+
}
40+
41+
while (BlockingCollection<Action>.TryTakeFromAny(blockingCollections, out var action) >= 0)
4642
{
47-
logger.ErrorException(string.Empty, exception);
43+
try
44+
{
45+
action();
46+
}
47+
catch (Exception exception)
48+
{
49+
logger.ErrorException(string.Empty, exception);
50+
}
4851
}
49-
}
50-
}) {Name = "EasyNetQ consumer dispatch thread", IsBackground = configuration.UseBackgroundThreads};
51-
thread.Start();
52+
logger.Debug("EasyNetQ consumer dispatch thread finished");
53+
})
54+
{ Name = "EasyNetQ consumer dispatch thread", IsBackground = configuration.UseBackgroundThreads };
55+
56+
thread.Start();
57+
logger.Debug("EasyNetQ consumer dispatch thread started");
58+
}
5259
}
5360

5461
public void QueueAction(Action action, bool surviveDisconnect = false)
@@ -66,11 +73,17 @@ public void QueueAction(Action action, bool surviveDisconnect = false)
6673

6774
public void OnDisconnected()
6875
{
76+
int count = 0;
77+
6978
// throw away any queued actions. RabbitMQ will redeliver any in-flight
7079
// messages that have not been acked when the connection is lost.
7180
while (transientActions.TryTake(out _))
7281
{
82+
++count;
7383
}
84+
85+
if (count > 0)
86+
logger.Debug("{count} queued transient actions were thrown", count);
7487
}
7588

7689
public void Dispose()

Source/EasyNetQ/Producer/ClientCommandDispatcher.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public Task InvokeAsync(Action<IModel> channelAction)
4848

4949
public void Dispose()
5050
{
51-
if(dispatcher.IsValueCreated) dispatcher.Value.Dispose();
51+
if (dispatcher.IsValueCreated) dispatcher.Value.Dispose();
5252
}
5353
}
54-
}
54+
}

Source/EasyNetQ/Producer/ClientCommandDispatcherSingleton.cs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
1-
using System;
1+
using EasyNetQ.Logging;
2+
using RabbitMQ.Client;
3+
using System;
24
using System.Collections.Concurrent;
35
using System.Threading;
46
using System.Threading.Tasks;
5-
using EasyNetQ.Internals;
6-
using RabbitMQ.Client;
77

88
namespace EasyNetQ.Producer
99
{
1010
public class ClientCommandDispatcherSingleton : IClientCommandDispatcher
1111
{
12+
private readonly ILog logger = LogProvider.For<ClientCommandDispatcherSingleton>();
1213
private readonly CancellationTokenSource cancellation = new CancellationTokenSource();
1314
private readonly IPersistentChannel persistentChannel;
1415
private readonly BlockingCollection<Action> queue;
@@ -25,7 +26,8 @@ public ClientCommandDispatcherSingleton(
2526
queue = new BlockingCollection<Action>(configuration.DispatcherQueueSize);
2627
persistentChannel = persistentChannelFactory.CreatePersistentChannel(connection);
2728

28-
StartDispatcherThread(configuration);
29+
using (ExecutionContext.SuppressFlow())
30+
StartDispatcherThread(configuration);
2931
}
3032

3133
public T Invoke<T>(Func<IModel, T> channelAction)
@@ -86,6 +88,7 @@ public Task InvokeAsync(Action<IModel> channelAction)
8688

8789
public void Dispose()
8890
{
91+
queue.CompleteAdding();
8992
cancellation.Cancel();
9093
persistentChannel.Dispose();
9194
}
@@ -101,13 +104,16 @@ private void StartDispatcherThread(ConnectionConfiguration configuration)
101104
var channelAction = queue.Take(cancellation.Token);
102105
channelAction();
103106
}
104-
catch (OperationCanceledException)
107+
catch (OperationCanceledException) when (cancellation.IsCancellationRequested)
105108
{
106109
break;
107110
}
108111
}
109-
}) {Name = "Client Command Dispatcher Thread", IsBackground = configuration.UseBackgroundThreads};
112+
logger.Debug("EasyNetQ client command dispatch thread finished");
113+
})
114+
{ Name = "EasyNetQ client command dispatch thread", IsBackground = configuration.UseBackgroundThreads };
110115
thread.Start();
116+
logger.Debug("EasyNetQ client command dispatch thread started");
111117
}
112118

113119
private struct NoContentStruct

hall_of_fame.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,4 +66,5 @@ No particular order. Don't forget to add your name with your pull request.
6666
* Alina Popa
6767
* Connie Yau
6868
* Marcus Hellsten
69-
* Thomas Mutton
69+
* Thomas Mutton
70+
* Ivan Maximov

0 commit comments

Comments
 (0)