Skip to content

Commit 6a07989

Browse files
committed
The send task will be executed by the .NET thread pool, and whether the consume task is executed in parallel by the thread pool depends on the EnableConsumerPrefetch configuration. (#1380)
1 parent 70d39e4 commit 6a07989

File tree

3 files changed

+35
-37
lines changed

3 files changed

+35
-37
lines changed

src/DotNetCore.CAP/CAP.Options.cs

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ public CapOptions()
2424
FailedRetryCount = 50;
2525
ConsumerThreadCount = 1;
2626
EnableConsumerPrefetch = false;
27-
ProducerThreadCount = 1;
2827
Extensions = new List<ICapOptionsExtension>();
2928
Version = "v1";
3029
DefaultGroupName = "cap.queue." + Assembly.GetEntryAssembly()?.GetName().Name!.ToLower();
@@ -91,7 +90,7 @@ public CapOptions()
9190
public int ConsumerThreadCount { get; set; }
9291

9392
/// <summary>
94-
/// If true, the message will be pre fetch to memory queue for execute.
93+
/// If true, the message will be pre fetch to memory queue for parallel execute by thread pool.
9594
/// <para>Not available when <see cref="UseDispatchingPerGroup"/> true.</para>
9695
/// Default is false
9796
/// </summary>
@@ -103,14 +102,9 @@ public CapOptions()
103102
/// <para>If true, the <see cref="EnableConsumerPrefetch"/> is not available.</para>
104103
/// Default is false.
105104
/// </summary>
105+
[Obsolete("Use EnableConsumerPrefetch instead. Setting it to true means that each consumer is now executed concurrently by thread pool, regardless of whether they are in different groups.")]
106106
public bool UseDispatchingPerGroup { get; set; }
107107

108-
/// <summary>
109-
/// The number of producer thread connections.
110-
/// Default is 1
111-
/// </summary>
112-
public int ProducerThreadCount { get; set; }
113-
114108
/// <summary>
115109
/// The interval of the collector processor deletes expired messages.
116110
/// Default is 300 seconds.

src/DotNetCore.CAP/Processor/IDispatcher.Default.cs

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -53,22 +53,20 @@ public async Task Start(CancellationToken stoppingToken)
5353
_tasksCts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken, CancellationToken.None);
5454
_tasksCts.Token.Register(() => _delayCts.Cancel());
5555

56-
var capacity = _options.ProducerThreadCount * 500;
5756
_publishedChannel = Channel.CreateBounded<MediumMessage>(
58-
new BoundedChannelOptions(capacity > 5000 ? 5000 : capacity)
57+
new BoundedChannelOptions(5000)
5958
{
6059
AllowSynchronousContinuations = true,
61-
SingleReader = _options.ProducerThreadCount == 1,
60+
SingleReader = true,
6261
SingleWriter = true,
6362
FullMode = BoundedChannelFullMode.Wait
6463
});
6564

66-
await Task.WhenAll(Enumerable.Range(0, _options.ProducerThreadCount)
67-
.Select(_ => Task.Run(Sending, _tasksCts.Token)).ToArray()).ConfigureAwait(false);
65+
await Task.Run(Sending, _tasksCts.Token).ConfigureAwait(false); //here return valuetask
6866

