Skip to content

Commit 1c736ed

Browse files
committed
The lazy start for ConsumerWorkService's threads introduced. A disambiguation between the main model and others no longer needed - first AddWork will spawn a working thread.
1 parent 024c7c5 commit 1c736ed

File tree

2 files changed

+12
-22
lines changed

2 files changed

+12
-22
lines changed

projects/client/RabbitMQ.Client/src/client/impl/ConcurrentConsumerDispatcher.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ public ConcurrentConsumerDispatcher(ModelBase model, ConsumerWorkService ws)
1313
{
1414
this.model = model;
1515
this.workService = ws;
16-
this.workService.RegisterKey(model);
1716
this.IsShutdown = false;
1817
}
1918

projects/client/RabbitMQ.Client/src/client/impl/ConsumerWorkService.cs

Lines changed: 12 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -10,35 +10,27 @@ public class ConsumerWorkService
1010

1111
public void AddWork(IModel model, Action fn)
1212
{
13+
// two step approach is taken, as TryGetValue does not aquire locks
14+
// if this fails, GetOrAdd is called, which takes a lock
15+
1316
WorkPool workPool;
14-
if (workPools.TryGetValue(model, out workPool))
17+
if (workPools.TryGetValue(model, out workPool) == false)
1518
{
16-
workPool.Enqueue(fn);
17-
}
18-
}
19+
var newWorkPool = new WorkPool(model);
20+
workPool = workPools.GetOrAdd(model, newWorkPool);
1921

20-
public void RegisterKey(IModel model)
21-
{
22-
// the main model can be skipped, as it will not use CWS anyway
23-
if (model.ChannelNumber == 0)
24-
{
25-
return;
22+
// start if it's only the workpool that has been just created
23+
if (newWorkPool == workPool)
24+
{
25+
newWorkPool.Start();
26+
}
2627
}
2728

28-
var workPool = new WorkPool(model);
29-
if (workPools.TryAdd(model, workPool))
30-
{
31-
workPool.Start();
32-
}
29+
workPool.Enqueue(fn);
3330
}
3431

3532
public void StopWork(IModel model)
3633
{
37-
if (model.ChannelNumber == 0)
38-
{
39-
return;
40-
}
41-
4234
WorkPool workPool;
4335
if (workPools.TryRemove(model, out workPool))
4436
{
@@ -54,7 +46,6 @@ public void StopWork()
5446
}
5547
}
5648

57-
5849
class WorkPool
5950
{
6051
readonly ConcurrentQueue<Action> actions;

0 commit comments

Comments
 (0)