Skip to content

Commit 0ba5c7e

Browse files
committed
Optimize Dispatcher message processing logic
Refactor the `Dispatcher` class to improve performance and reduce thread pool scheduling overhead: - Replace `Task.Run` with direct async calls to `SendMessageAsync` for message sending. - Ensure `batchSize` has a minimum value of 1 and preallocate the `tasks` list for efficiency. - Add a conditional check to invoke `Task.WhenAll` only when tasks exist. - Simplify non-parallel send logic by reusing `SendMessageAsync`. - Introduce the `SendMessageAsync` method to encapsulate message sending, handle exceptions, and log errors. - Use `.ConfigureAwait(false)` consistently for asynchronous calls. These changes enhance maintainability, readability, and performance by reducing redundant code and optimizing asynchronous operations.
1 parent f4c74d9 commit 0ba5c7e

File tree

1 file changed

+26
-25
lines changed

1 file changed

+26
-25
lines changed

src/DotNetCore.CAP/Processor/IDispatcher.Default.cs

Lines changed: 26 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -249,39 +249,24 @@ private async ValueTask Sending()
249249
{
250250
if (_enableParallelSend)
251251
{
252-
var tasks = new List<Task>();
253-
var batchSize = _pChannelSize / 50;
252+
// Use direct async sends instead of Task.Run to reduce thread pool scheduling and allocations
253+
var batchSize = Math.Max(1, _pChannelSize / 50);
254+
var tasks = new List<Task>(batchSize);
255+
254256
for (var i = 0; i < batchSize && _publishedChannel.Reader.TryRead(out var message); i++)
255257
{
256-
var item = message;
257-
tasks.Add(Task.Run(async () =>
258-
{
259-
try
260-
{
261-
var result = await _sender.SendAsync(item).ConfigureAwait(false);
262-
if (!result.Succeeded) _logger.MessagePublishException(item.Origin.GetId(), result.ToString(), result.Exception);
263-
}
264-
catch (Exception ex)
265-
{
266-
_logger.LogError(ex, $"An exception occurred when sending a message to the transport. Id:{message.DbId}");
267-
}
268-
}));
258+
tasks.Add(SendMessageAsync(message));
269259
}
270260

271-
await Task.WhenAll(tasks);
261+
if (tasks.Count > 0)
262+
{
263+
await Task.WhenAll(tasks).ConfigureAwait(false);
264+
}
272265
}
273266
else
274267
{
275268
while (_publishedChannel.Reader.TryRead(out var message))
276-
try
277-
{
278-
var result = await _sender.SendAsync(message).ConfigureAwait(false);
279-
if (!result.Succeeded) _logger.MessagePublishException(message.Origin.GetId(), result.ToString(), result.Exception);
280-
}
281-
catch (Exception ex)
282-
{
283-
_logger.LogError(ex, $"An exception occurred when sending a message to the transport. Id:{message.DbId}");
284-
}
269+
await SendMessageAsync(message).ConfigureAwait(false);
285270
}
286271
}
287272

@@ -292,6 +277,22 @@ private async ValueTask Sending()
292277
}
293278
}
294279

280+
private async Task SendMessageAsync(MediumMessage message)
281+
{
282+
try
283+
{
284+
var result = await _sender.SendAsync(message).ConfigureAwait(false);
285+
if (!result.Succeeded)
286+
{
287+
_logger.MessagePublishException(message.Origin.GetId(), result.ToString(), result.Exception);
288+
}
289+
}
290+
catch (Exception ex)
291+
{
292+
_logger.LogError(ex, "An exception occurred when sending a message to the transport. Id:{MessageId}", message.DbId);
293+
}
294+
}
295+
295296
private async ValueTask Processing()
296297
{
297298
try

0 commit comments

Comments
 (0)