Skip to content

Commit 9c09457

Browse files
authored
Python: Add declarative workflow runtime (#2815)
* Further support for declarative python workflows * Add tests. Clean up for typing and formatting * Improvements and cleanup * Typing cleanup. Improve docstrings * Proper code in docstrings * Fix malformed code-block directive in docstring * Remove dead links * PR feedback * Address PR feedback * Address PR feedback * Remove sl * Update devui frontend * More cleanup * Fix uv lock * Skip Py 3.14 tests as powerfx doesn't support it * Fix mypy error * Fix for tool calls * Removed stale docstring * Fix lint * Standardize on .NET namespaces. Revert DevUI changes (bring in later) * Implement remaining items for Python declarative support to match dotnet
1 parent b2893fb commit 9c09457

File tree

79 files changed

+18312
-113
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

79 files changed

+18312
-113
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,3 +226,4 @@ local.settings.json
226226

227227
# Database files
228228
*.db
229+
python/dotnet-ref

python/.vscode/settings.json

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,6 @@
1818
},
1919
"python.analysis.autoFormatStrings": true,
2020
"python.analysis.importFormat": "relative",
21-
"python.analysis.exclude": [
22-
"samples/semantic-kernel-migration"
23-
],
2421
"python.analysis.packageIndexDepths": [
2522
{
2623
"name": "agent_framework",

python/packages/core/agent_framework/_workflows/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
Case,
2222
Default,
2323
Edge,
24+
EdgeCondition,
2425
FanInEdgeGroup,
2526
FanOutEdgeGroup,
2627
SingleEdgeGroup,
@@ -132,6 +133,7 @@
132133
"ConcurrentBuilder",
133134
"Default",
134135
"Edge",
136+
"EdgeCondition",
135137
"EdgeDuplicationError",
136138
"Executor",
137139
"ExecutorCompletedEvent",

python/packages/core/agent_framework/_workflows/_edge.py

Lines changed: 54 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,22 @@
11
# Copyright (c) Microsoft. All rights reserved.
22

3+
import inspect
34
import logging
45
import uuid
5-
from collections.abc import Callable, Sequence
6+
from collections.abc import Awaitable, Callable, Sequence
67
from dataclasses import dataclass, field
7-
from typing import Any, ClassVar
8+
from typing import Any, ClassVar, TypeAlias, TypeVar
89

910
from ._const import INTERNAL_SOURCE_ID
1011
from ._executor import Executor
1112
from ._model_utils import DictConvertible, encode_value
1213

1314
logger = logging.getLogger(__name__)
1415

16+
# Type alias for edge condition functions.
17+
# Conditions receive the message data and return bool (sync or async).
18+
EdgeCondition: TypeAlias = Callable[[Any], bool | Awaitable[bool]]
19+
1520

1621
def _extract_function_name(func: Callable[..., Any]) -> str:
1722
"""Map a Python callable to a concise, human-focused identifier.
@@ -71,26 +76,27 @@ class Edge(DictConvertible):
7176
serialising the edge down to primitives we can reconstruct the topology of
7277
a workflow irrespective of the original Python process.
7378
79+
Edge conditions receive the message data and return a boolean (sync or async).
80+
7481
Examples:
7582
.. code-block:: python
7683
77-
edge = Edge(source_id="ingest", target_id="score", condition=lambda payload: payload["ready"])
78-
assert edge.should_route({"ready": True}) is True
79-
assert edge.should_route({"ready": False}) is False
84+
edge = Edge(source_id="ingest", target_id="score", condition=lambda data: data["ready"])
85+
assert await edge.should_route({"ready": True}) is True
8086
"""
8187

8288
ID_SEPARATOR: ClassVar[str] = "->"
8389

8490
source_id: str
8591
target_id: str
8692
condition_name: str | None
87-
_condition: Callable[[Any], bool] | None = field(default=None, repr=False, compare=False)
93+
_condition: EdgeCondition | None = field(default=None, repr=False, compare=False)
8894

8995
def __init__(
9096
self,
9197
source_id: str,
9298
target_id: str,
93-
condition: Callable[[Any], bool] | None = None,
99+
condition: EdgeCondition | None = None,
94100
*,
95101
condition_name: str | None = None,
96102
) -> None:
@@ -103,9 +109,9 @@ def __init__(
103109
target_id:
104110
Canonical identifier of the downstream executor instance.
105111
condition:
106-
Optional predicate that receives the message payload and returns
107-
`True` when the edge should be traversed. When omitted, the edge is
108-
considered unconditionally active.
112+
Optional predicate that receives the message data and returns
113+
`True` when the edge should be traversed. Can be sync or async.
114+
When omitted, the edge is unconditionally active.
109115
condition_name:
110116
Optional override that pins a human-friendly name for the condition
111117
when the callable cannot be introspected (for example after
@@ -125,7 +131,9 @@ def __init__(
125131
self.source_id = source_id
126132
self.target_id = target_id
127133
self._condition = condition
128-
self.condition_name = _extract_function_name(condition) if condition is not None else condition_name
134+
self.condition_name = (
135+
_extract_function_name(condition) if condition is not None and condition_name is None else condition_name
136+
)
129137

130138
@property
131139
def id(self) -> str:
@@ -144,25 +152,44 @@ def id(self) -> str:
144152
"""
145153
return f"{self.source_id}{self.ID_SEPARATOR}{self.target_id}"
146154

147-
def should_route(self, data: Any) -> bool:
148-
"""Evaluate the edge predicate against an incoming payload.
155+
@property
156+
def has_condition(self) -> bool:
157+
"""Check if this edge has a condition.
158+
159+
Returns True if the edge was configured with a condition function.
160+
"""
161+
return self._condition is not None
162+
163+
async def should_route(self, data: Any) -> bool:
164+
"""Evaluate the edge predicate against payload.
149165
150166
When the edge was defined without an explicit predicate the method
151167
returns `True`, signalling an unconditional routing rule. Otherwise the
152168
user-supplied callable decides whether the message should proceed along
153169
this edge. Any exception raised by the callable is deliberately allowed
154170
to surface to the caller to avoid masking logic bugs.
155171
172+
The condition receives the message data and may be sync or async.
173+
174+
Args:
175+
data: The message payload
176+
177+
Returns:
178+
True if the edge should be traversed, False otherwise.
179+
156180
Examples:
157181
.. code-block:: python
158182
159-
edge = Edge("stage1", "stage2", condition=lambda payload: payload["score"] > 0.8)
160-
assert edge.should_route({"score": 0.9}) is True
161-
assert edge.should_route({"score": 0.4}) is False
183+
edge = Edge("stage1", "stage2", condition=lambda data: data["score"] > 0.8)
184+
assert await edge.should_route({"score": 0.9}) is True
185+
assert await edge.should_route({"score": 0.4}) is False
162186
"""
163187
if self._condition is None:
164188
return True
165-
return self._condition(data)
189+
result = self._condition(data)
190+
if inspect.isawaitable(result):
191+
return bool(await result)
192+
return bool(result)
166193

167194
def to_dict(self) -> dict[str, Any]:
168195
"""Produce a JSON-serialisable view of the edge metadata.
@@ -281,6 +308,8 @@ class EdgeGroup(DictConvertible):
281308

282309
from builtins import type as builtin_type
283310

311+
_T_EdgeGroup = TypeVar("_T_EdgeGroup", bound="EdgeGroup")
312+
284313
_TYPE_REGISTRY: ClassVar[dict[str, builtin_type["EdgeGroup"]]] = {}
285314

286315
def __init__(
@@ -363,7 +392,7 @@ def to_dict(self) -> dict[str, Any]:
363392
}
364393

365394
@classmethod
366-
def register(cls, subclass: builtin_type["EdgeGroup"]) -> builtin_type["EdgeGroup"]:
395+
def register(cls, subclass: builtin_type[_T_EdgeGroup]) -> builtin_type[_T_EdgeGroup]:
367396
"""Register a subclass so deserialisation can recover the right type.
368397
369398
Registration is typically performed via the decorator syntax applied to
@@ -443,12 +472,18 @@ def __init__(
443472
self,
444473
source_id: str,
445474
target_id: str,
446-
condition: Callable[[Any], bool] | None = None,
475+
condition: EdgeCondition | None = None,
447476
*,
448477
id: str | None = None,
449478
) -> None:
450479
"""Create a one-to-one edge group between two executors.
451480
481+
Args:
482+
source_id: The source executor ID.
483+
target_id: The target executor ID.
484+
condition: Optional condition function `(data) -> bool | Awaitable[bool]`.
485+
id: Optional explicit ID for the edge group.
486+
452487
Examples:
453488
.. code-block:: python
454489

python/packages/core/agent_framework/_workflows/_edge_runner.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,9 @@ async def send_message(self, message: Message, shared_state: SharedState, ctx: R
112112
return False
113113

114114
if self._can_handle(self._edge.target_id, message):
115-
if self._edge.should_route(message.data):
115+
route_result = await self._edge.should_route(message.data)
116+
117+
if route_result:
116118
span.set_attributes({
117119
OtelAttr.EDGE_GROUP_DELIVERED: True,
118120
OtelAttr.EDGE_GROUP_DELIVERY_STATUS: EdgeGroupDeliveryStatus.DELIVERED.value,
@@ -162,8 +164,8 @@ def __init__(self, edge_group: FanOutEdgeGroup, executors: dict[str, Executor])
162164

163165
async def send_message(self, message: Message, shared_state: SharedState, ctx: RunnerContext) -> bool:
164166
"""Send a message through all edges in the fan-out edge group."""
165-
deliverable_edges = []
166-
single_target_edge = None
167+
deliverable_edges: list[Edge] = []
168+
single_target_edge: Edge | None = None
167169
# Process routing logic within span
168170
with create_edge_group_processing_span(
169171
self._edge_group.__class__.__name__,
@@ -192,7 +194,9 @@ async def send_message(self, message: Message, shared_state: SharedState, ctx: R
192194
if message.target_id in selection_results:
193195
edge = self._target_map.get(message.target_id)
194196
if edge and self._can_handle(edge.target_id, message):
195-
if edge.should_route(message.data):
197+
route_result = await edge.should_route(message.data)
198+
199+
if route_result:
196200
span.set_attributes({
197201
OtelAttr.EDGE_GROUP_DELIVERED: True,
198202
OtelAttr.EDGE_GROUP_DELIVERY_STATUS: EdgeGroupDeliveryStatus.DELIVERED.value,
@@ -223,8 +227,10 @@ async def send_message(self, message: Message, shared_state: SharedState, ctx: R
223227
# If no target ID, send the message to the selected targets
224228
for target_id in selection_results:
225229
edge = self._target_map[target_id]
226-
if self._can_handle(edge.target_id, message) and edge.should_route(message.data):
227-
deliverable_edges.append(edge)
230+
if self._can_handle(edge.target_id, message):
231+
route_result = await edge.should_route(message.data)
232+
if route_result:
233+
deliverable_edges.append(edge)
228234

229235
if len(deliverable_edges) > 0:
230236
span.set_attributes({

python/packages/core/agent_framework/_workflows/_group_chat.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
from ._base_group_chat_orchestrator import BaseGroupChatOrchestrator
3636
from ._checkpoint import CheckpointStorage
3737
from ._conversation_history import ensure_author, latest_user_message
38+
from ._edge import EdgeCondition
3839
from ._executor import Executor, handler
3940
from ._orchestration_request_info import RequestInfoInterceptor
4041
from ._participant_utils import GroupChatParticipantSpec, prepare_participant_metadata, wrap_participant
@@ -213,7 +214,7 @@ class _GroupChatConfig:
213214
# region Default participant factory
214215

215216
_GroupChatOrchestratorFactory: TypeAlias = Callable[[_GroupChatConfig], Executor]
216-
_InterceptorSpec: TypeAlias = tuple[Callable[[_GroupChatConfig], Executor], Callable[[Any], bool]]
217+
_InterceptorSpec: TypeAlias = tuple[Callable[[_GroupChatConfig], Executor], EdgeCondition]
217218

218219

219220
def _default_participant_factory(
@@ -1701,7 +1702,7 @@ def with_request_handler(
17011702
self,
17021703
handler: Callable[[_GroupChatConfig], Executor] | Executor,
17031704
*,
1704-
condition: Callable[[Any], bool],
1705+
condition: EdgeCondition,
17051706
) -> "GroupChatBuilder":
17061707
"""Register an interceptor factory that creates executors for special requests.
17071708

python/packages/core/agent_framework/_workflows/_runner.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,8 @@ def _normalize_message_payload(message: Message) -> None:
168168
# Route all messages through normal workflow edges
169169
associated_edge_runners = self._edge_runner_map.get(source_executor_id, [])
170170
if not associated_edge_runners:
171-
logger.warning(f"No outgoing edges found for executor {source_executor_id}; dropping messages.")
171+
# This is expected for terminal nodes (e.g., EndWorkflow, last action in workflow)
172+
logger.debug(f"No outgoing edges found for executor {source_executor_id}; dropping messages.")
172173
return
173174

174175
for message in messages:

python/packages/core/agent_framework/_workflows/_workflow_builder.py

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from ._edge import (
1919
Case,
2020
Default,
21+
EdgeCondition,
2122
EdgeGroup,
2223
FanInEdgeGroup,
2324
FanOutEdgeGroup,
@@ -48,12 +49,12 @@ class _EdgeRegistration:
4849
Args:
4950
source: The registered source name.
5051
target: The registered target name.
51-
condition: An optional condition function for the edge.
52+
condition: An optional condition function `(data) -> bool | Awaitable[bool]`.
5253
"""
5354

5455
source: str
5556
target: str
56-
condition: Callable[[Any], bool] | None = None
57+
condition: EdgeCondition | None = None
5758

5859

5960
@dataclass
@@ -437,7 +438,10 @@ def add_agent(
437438
"Consider using register_agent() for lazy initialization instead."
438439
)
439440
executor = self._maybe_wrap_agent(
440-
agent, agent_thread=agent_thread, output_response=output_response, executor_id=id
441+
agent,
442+
agent_thread=agent_thread,
443+
output_response=output_response,
444+
executor_id=id,
441445
)
442446
self._add_executor(executor)
443447
return self
@@ -446,7 +450,7 @@ def add_edge(
446450
self,
447451
source: Executor | AgentProtocol | str,
448452
target: Executor | AgentProtocol | str,
449-
condition: Callable[[Any], bool] | None = None,
453+
condition: EdgeCondition | None = None,
450454
) -> Self:
451455
"""Add a directed edge between two executors.
452456
@@ -456,13 +460,14 @@ def add_edge(
456460
Args:
457461
source: The source executor or registered name of the source factory for the edge.
458462
target: The target executor or registered name of the target factory for the edge.
459-
condition: An optional condition function that determines whether the edge
460-
should be traversed based on the message.
463+
condition: An optional condition function `(data) -> bool | Awaitable[bool]`
464+
that determines whether the edge should be traversed.
465+
Example: `lambda data: data["ready"]`.
461466
462-
Note: If instances are provided for both source and target, they will be shared across
463-
all workflow instances created from the built Workflow. To avoid this, consider
464-
registering the executors and agents using `register_executor` and `register_agent`
465-
and referencing them by factory name for lazy initialization instead.
467+
Note: If instances are provided for both source and target, they will be shared across
468+
all workflow instances created from the built Workflow. To avoid this, consider
469+
registering the executors and agents using `register_executor` and `register_agent`
470+
and referencing them by factory name for lazy initialization instead.
466471
467472
Returns:
468473
Self: The WorkflowBuilder instance for method chaining.
@@ -496,12 +501,6 @@ async def process(self, count: int, ctx: WorkflowContext[Never, str]) -> None:
496501
.build()
497502
)
498503
499-
500-
# With a condition
501-
def only_large_numbers(msg: int) -> bool:
502-
return msg > 100
503-
504-
505504
workflow = (
506505
WorkflowBuilder()
507506
.register_executor(lambda: ProcessorA(id="a"), name="ProcessorA")
@@ -529,7 +528,7 @@ def only_large_numbers(msg: int) -> bool:
529528
target_exec = self._maybe_wrap_agent(target) # type: ignore[arg-type]
530529
source_id = self._add_executor(source_exec)
531530
target_id = self._add_executor(target_exec)
532-
self._edge_groups.append(SingleEdgeGroup(source_id, target_id, condition)) # type: ignore[call-arg]
531+
self._edge_groups.append(SingleEdgeGroup(source_id, target_id, condition))
533532
return self
534533

535534
def add_fan_out_edges(
@@ -1141,7 +1140,9 @@ async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None:
11411140
self._checkpoint_storage = checkpoint_storage
11421141
return self
11431142

1144-
def _resolve_edge_registry(self) -> tuple[Executor, list[Executor], list[EdgeGroup]]:
1143+
def _resolve_edge_registry(
1144+
self,
1145+
) -> tuple[Executor, list[Executor], list[EdgeGroup]]:
11451146
"""Resolve deferred edge registrations into executors and edge groups."""
11461147
if not self._start_executor:
11471148
raise ValueError("Starting executor must be set using set_start_executor before building the workflow.")
@@ -1211,7 +1212,11 @@ def _get_executor(name: str) -> Executor:
12111212
if start_executor is None:
12121213
raise ValueError("Failed to resolve starting executor from registered factories.")
12131214

1214-
return start_executor, list(executor_id_to_instance.values()), deferred_edge_groups
1215+
return (
1216+
start_executor,
1217+
list(executor_id_to_instance.values()),
1218+
deferred_edge_groups,
1219+
)
12151220

12161221
def build(self) -> Workflow:
12171222
"""Build and return the constructed workflow.

0 commit comments

Comments
 (0)