Skip to content

Commit b74ea05

Browse files
authored
Python: Improve orchestration exception handling (#12716)
### Motivation and Context <!-- Thank you for your contribution to the semantic-kernel repo! Please help reviewers and future users, providing the following information: 1. Why is this change required? 2. What problem does it solve? 3. What scenario does it contribute to? 4. If it fixes an open issue, please link to the issue here. --> Currently, exceptions that occur inside the orchestration actors are not properly surfaced to the caller as they happen. Fixes: #12719 ### Description <!-- Describe your changes, the overall approach, the underlying design. These notes will help understanding how your code works. Thanks! --> This PR addresses the issue by introducing a new exception_callback that is hidden from the user but surfaces exceptions that occur inside the orchestration actors to the caller. ### Contribution Checklist <!-- Before submitting this PR, please make sure: --> - [x] The code builds clean without any errors or warnings - [x] The PR follows the [SK Contribution Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md) and the [pre-submission formatting script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts) raises no violations - [x] All unit tests pass, and I have added new tests where possible - [x] I didn't break anyone 😄
1 parent d16fed4 commit b74ea05

File tree

15 files changed

+314
-36
lines changed

15 files changed

+314
-36
lines changed

python/semantic_kernel/agents/orchestration/agent_actor_base.py

Lines changed: 62 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@
22

33

44
import inspect
5+
import logging
56
import sys
67
from collections.abc import Awaitable, Callable
8+
from functools import wraps
79
from typing import Any
810

911
from semantic_kernel.agents.agent import Agent, AgentThread
@@ -18,11 +20,27 @@
1820
else:
1921
from typing_extensions import override # pragma: no cover
2022

23+
logger: logging.Logger = logging.getLogger(__name__)
24+
2125

2226
@experimental
2327
class ActorBase(RoutedAgent):
2428
"""A base class for actors running in the AgentRuntime."""
2529

30+
def __init__(
    self,
    description: str,
    exception_callback: Callable[[BaseException], None],
):
    """Create the actor and remember how failures are reported.

    Args:
        description (str): A description of the actor, forwarded to the base class initializer.
        exception_callback (Callable[[BaseException], None]): Callable stored on the actor; the
            wrappers produced by `exception_handler` invoke it with the exception raised by a
            decorated method.
    """
    super().__init__(description=description)
    # Kept as a private attribute: decorated methods reach it through `self`.
    self._exception_callback = exception_callback
43+
2644
@override
2745
async def on_message_impl(self, message: Any, ctx: MessageContext) -> Any | None:
2846
"""Handle a message.
@@ -34,6 +52,46 @@ async def on_message_impl(self, message: Any, ctx: MessageContext) -> Any | None
3452

3553
return await super().on_message_impl(message, ctx)
3654

55+
@staticmethod
def exception_handler(func: Callable[..., Any]) -> Callable[..., Any]:
    """Decorator that reports exceptions raised by an actor method and re-raises them.

    The decorator works on both synchronous and asynchronous methods. The decorated
    callable must be a method of an object exposing `_exception_callback` and `id`.
    When the method raises, the wrapper calls `self._exception_callback` with the
    exception, logs it, and then re-raises the original exception unchanged.

    A failure inside the exception callback itself is logged and suppressed so that it
    can neither hide the original exception nor prevent it from being logged.

    Args:
        func: The method to be wrapped.

    Returns:
        The wrapped method: a coroutine function if `func` is one, otherwise a plain function.
    """

    def _report(actor: Any, exception: BaseException) -> None:
        """Notify the actor's exception callback and log the failure."""
        try:
            actor._exception_callback(exception)
        except Exception:
            # The callback is a reporting hook; the caller must still see the original error.
            logger.exception("Exception callback failed in agent %s", actor.id)
        # Lazy %-style arguments: the message is only built if the record is emitted.
        logger.error("Exception occurred in agent %s:\n%s", actor.id, exception)

    if inspect.iscoroutinefunction(func):

        @wraps(func)
        async def async_wrapper(self, *args, **kwargs):
            try:
                return await func(self, *args, **kwargs)
            except BaseException as e:
                # BaseException on purpose: cancellations and interrupts are reported as well.
                _report(self, e)
                raise

        return async_wrapper

    @wraps(func)
    def sync_wrapper(self, *args, **kwargs):
        try:
            return func(self, *args, **kwargs)
        except BaseException as e:
            # BaseException on purpose: cancellations and interrupts are reported as well.
            _report(self, e)
            raise

    return sync_wrapper
94+
3795

3896
@experimental
3997
class AgentActorBase(ActorBase):
@@ -43,6 +101,7 @@ def __init__(
43101
self,
44102
agent: Agent,
45103
internal_topic_type: str,
104+
exception_callback: Callable[[BaseException], None],
46105
agent_response_callback: Callable[[DefaultTypeAlias], Awaitable[None] | None] | None = None,
47106
streaming_agent_response_callback: Callable[[StreamingChatMessageContent, bool], Awaitable[None] | None]
48107
| None = None,
@@ -52,6 +111,7 @@ def __init__(
52111
Args:
53112
agent (Agent): An agent to be run in the container.
54113
internal_topic_type (str): The topic type of the internal topic.
114+
exception_callback (Callable): A function that is called when an exception occurs.
55115
agent_response_callback (Callable | None): A function that is called when a full response is produced
56116
by the agents.
57117
streaming_agent_response_callback (Callable | None): A function that is called when a streaming response
@@ -66,7 +126,7 @@ def __init__(
66126
# Chat history to temporarily store messages before each invoke.
67127
self._message_cache: ChatHistory = ChatHistory()
68128

69-
ActorBase.__init__(self, description=agent.description or "Semantic Kernel Agent")
129+
super().__init__(agent.description or "Semantic Kernel Actor", exception_callback)
70130

71131
async def _call_agent_response_callback(self, message: DefaultTypeAlias) -> None:
72132
"""Call the agent_response_callback function if it is set.
@@ -97,6 +157,7 @@ async def _call_streaming_agent_response_callback(
97157
else:
98158
self._streaming_agent_response_callback(message_chunk, is_final)
99159

160+
@ActorBase.exception_handler
100161
async def _invoke_agent(self, additional_messages: DefaultTypeAlias | None = None, **kwargs) -> ChatMessageContent:
101162
"""Invoke the agent with the current chat history or thread and optionally additional messages.
102163

python/semantic_kernel/agents/orchestration/concurrent.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ def __init__(
5656
agent: Agent,
5757
internal_topic_type: str,
5858
collection_agent_type: str,
59+
exception_callback: Callable[[BaseException], None],
5960
agent_response_callback: Callable[[DefaultTypeAlias], Awaitable[None] | None] | None = None,
6061
streaming_agent_response_callback: Callable[[StreamingChatMessageContent, bool], Awaitable[None] | None]
6162
| None = None,
@@ -66,13 +67,15 @@ def __init__(
6667
agent: The agent to be executed.
6768
internal_topic_type: The internal topic type for the actor.
6869
collection_agent_type: The collection agent type for the actor.
70+
exception_callback: A callback function to handle exceptions.
6971
agent_response_callback: A callback function to handle the full response from the agent.
7072
streaming_agent_response_callback: A callback function to handle streaming responses from the agent.
7173
"""
7274
self._collection_agent_type = collection_agent_type
7375
super().__init__(
7476
agent=agent,
7577
internal_topic_type=internal_topic_type,
78+
exception_callback=exception_callback,
7679
agent_response_callback=agent_response_callback,
7780
streaming_agent_response_callback=streaming_agent_response_callback,
7881
)
@@ -102,6 +105,7 @@ def __init__(
102105
self,
103106
description: str,
104107
expected_answer_count: int,
108+
exception_callback: Callable[[BaseException], None],
105109
result_callback: Callable[[DefaultTypeAlias], Awaitable[None]] | None = None,
106110
) -> None:
107111
"""Initialize the collection agent container."""
@@ -110,7 +114,7 @@ def __init__(
110114
self._results: list[ChatMessageContent] = []
111115
self._lock = asyncio.Lock()
112116

113-
super().__init__(description=description)
117+
super().__init__(description, exception_callback)
114118

115119
@message_handler
116120
async def _handle_message(self, message: ConcurrentResponseMessage, _: MessageContext) -> None:
@@ -147,17 +151,20 @@ async def _prepare(
147151
self,
148152
runtime: CoreRuntime,
149153
internal_topic_type: str,
150-
result_callback: Callable[[DefaultTypeAlias], Awaitable[None]] | None = None,
154+
exception_callback: Callable[[BaseException], None],
155+
result_callback: Callable[[DefaultTypeAlias], Awaitable[None]],
151156
) -> None:
152157
"""Register the actors and orchestrations with the runtime and add the required subscriptions."""
153158
await asyncio.gather(*[
154159
self._register_members(
155160
runtime,
156161
internal_topic_type,
162+
exception_callback,
157163
),
158164
self._register_collection_actor(
159165
runtime,
160166
internal_topic_type,
167+
exception_callback,
161168
result_callback=result_callback,
162169
),
163170
self._add_subscriptions(
@@ -170,6 +177,7 @@ async def _register_members(
170177
self,
171178
runtime: CoreRuntime,
172179
internal_topic_type: str,
180+
exception_callback: Callable[[BaseException], None],
173181
) -> None:
174182
"""Register the members."""
175183

@@ -180,7 +188,8 @@ async def _internal_helper(agent: Agent) -> None:
180188
lambda agent=agent: ConcurrentAgentActor( # type: ignore[misc]
181189
agent,
182190
internal_topic_type,
183-
collection_agent_type=self._get_collection_actor_type(internal_topic_type),
191+
self._get_collection_actor_type(internal_topic_type),
192+
exception_callback,
184193
agent_response_callback=self._agent_response_callback,
185194
streaming_agent_response_callback=self._streaming_agent_response_callback,
186195
),
@@ -192,6 +201,7 @@ async def _register_collection_actor(
192201
self,
193202
runtime: CoreRuntime,
194203
internal_topic_type: str,
204+
exception_callback: Callable[[BaseException], None],
195205
result_callback: Callable[[DefaultTypeAlias], Awaitable[None]] | None = None,
196206
) -> None:
197207
await CollectionActor.register(
@@ -200,6 +210,7 @@ async def _register_collection_actor(
200210
lambda: CollectionActor(
201211
description="An internal agent that is responsible for collection results",
202212
expected_answer_count=len(self._members),
213+
exception_callback=exception_callback,
203214
result_callback=result_callback,
204215
),
205216
)

python/semantic_kernel/agents/orchestration/group_chat.py

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,7 @@ def __init__(
257257
manager: GroupChatManager,
258258
internal_topic_type: str,
259259
participant_descriptions: dict[str, str],
260+
exception_callback: Callable[[BaseException], None],
260261
result_callback: Callable[[DefaultTypeAlias], Awaitable[None]] | None = None,
261262
):
262263
"""Initialize the group chat manager actor.
@@ -265,8 +266,7 @@ def __init__(
265266
manager (GroupChatManager): The group chat manager that manages the flow of the group chat.
266267
internal_topic_type (str): The topic type of the internal topic.
267268
participant_descriptions (dict[str, str]): The descriptions of the participants in the group chat.
268-
agent_response_callback (Callable | None): A function that is called when a response is produced
269-
by the agents.
269+
exception_callback (Callable[[BaseException], None]): A function that is called when an exception occurs.
270270
result_callback (Callable | None): A function that is called when the group chat manager produces a result.
271271
"""
272272
self._manager = manager
@@ -275,7 +275,7 @@ def __init__(
275275
self._participant_descriptions = participant_descriptions
276276
self._result_callback = result_callback
277277

278-
super().__init__(description="An actor for the group chat manager.")
278+
super().__init__("An actor for the group chat manager.", exception_callback)
279279

280280
@message_handler
281281
async def _handle_start_message(self, message: GroupChatStartMessage, ctx: MessageContext) -> None:
@@ -304,6 +304,7 @@ async def _handle_response_message(self, message: GroupChatResponseMessage, ctx:
304304

305305
await self._determine_state_and_take_action(ctx.cancellation_token)
306306

307+
@ActorBase.exception_handler
307308
async def _determine_state_and_take_action(self, cancellation_token: CancellationToken) -> None:
308309
"""Determine the state of the group chat and take action accordingly."""
309310
# User input state
@@ -448,14 +449,20 @@ async def _prepare(
448449
self,
449450
runtime: CoreRuntime,
450451
internal_topic_type: str,
452+
exception_callback: Callable[[BaseException], None],
451453
result_callback: Callable[[DefaultTypeAlias], Awaitable[None]],
452454
) -> None:
453455
"""Register the actors and orchestrations with the runtime and add the required subscriptions."""
454-
await self._register_members(runtime, internal_topic_type)
455-
await self._register_manager(runtime, internal_topic_type, result_callback=result_callback)
456+
await self._register_members(runtime, internal_topic_type, exception_callback)
457+
await self._register_manager(runtime, internal_topic_type, exception_callback, result_callback)
456458
await self._add_subscriptions(runtime, internal_topic_type)
457459

458-
async def _register_members(self, runtime: CoreRuntime, internal_topic_type: str) -> None:
460+
async def _register_members(
461+
self,
462+
runtime: CoreRuntime,
463+
internal_topic_type: str,
464+
exception_callback: Callable[[BaseException], None],
465+
) -> None:
459466
"""Register the agents."""
460467
await asyncio.gather(*[
461468
GroupChatAgentActor.register(
@@ -464,6 +471,7 @@ async def _register_members(self, runtime: CoreRuntime, internal_topic_type: str
464471
lambda agent=agent: GroupChatAgentActor( # type: ignore[misc]
465472
agent,
466473
internal_topic_type,
474+
exception_callback=exception_callback,
467475
agent_response_callback=self._agent_response_callback,
468476
streaming_agent_response_callback=self._streaming_agent_response_callback,
469477
),
@@ -475,6 +483,7 @@ async def _register_manager(
475483
self,
476484
runtime: CoreRuntime,
477485
internal_topic_type: str,
486+
exception_callback: Callable[[BaseException], None],
478487
result_callback: Callable[[DefaultTypeAlias], Awaitable[None]] | None = None,
479488
) -> None:
480489
"""Register the group chat manager."""
@@ -485,6 +494,7 @@ async def _register_manager(
485494
self._manager,
486495
internal_topic_type=internal_topic_type,
487496
participant_descriptions={agent.name: agent.description for agent in self._members}, # type: ignore[misc]
497+
exception_callback=exception_callback,
488498
result_callback=result_callback,
489499
),
490500
)

python/semantic_kernel/agents/orchestration/handoffs.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from functools import partial
1010

1111
from semantic_kernel.agents.agent import Agent
12-
from semantic_kernel.agents.orchestration.agent_actor_base import AgentActorBase
12+
from semantic_kernel.agents.orchestration.agent_actor_base import ActorBase, AgentActorBase
1313
from semantic_kernel.agents.orchestration.orchestration_base import DefaultTypeAlias, OrchestrationBase, TIn, TOut
1414
from semantic_kernel.agents.runtime.core.cancellation_token import CancellationToken
1515
from semantic_kernel.agents.runtime.core.core_runtime import CoreRuntime
@@ -161,6 +161,7 @@ def __init__(
161161
agent: Agent,
162162
internal_topic_type: str,
163163
handoff_connections: AgentHandoffs,
164+
exception_callback: Callable[[BaseException], None],
164165
result_callback: Callable[[DefaultTypeAlias], Awaitable[None]] | None = None,
165166
agent_response_callback: Callable[[DefaultTypeAlias], Awaitable[None] | None] | None = None,
166167
streaming_agent_response_callback: Callable[[StreamingChatMessageContent, bool], Awaitable[None] | None]
@@ -181,6 +182,7 @@ def __init__(
181182
super().__init__(
182183
agent=agent,
183184
internal_topic_type=internal_topic_type,
185+
exception_callback=exception_callback,
184186
agent_response_callback=agent_response_callback,
185187
streaming_agent_response_callback=streaming_agent_response_callback,
186188
)
@@ -316,6 +318,7 @@ async def _call_human_response_function(self) -> ChatMessageContent:
316318
return await self._human_response_function()
317319
return self._human_response_function() # type: ignore[return-value]
318320

321+
@ActorBase.exception_handler
319322
async def _invoke_agent_with_potentially_no_response(
320323
self, additional_messages: DefaultTypeAlias | None = None, **kwargs
321324
) -> ChatMessageContent | None:
@@ -453,16 +456,18 @@ async def _prepare(
453456
self,
454457
runtime: CoreRuntime,
455458
internal_topic_type: str,
459+
exception_callback: Callable[[BaseException], None],
456460
result_callback: Callable[[DefaultTypeAlias], Awaitable[None]],
457461
) -> None:
458462
"""Register the actors and orchestrations with the runtime and add the required subscriptions."""
459-
await self._register_members(runtime, internal_topic_type, result_callback)
463+
await self._register_members(runtime, internal_topic_type, exception_callback, result_callback)
460464
await self._add_subscriptions(runtime, internal_topic_type)
461465

462466
async def _register_members(
463467
self,
464468
runtime: CoreRuntime,
465469
internal_topic_type: str,
470+
exception_callback: Callable[[BaseException], None],
466471
result_callback: Callable[[DefaultTypeAlias], Awaitable[None]] | None = None,
467472
) -> None:
468473
"""Register the members with the runtime."""
@@ -476,6 +481,7 @@ async def _register_helper(agent: Agent) -> None:
476481
agent,
477482
internal_topic_type,
478483
handoff_connections,
484+
exception_callback,
479485
result_callback=result_callback,
480486
agent_response_callback=self._agent_response_callback,
481487
streaming_agent_response_callback=self._streaming_agent_response_callback,

0 commit comments

Comments
 (0)