Skip to content

Commit 024c7c5

Browse files
committed
Updated ConsumerWorkService no longer suffers from high lock conention on modern CPUs. This is resolved by applying a new work pool approach in the CWS itself and creation of a custom Thread per model.
1 parent 8fb6a42 commit 024c7c5

File tree

5 files changed

+89
-504
lines changed

5 files changed

+89
-504
lines changed

projects/client/RabbitMQ.Client/src/client/api/IConnectionFactory.cs

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -128,13 +128,6 @@ public interface IConnectionFactory
128128
/// <returns></returns>
129129
IConnection CreateConnection(IList<string> hostnames, String clientProvidedName);
130130

131-
/// <summary>
132-
/// Advanced option.
133-
///
134-
/// What task scheduler should consumer dispatcher use.
135-
/// </summary>
136-
TaskScheduler TaskScheduler { get; set; }
137-
138131
/// <summary>
139132
/// Amount of time protocol handshake operations are allowed to take before
140133
/// timing out.

projects/client/RabbitMQ.Client/src/client/impl/Connection.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,12 +113,12 @@ public class Connection : IConnection
113113

114114
public Connection(IConnectionFactory factory, bool insist, IFrameHandler frameHandler, string clientProvidedName = null)
115115
{
116-
this.ClientProvidedName = clientProvidedName;
116+
ClientProvidedName = clientProvidedName;
117117
KnownHosts = null;
118118
FrameMax = 0;
119119
m_factory = factory;
120120
m_frameHandler = frameHandler;
121-
this.ConsumerWorkService = new ConsumerWorkService(factory.TaskScheduler);
121+
ConsumerWorkService = new ConsumerWorkService();
122122

123123
m_sessionManager = new SessionManager(this, 0);
124124
m_session0 = new MainSession(this) { Handler = NotifyReceivedCloseOk };
Lines changed: 87 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,84 +1,117 @@
1-
using RabbitMQ.Util;
2-
using System;
3-
using System.Collections.Generic;
1+
using System;
2+
using System.Collections.Concurrent;
43
using System.Threading;
5-
using System.Threading.Tasks;
64

75
namespace RabbitMQ.Client
86
{
97
public class ConsumerWorkService
108
{
11-
public const int MAX_THUNK_EXECUTION_BATCH_SIZE = 16;
12-
private TaskScheduler scheduler;
13-
private BatchingWorkPool<IModel, Action> workPool;
9+
readonly ConcurrentDictionary<IModel, WorkPool> workPools = new ConcurrentDictionary<IModel, WorkPool>();
1410

15-
public ConsumerWorkService() :
16-
this(TaskScheduler.Default) { }
17-
18-
public ConsumerWorkService(TaskScheduler scheduler)
11+
public void AddWork(IModel model, Action fn)
1912
{
20-
this.scheduler = scheduler;
21-
this.workPool = new BatchingWorkPool<IModel, Action>();
13+
WorkPool workPool;
14+
if (workPools.TryGetValue(model, out workPool))
15+
{
16+
workPool.Enqueue(fn);
17+
}
2218
}
2319

24-
public void ExecuteThunk()
20+
public void RegisterKey(IModel model)
2521
{
26-
var actions = new List<Action>(MAX_THUNK_EXECUTION_BATCH_SIZE);
27-
28-
try
22+
// the main model can be skipped, as it will not use CWS anyway
23+
if (model.ChannelNumber == 0)
2924
{
30-
IModel key = this.workPool.NextWorkBlock(ref actions, MAX_THUNK_EXECUTION_BATCH_SIZE);
31-
if (key == null) { return; }
32-
33-
try
34-
{
35-
foreach (var fn in actions)
36-
{
37-
fn();
38-
}
39-
}
40-
finally
41-
{
42-
if (this.workPool.FinishWorkBlock(key))
43-
{
44-
var t = new Task(new Action(ExecuteThunk));
45-
t.Start(this.scheduler);
46-
}
47-
}
25+
return;
4826
}
49-
catch (Exception)
27+
28+
var workPool = new WorkPool(model);
29+
if (workPools.TryAdd(model, workPool))
5030
{
51-
#if NETFX_CORE
52-
// To end a task, return
53-
return;
54-
#else
55-
//Thread.CurrentThread.Interrupt(); //TODO: what to do?
56-
#endif
31+
workPool.Start();
5732
}
5833
}
5934

60-
public void AddWork(IModel model, Action fn)
35+
public void StopWork(IModel model)
6136
{
62-
if (this.workPool.AddWorkItem(model, fn))
37+
if (model.ChannelNumber == 0)
6338
{
64-
var t = new Task(new Action(ExecuteThunk));
65-
t.Start(this.scheduler);
39+
return;
6640
}
67-
}
6841

69-
public void RegisterKey(IModel model)
70-
{
71-
this.workPool.RegisterKey(model);
42+
WorkPool workPool;
43+
if (workPools.TryRemove(model, out workPool))
44+
{
45+
workPool.Stop();
46+
}
7247
}
7348

74-
public void StopWork(IModel model)
49+
public void StopWork()
7550
{
76-
this.workPool.UnregisterKey(model);
51+
foreach (var model in workPools.Keys)
52+
{
53+
StopWork(model);
54+
}
7755
}
7856

79-
public void StopWork()
57+
58+
class WorkPool
8059
{
81-
this.workPool.UnregisterAllKeys();
60+
readonly ConcurrentQueue<Action> actions;
61+
readonly AutoResetEvent messageArrived;
62+
readonly TimeSpan waitTime;
63+
readonly CancellationTokenSource tokenSource;
64+
readonly string name;
65+
66+
public WorkPool(IModel model)
67+
{
68+
name = model.ToString();
69+
actions = new ConcurrentQueue<Action>();
70+
messageArrived = new AutoResetEvent(false);
71+
waitTime = TimeSpan.FromMilliseconds(100);
72+
tokenSource = new CancellationTokenSource();
73+
}
74+
75+
public void Start()
76+
{
77+
var thread = new Thread(Loop)
78+
{
79+
Name = "WorkPool-" + name,
80+
IsBackground = true
81+
};
82+
thread.Start();
83+
}
84+
85+
public void Enqueue(Action action)
86+
{
87+
actions.Enqueue(action);
88+
messageArrived.Set();
89+
}
90+
91+
void Loop()
92+
{
93+
while (tokenSource.IsCancellationRequested == false)
94+
{
95+
Action action;
96+
while (actions.TryDequeue(out action))
97+
{
98+
try
99+
{
100+
action();
101+
}
102+
catch (Exception)
103+
{
104+
}
105+
}
106+
107+
messageArrived.WaitOne(waitTime);
108+
}
109+
}
110+
111+
public void Stop()
112+
{
113+
tokenSource.Cancel();
114+
}
82115
}
83116
}
84117
}

0 commit comments

Comments
 (0)