Skip to content

Commit 7f84af5

Browse files
committed
The options EnableConsumerPrefetch and UseDispatchingPerGroup will work together without interference. (#1399)
1 parent ff76dea commit 7f84af5

File tree

3 files changed

+13
-4
lines changed

3 files changed

+13
-4
lines changed

src/DotNetCore.CAP/CAP.Options.cs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,18 +91,15 @@ public CapOptions()
9191

9292
/// <summary>
9393
/// If true, the message will be pre fetch to memory queue for parallel execute by thread pool.
94-
/// <para>Not available when <see cref="UseDispatchingPerGroup"/> true.</para>
9594
/// Default is false
9695
/// </summary>
9796
public bool EnableConsumerPrefetch { get; set; }
9897

9998
/// <summary>
10099
/// If true then each message group will have own independent dispatching pipeline. Each pipeline use as many threads as
101100
/// <see cref="ConsumerThreadCount" /> value is.
102-
/// <para>If true, the <see cref="EnableConsumerPrefetch"/> is not available.</para>
103101
/// Default is false.
104102
/// </summary>
105-
[Obsolete("Use EnableConsumerPrefetch instead. Setting it to true means that each consumer is now executed concurrently by thread pool, regardless of whether they are in different groups.")]
106103
public bool UseDispatchingPerGroup { get; set; }
107104

108105
/// <summary>

src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ public void Execute()
119119
ICollection<string> topics;
120120
try
121121
{
122+
// ReSharper disable once ConvertToUsingDeclaration
122123
using (var client = _consumerClientFactory.Create(matchGroup.Key))
123124
{
124125
client.OnLogCallback = WriteLog;
@@ -139,6 +140,7 @@ public void Execute()
139140
{
140141
try
141142
{
143+
// ReSharper disable once ConvertToUsingDeclaration
142144
using (var client = _consumerClientFactory.Create(matchGroup.Key))
143145
{
144146
_serverAddress = client.BrokerAddress;

src/DotNetCore.CAP/Processor/IDispatcher.PerGroup.cs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ internal class DispatcherPerGroup : IDispatcher
2727
private readonly IMessageSender _sender;
2828
private readonly IDataStorage _storage;
2929
private readonly PriorityQueue<MediumMessage, DateTime> _schedulerQueue;
30+
private readonly bool _enablePrefetch;
3031

3132
private Channel<MediumMessage> _publishedChannel = default!;
3233
private ConcurrentDictionary<string, Channel<(MediumMessage, ConsumerExecutorDescriptor?)>> _receivedChannels = default!;
@@ -45,6 +46,7 @@ public DispatcherPerGroup(ILogger<Dispatcher> logger,
4546
_executor = executor;
4647
_schedulerQueue = new PriorityQueue<MediumMessage, DateTime>();
4748
_storage = storage;
49+
_enablePrefetch = options.Value.EnableConsumerPrefetch;
4850
}
4951

5052
public async Task Start(CancellationToken stoppingToken)
@@ -242,7 +244,15 @@ private async ValueTask Processing(string group, Channel<(MediumMessage, Consume
242244

243245
var item1 = message.Item1;
244246
var item2 = message.Item2;
245-
_ = Task.Run(() => _executor.ExecuteAsync(item1, item2, _tasksCts.Token).ConfigureAwait(false));
247+
248+
if (_enablePrefetch)
249+
{
250+
_ = Task.Run(() => _executor.ExecuteAsync(item1, item2, _tasksCts.Token).ConfigureAwait(false));
251+
}
252+
else
253+
{
254+
await _executor.ExecuteAsync(item1, item2, _tasksCts.Token).ConfigureAwait(false);
255+
}
246256
}
247257
catch (OperationCanceledException)
248258
{

0 commit comments

Comments
 (0)