Skip to content

Commit d0fed70

Browse files
authored
.Net Agent Runtime Fix: Allow multiple subscribers to handle events concurrently (#12713)
### Motivation and Context <!-- Thank you for your contribution to the semantic-kernel repo! Please help reviewers and future users, providing the following information: 1. Why is this change required? 2. What problem does it solve? 3. What scenario does it contribute to? 4. If it fixes an open issue, please link to the issue here. --> Enable multiple subscribers to to handle events concurrently (instead of sequentially) Fixes: #12701 ### Description <!-- Describe your changes, the overall approach, the underlying design. These notes will help understanding how your code works. Thanks! --> The event handling loop of the `InProcessRuntime` is waiting on each subscriber to handle the event. This conflicts with performance expectations. ### Contribution Checklist <!-- Before submitting this PR, please make sure: --> - [X] The code builds clean without any errors or warnings - [X] The PR follows the [SK Contribution Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md) and the [pre-submission formatting script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts) raises no violations - [X] All unit tests pass, and I have added new tests where possible - [X] I didn't break anyone 😄
1 parent 9fe95bf commit d0fed70

File tree

2 files changed

+26
-38
lines changed

2 files changed

+26
-38
lines changed

dotnet/src/Agents/Runtime/InProcess.Tests/PublishMessageTests.cs

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
// Copyright (c) Microsoft. All rights reserved.
22

33
using System;
4-
using System.Linq;
54
using System.Threading.Tasks;
65
using FluentAssertions;
76
using Xunit;
@@ -56,10 +55,7 @@ public async Task Test_PublishMessage_MultipleFailures()
5655
Func<Task> publishTask = async () => await fixture.RunPublishTestAsync(new TopicId("TestTopic"), new BasicMessage { Content = "1" });
5756

5857
// What we are really testing here is that a single exception does not prevent sending to the remaining agents
59-
(await publishTask.Should().ThrowAsync<AggregateException>())
60-
.Which.Should().Match<AggregateException>(
61-
exception => exception.InnerExceptions.Count == 2 &&
62-
exception.InnerExceptions.All(exception => exception is TestException));
58+
await publishTask.Should().ThrowAsync<TestException>();
6359

6460
fixture.GetAgentInstances<ErrorAgent>().Values
6561
.Should().HaveCount(2)
@@ -81,11 +77,7 @@ public async Task Test_PublishMessage_MixedSuccessFailure()
8177
Func<Task> publicTask = async () => await fixture.RunPublishTestAsync(new TopicId("TestTopic"), new BasicMessage { Content = "1" });
8278

8379
// What we are really testing here is that raising exceptions does not prevent sending to the remaining agents
84-
(await publicTask.Should().ThrowAsync<AggregateException>())
85-
.Which.Should().Match<AggregateException>(
86-
exception => exception.InnerExceptions.Count == 2 &&
87-
exception.InnerExceptions.All(
88-
exception => exception is TestException));
80+
await publicTask.Should().ThrowAsync<TestException>();
8981

9082
fixture.GetAgentInstances<ReceiverAgent>().Values
9183
.Should().HaveCount(2, "Two ReceiverAgents should have been created")

dotnet/src/Agents/Runtime/InProcess/InProcessRuntime.cs

Lines changed: 24 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -352,45 +352,41 @@ private async ValueTask PublishMessageServicerAsync(MessageEnvelope envelope, Ca
352352
throw new InvalidOperationException("Message must have a topic to be published.");
353353
}
354354

355-
List<Exception> exceptions = [];
355+
List<Task>? tasks = null;
356356
TopicId topic = envelope.Topic.Value;
357357
foreach (ISubscriptionDefinition subscription in this._subscriptions.Values.Where(subscription => subscription.Matches(topic)))
358358
{
359-
try
360-
{
361-
deliveryToken.ThrowIfCancellationRequested();
359+
(tasks ??= []).Add(ProcessSubscriptionAsync(envelope, topic, subscription, deliveryToken));
360+
}
362361

363-
AgentId? sender = envelope.Sender;
362+
if (tasks is not null)
363+
{
364+
await Task.WhenAll(tasks).ConfigureAwait(false);
365+
}
364366

365-
using CancellationTokenSource combinedSource = CancellationTokenSource.CreateLinkedTokenSource(envelope.Cancellation, deliveryToken);
366-
MessageContext messageContext = new(envelope.MessageId, combinedSource.Token)
367-
{
368-
Sender = sender,
369-
Topic = topic,
370-
IsRpc = false
371-
};
367+
async Task ProcessSubscriptionAsync(MessageEnvelope envelope, TopicId topic, ISubscriptionDefinition subscription, CancellationToken deliveryToken)
368+
{
369+
deliveryToken.ThrowIfCancellationRequested();
372370

373-
AgentId agentId = subscription.MapToAgent(topic);
374-
if (!this.DeliverToSelf && sender.HasValue && sender == agentId)
375-
{
376-
continue;
377-
}
371+
AgentId? sender = envelope.Sender;
378372

379-
IHostableAgent agent = await this.EnsureAgentAsync(agentId).ConfigureAwait(false);
373+
using CancellationTokenSource combinedSource = CancellationTokenSource.CreateLinkedTokenSource(envelope.Cancellation, deliveryToken);
374+
MessageContext messageContext = new(envelope.MessageId, combinedSource.Token)
375+
{
376+
Sender = sender,
377+
Topic = topic,
378+
IsRpc = false
379+
};
380380

381-
// TODO: Cancellation propagation!
382-
await agent.OnMessageAsync(envelope.Message, messageContext).ConfigureAwait(false);
383-
}
384-
catch (Exception ex) when (!ex.IsCriticalException())
381+
AgentId agentId = subscription.MapToAgent(topic);
382+
if (!this.DeliverToSelf && sender.HasValue && sender == agentId)
385383
{
386-
exceptions.Add(ex);
384+
return;
387385
}
388-
}
389386

390-
if (exceptions.Count > 0)
391-
{
392-
// TODO: Unwrap TargetInvocationException?
393-
throw new AggregateException("One or more exceptions occurred while processing the message.", exceptions);
387+
IHostableAgent agent = await this.EnsureAgentAsync(agentId).ConfigureAwait(false);
388+
389+
await agent.OnMessageAsync(envelope.Message, messageContext).ConfigureAwait(false);
394390
}
395391
}
396392

0 commit comments

Comments
 (0)