Skip to content

Commit 3307d05

Browse files
committed
No need for a list of secondary receivers
1 parent 5834772 commit 3307d05

File tree

5 files changed

+38
-32
lines changed

5 files changed

+38
-32
lines changed

src/NServiceBus.RabbitMQ.Tests/RabbitMqContext.cs

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -86,17 +86,7 @@ public void SetUp()
8686

8787
dequeueStrategy = new RabbitMqDequeueStrategy(connectionManager, null,
8888
new Configure(new SettingsHolder(), new FakeContainer(), new List<Action<IConfigureComponents>>(), new PipelineSettings(new BusConfiguration())),
89-
new SecondaryReceiveConfiguration(s =>
90-
{
91-
var settings = new SecondaryReceiveSettings
92-
{
93-
MaximumConcurrencyLevel = 1
94-
};
95-
96-
settings.SecondaryQueues.Add(CallbackQueue);
97-
98-
return settings;
99-
}));
89+
new SecondaryReceiveConfiguration(s => new SecondaryReceiveSettings(CallbackQueue,1)));
10090

10191

10292
MakeSureQueueAndExchangeExists(ReceiverQueue);

src/NServiceBus.RabbitMQ/Config/RabbitMqTransport.cs

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -65,15 +65,7 @@ protected override void Configure(FeatureConfigurationContext context, string co
6565
return new SecondaryReceiveSettings();
6666
}
6767

68-
var settings = new SecondaryReceiveSettings
69-
{
70-
MaximumConcurrencyLevel = maxConcurrencyForCallbackReceiver
71-
};
72-
73-
74-
settings.SecondaryQueues.Add(callbackQueue);
75-
76-
return settings;
68+
return new SecondaryReceiveSettings(callbackQueue, maxConcurrencyForCallbackReceiver);
7769
}));
7870

7971
context.Container.ConfigureComponent<RabbitMqDequeueStrategy>(DependencyLifecycle.InstancePerCall);

src/NServiceBus.RabbitMQ/RabbitMqDequeueStrategy.cs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -69,11 +69,15 @@ public void Start(int maximumConcurrencyLevel)
6969
{
7070
var secondaryReceiveSettings = secondaryReceiveConfiguration.GetSettings(workQueue);
7171

72-
var actualConcurrencyLevel = maximumConcurrencyLevel +
73-
secondaryReceiveSettings.MaximumConcurrencyLevel * secondaryReceiveSettings.SecondaryQueues.Count;
72+
var actualConcurrencyLevel = maximumConcurrencyLevel;
73+
74+
if (secondaryReceiveSettings.Enabled)
75+
{
76+
actualConcurrencyLevel += secondaryReceiveSettings.MaximumConcurrencyLevel;
77+
}
7478

7579
tokenSource = new CancellationTokenSource();
76-
80+
7781
// We need to add an extra one because if we fail and the count is at zero already, it doesn't allow us to add one more.
7882
countdownEvent = new CountdownEvent(actualConcurrencyLevel + 1);
7983

@@ -82,14 +86,14 @@ public void Start(int maximumConcurrencyLevel)
8286
StartConsumer(workQueue);
8387
}
8488

85-
foreach (var secondaryReceiveQueue in secondaryReceiveSettings.SecondaryQueues)
89+
if (secondaryReceiveSettings.Enabled)
8690
{
8791
for (var i = 0; i < secondaryReceiveSettings.MaximumConcurrencyLevel; i++)
8892
{
89-
StartConsumer(secondaryReceiveQueue);
93+
StartConsumer(secondaryReceiveSettings.SecondaryReceiveQueue);
9094
}
9195

92-
Logger.InfoFormat("Secondary receiver for queue '{0}' initiated with concurrency '{1}'",secondaryReceiveQueue,secondaryReceiveSettings.MaximumConcurrencyLevel);
96+
Logger.InfoFormat("Secondary receiver for queue '{0}' initiated with concurrency '{1}'", secondaryReceiveSettings.SecondaryReceiveQueue, secondaryReceiveSettings.MaximumConcurrencyLevel);
9397
}
9498
}
9599

@@ -263,7 +267,7 @@ void Purge()
263267
static ILog Logger = LogManager.GetLogger(typeof(RabbitMqDequeueStrategy));
264268

265269
RepeatedFailuresOverTimeCircuitBreaker circuitBreaker;
266-
270+
267271
bool autoAck;
268272
CountdownEvent countdownEvent;
269273
Action<TransportMessage, Exception> endProcessMessage;

src/NServiceBus.RabbitMQ/SecondaryReceiveConfiguration.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,7 @@ public SecondaryReceiveConfiguration(Func<string, SecondaryReceiveSettings> getS
88
{
99
secondaryReceiveSettings = getSecondaryReceiveSettings;
1010
}
11-
12-
11+
1312
public SecondaryReceiveSettings GetSettings(string queue)
1413
{
1514
return secondaryReceiveSettings(queue);
Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,31 @@
11
namespace NServiceBus.Transports.RabbitMQ
22
{
3-
using System.Collections.Generic;
3+
using System;
44

55
class SecondaryReceiveSettings
66
{
7-
public int MaximumConcurrencyLevel;
8-
public List<string> SecondaryQueues = new List<string>();
7+
public SecondaryReceiveSettings()
8+
{
9+
Enabled = false;
10+
}
11+
12+
public SecondaryReceiveSettings(string secondaryReceiveQueue, int maximumConcurrencyLevel)
13+
{
14+
if (maximumConcurrencyLevel < 0)
15+
{
16+
throw new Exception("Concurrency level must be a positive value");
17+
}
18+
19+
Enabled = true;
20+
21+
SecondaryReceiveQueue = secondaryReceiveQueue;
22+
23+
MaximumConcurrencyLevel = maximumConcurrencyLevel;
24+
}
25+
26+
public bool Enabled { get; private set; }
27+
28+
public int MaximumConcurrencyLevel { get; private set; }
29+
public string SecondaryReceiveQueue { get; private set; }
930
}
1031
}

0 commit comments

Comments
 (0)