Skip to content

Commit 19ca98c

Browse files
authored
Merge pull request #253 from Scooletz/no-lock-contention-at-consumer-work-service
No lock contention at consumer work service
2 parents 1164b1e + bbc83f4 commit 19ca98c

File tree

7 files changed

+87
-499
lines changed

7 files changed

+87
-499
lines changed

projects/client/RabbitMQ.Client/src/client/api/ConnectionFactory.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,7 @@ public TimeSpan ContinuationTimeout
238238
/// Task scheduler connections created by this factory will use when
239239
/// dispatching consumer operations, such as message deliveries.
240240
/// </summary>
241+
[Obsolete("This scheduler is no longer used for dispatching consumer operations and will be removed in the next major version.", false)]
241242
public TaskScheduler TaskScheduler { get; set; } = TaskScheduler.Default;
242243

243244
/// <summary>

projects/client/RabbitMQ.Client/src/client/api/IConnectionFactory.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ public interface IConnectionFactory
133133
///
134134
/// What task scheduler should consumer dispatcher use.
135135
/// </summary>
136+
[Obsolete("This scheduler is no longer used for dispatching consumer operations and will be removed in the next major version.", false)]
136137
TaskScheduler TaskScheduler { get; set; }
137138

138139
/// <summary>

projects/client/RabbitMQ.Client/src/client/impl/ConcurrentConsumerDispatcher.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ public ConcurrentConsumerDispatcher(ModelBase model, ConsumerWorkService ws)
1313
{
1414
this.model = model;
1515
this.workService = ws;
16-
this.workService.RegisterKey(model);
1716
this.IsShutdown = false;
1817
}
1918

projects/client/RabbitMQ.Client/src/client/impl/Connection.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,12 +113,12 @@ public class Connection : IConnection
113113

114114
public Connection(IConnectionFactory factory, bool insist, IFrameHandler frameHandler, string clientProvidedName = null)
115115
{
116-
this.ClientProvidedName = clientProvidedName;
116+
ClientProvidedName = clientProvidedName;
117117
KnownHosts = null;
118118
FrameMax = 0;
119119
m_factory = factory;
120120
m_frameHandler = frameHandler;
121-
this.ConsumerWorkService = new ConsumerWorkService(factory.TaskScheduler);
121+
ConsumerWorkService = new ConsumerWorkService();
122122

123123
m_sessionManager = new SessionManager(this, 0);
124124
m_session0 = new MainSession(this) { Handler = NotifyReceivedCloseOk };
Lines changed: 83 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1,84 +1,112 @@
1-
using RabbitMQ.Util;
2-
using System;
3-
using System.Collections.Generic;
1+
using System;
2+
using System.Collections.Concurrent;
43
using System.Threading;
5-
using System.Threading.Tasks;
64

75
namespace RabbitMQ.Client
86
{
97
public class ConsumerWorkService
108
{
11-
public const int MAX_THUNK_EXECUTION_BATCH_SIZE = 16;
12-
private TaskScheduler scheduler;
13-
private BatchingWorkPool<IModel, Action> workPool;
9+
readonly ConcurrentDictionary<IModel, WorkPool> workPools = new ConcurrentDictionary<IModel, WorkPool>();
1410

15-
public ConsumerWorkService() :
16-
this(TaskScheduler.Default) { }
17-
18-
public ConsumerWorkService(TaskScheduler scheduler)
19-
{
20-
this.scheduler = scheduler;
21-
this.workPool = new BatchingWorkPool<IModel, Action>();
22-
}
23-
24-
public void ExecuteThunk()
11+
public void AddWork(IModel model, Action fn)
2512
{
26-
var actions = new List<Action>(MAX_THUNK_EXECUTION_BATCH_SIZE);
13+
// two step approach is taken, as TryGetValue does not aquire locks
14+
// if this fails, GetOrAdd is called, which takes a lock
2715

28-
try
16+
WorkPool workPool;
17+
if (workPools.TryGetValue(model, out workPool) == false)
2918
{
30-
IModel key = this.workPool.NextWorkBlock(ref actions, MAX_THUNK_EXECUTION_BATCH_SIZE);
31-
if (key == null) { return; }
19+
var newWorkPool = new WorkPool(model);
20+
workPool = workPools.GetOrAdd(model, newWorkPool);
3221

33-
try
34-
{
35-
foreach (var fn in actions)
36-
{
37-
fn();
38-
}
39-
}
40-
finally
22+
// start if it's only the workpool that has been just created
23+
if (newWorkPool == workPool)
4124
{
42-
if (this.workPool.FinishWorkBlock(key))
43-
{
44-
var t = new Task(new Action(ExecuteThunk));
45-
t.Start(this.scheduler);
46-
}
25+
newWorkPool.Start();
4726
}
4827
}
49-
catch (Exception)
50-
{
51-
#if NETFX_CORE
52-
// To end a task, return
53-
return;
54-
#else
55-
//Thread.CurrentThread.Interrupt(); //TODO: what to do?
56-
#endif
57-
}
28+
29+
workPool.Enqueue(fn);
5830
}
5931

60-
public void AddWork(IModel model, Action fn)
32+
public void StopWork(IModel model)
6133
{
62-
if (this.workPool.AddWorkItem(model, fn))
34+
WorkPool workPool;
35+
if (workPools.TryRemove(model, out workPool))
6336
{
64-
var t = new Task(new Action(ExecuteThunk));
65-
t.Start(this.scheduler);
37+
workPool.Stop();
6638
}
6739
}
6840

69-
public void RegisterKey(IModel model)
41+
public void StopWork()
7042
{
71-
this.workPool.RegisterKey(model);
43+
foreach (var model in workPools.Keys)
44+
{
45+
StopWork(model);
46+
}
7247
}
7348

74-
public void StopWork(IModel model)
49+
class WorkPool
7550
{
76-
this.workPool.UnregisterKey(model);
77-
}
51+
readonly ConcurrentQueue<Action> actions;
52+
readonly AutoResetEvent messageArrived;
53+
readonly TimeSpan waitTime;
54+
readonly CancellationTokenSource tokenSource;
55+
readonly string name;
7856

79-
public void StopWork()
80-
{
81-
this.workPool.UnregisterAllKeys();
57+
public WorkPool(IModel model)
58+
{
59+
name = model.ToString();
60+
actions = new ConcurrentQueue<Action>();
61+
messageArrived = new AutoResetEvent(false);
62+
waitTime = TimeSpan.FromMilliseconds(100);
63+
tokenSource = new CancellationTokenSource();
64+
}
65+
66+
public void Start()
67+
{
68+
#if NETFX_CORE
69+
Task.Factory.StartNew(Loop, TaskCreationOptions.LongRunning);
70+
#else
71+
var thread = new Thread(Loop)
72+
{
73+
Name = "WorkPool-" + name,
74+
IsBackground = true
75+
};
76+
thread.Start();
77+
#endif
78+
}
79+
80+
public void Enqueue(Action action)
81+
{
82+
actions.Enqueue(action);
83+
messageArrived.Set();
84+
}
85+
86+
void Loop()
87+
{
88+
while (tokenSource.IsCancellationRequested == false)
89+
{
90+
Action action;
91+
while (actions.TryDequeue(out action))
92+
{
93+
try
94+
{
95+
action();
96+
}
97+
catch (Exception)
98+
{
99+
}
100+
}
101+
102+
messageArrived.WaitOne(waitTime);
103+
}
104+
}
105+
106+
public void Stop()
107+
{
108+
tokenSource.Cancel();
109+
}
82110
}
83111
}
84112
}

0 commit comments

Comments
 (0)