6967
if (_enablePrefetch)
7068
{
71-
capacity = _options.ConsumerThreadCount * 300;
69+
var capacity = _options.ConsumerThreadCount * 300;
7270
_receivedChannel = Channel.CreateBounded<(MediumMessage, ConsumerExecutorDescriptor?)>(
7371
new BoundedChannelOptions(capacity > 3000 ? 3000 : capacity)
7472
{
@@ -99,7 +97,7 @@ await Task.WhenAll(Enumerable.Range(0, _options.ConsumerThreadCount)
9997
_logger.LogWarning(e, "Update storage fails of delayed message in memory queue!");
10098
}
10199
});
102-
100+
103101
while (!_tasksCts.Token.IsCancellationRequested)
104102
{
105103
try
@@ -170,6 +168,8 @@ public async ValueTask EnqueueToExecute(MediumMessage message, ConsumerExecutorD
170168
{
171169
try
172170
{
171+
if (_tasksCts!.IsCancellationRequested) return;
172+
173173
if (_enablePrefetch)
174174
{
175175
if (!_receivedChannel.Writer.TryWrite((message, descriptor)))
@@ -207,14 +207,18 @@ private async ValueTask Sending()
207207
while (_publishedChannel.Reader.TryRead(out var message))
208208
try
209209
{
210-
var result = await _sender.SendAsync(message).ConfigureAwait(false);
211-
if (!result.Succeeded)
212-
_logger.MessagePublishException(message.Origin.GetId(), result.ToString(), result.Exception);
210+
var item = message;
211+
_ = Task.Run(async () =>
212+
{
213+
var result = await _sender.SendAsync(item).ConfigureAwait(false);
214+
if (!result.Succeeded)
215+
_logger.MessagePublishException(item.Origin.GetId(), result.ToString(), result.Exception);
216+
});
213217
}
214218
catch (Exception ex)
215219
{
216220
_logger.LogError(ex,
217-
$"An exception occurred when sending a message to the MQ. Id:{message.DbId}");
221+
$"An exception occurred when sending a message to the transport. Id:{message.DbId}");
218222
}
219223
}
220224
catch (OperationCanceledException)
@@ -231,16 +235,17 @@ private async ValueTask Processing()
231235
while (_receivedChannel.Reader.TryRead(out var message))
232236
try
233237
{
234-
await _executor.ExecuteAsync(message.Item1, message.Item2, _tasksCts.Token).ConfigureAwait(false);
238+
var item1 = message.Item1;
239+
var item2 = message.Item2;
240+
_ = Task.Run(() => _executor.ExecuteAsync(item1, item2, _tasksCts.Token).ConfigureAwait(false));
235241
}
236242
catch (OperationCanceledException)
237243
{
238244
//expected
239245
}
240246
catch (Exception e)
241247
{
242-
_logger.LogError(e,
243-
$"An exception occurred when invoke subscriber. MessageId:{message.Item1.DbId}");
248+
_logger.LogError(e, $"An exception occurred when invoke subscriber. MessageId:{message.Item1.DbId}");
244249
}
245250
}
246251
catch (OperationCanceledException)

src/DotNetCore.CAP/Processor/IDispatcher.PerGroup.cs

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -53,18 +53,16 @@ public async Task Start(CancellationToken stoppingToken)
5353
_tasksCts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken, CancellationToken.None);
5454
_tasksCts.Token.Register(() => _delayCts.Cancel());
5555

56-
var capacity = _options.ProducerThreadCount * 500;
5756
_publishedChannel = Channel.CreateBounded<MediumMessage>(
58-
new BoundedChannelOptions(capacity > 5000 ? 5000 : capacity)
57+
new BoundedChannelOptions(5000)
5958
{
6059
AllowSynchronousContinuations = true,
61-
SingleReader = _options.ProducerThreadCount == 1,
60+
SingleReader = true,
6261
SingleWriter = true,
6362
FullMode = BoundedChannelFullMode.Wait
6463
});
6564

66-
await Task.WhenAll(Enumerable.Range(0, _options.ProducerThreadCount)
67-
.Select(_ => Task.Run(Sending, _tasksCts.Token)).ToArray());
65+
await Task.Run(Sending, _tasksCts.Token).ConfigureAwait(false); //here return Value task
6866

6967
_receivedChannels =
7068
new ConcurrentDictionary<string, Channel<(MediumMessage, ConsumerExecutorDescriptor?)>>(
@@ -214,16 +212,16 @@ private async ValueTask Sending()
214212
while (_publishedChannel.Reader.TryRead(out var message))
215213
try
216214
{
217-
var result = await _sender.SendAsync(message).ConfigureAwait(false);
218-
if (!result.Succeeded)
219-
_logger.MessagePublishException(
220-
message.Origin.GetId(),
221-
result.ToString(),
222-
result.Exception);
215+
_ = Task.Run(async () =>
216+
{
217+
var result = await _sender.SendAsync(message).ConfigureAwait(false);
218+
if (!result.Succeeded)
219+
_logger.MessagePublishException(message.Origin.GetId(), result.ToString(), result.Exception);
220+
});
223221
}
224222
catch (Exception ex)
225223
{
226-
_logger.LogError(ex, $"An exception occurred when sending a message to the MQ. Id:{message.DbId}");
224+
_logger.LogError(ex, $"An exception occurred when sending a message to the broker. Id:{message.DbId}");
227225
}
228226
}
229227
catch (OperationCanceledException)
@@ -240,10 +238,11 @@ private async ValueTask Processing(string group, Channel<(MediumMessage, Consume
240238
while (channel.Reader.TryRead(out var message))
241239
try
242240
{
243-
if (_logger.IsEnabled(LogLevel.Debug))
244-
_logger.LogDebug("Dispatching message for group {ConsumerGroup}", group);
241+
_logger.LogDebug("Dispatching message for group {ConsumerGroup}", group);
245242

246-
await _executor.ExecuteAsync(message.Item1, message.Item2, _tasksCts.Token).ConfigureAwait(false);
243+
var item1 = message.Item1;
244+
var item2 = message.Item2;
245+
_ = Task.Run(() => _executor.ExecuteAsync(item1, item2, _tasksCts.Token).ConfigureAwait(false));
247246
}
248247
catch (OperationCanceledException)
249248
{

0 commit comments

Comments
 (0)