diff --git a/python/packages/chatkit/agent_framework_chatkit/_converter.py b/python/packages/chatkit/agent_framework_chatkit/_converter.py
index 4c911f5604..0adf040156 100644
--- a/python/packages/chatkit/agent_framework_chatkit/_converter.py
+++ b/python/packages/chatkit/agent_framework_chatkit/_converter.py
@@ -27,6 +27,7 @@
     EndOfTurnItem,
     HiddenContextItem,
     ImageAttachment,
+    SDKHiddenContextItem,
     TaskItem,
     ThreadItem,
     UserMessageItem,
@@ -180,8 +181,10 @@ async def fetch_data(attachment_id: str) -> bytes:
         # Subclasses can override this method to provide custom handling
         return None

-    def hidden_context_to_input(self, item: HiddenContextItem) -> ChatMessage | list[ChatMessage] | None:
-        """Convert a ChatKit HiddenContextItem to Agent Framework ChatMessage(s).
+    def hidden_context_to_input(
+        self, item: HiddenContextItem | SDKHiddenContextItem
+    ) -> ChatMessage | list[ChatMessage] | None:
+        """Convert a ChatKit HiddenContextItem or SDKHiddenContextItem to Agent Framework ChatMessage(s).

         This method is called internally by `to_agent_input()`. Override this
         method to customize how hidden context is converted.
@@ -522,6 +525,9 @@ async def _thread_item_to_input_item(
             case HiddenContextItem():
                 out = self.hidden_context_to_input(item) or []
                 return out if isinstance(out, list) else [out]
+            case SDKHiddenContextItem():
+                out = self.hidden_context_to_input(item) or []
+                return out if isinstance(out, list) else [out]
             case _:
                 assert_never(item)
diff --git a/python/packages/chatkit/pyproject.toml b/python/packages/chatkit/pyproject.toml
index 1e2e7bdbd8..a987974a44 100644
--- a/python/packages/chatkit/pyproject.toml
+++ b/python/packages/chatkit/pyproject.toml
@@ -23,7 +23,7 @@ classifiers = [
 ]
 dependencies = [
     "agent-framework-core",
-    "openai-chatkit>=1.1.0,<2.0.0",
+    "openai-chatkit>=1.4.0,<2.0.0",
 ]

 [tool.uv]
diff --git a/python/packages/core/agent_framework/_workflows/__init__.py b/python/packages/core/agent_framework/_workflows/__init__.py
index 35c9f11501..990264df41 100644
--- a/python/packages/core/agent_framework/_workflows/__init__.py
+++ b/python/packages/core/agent_framework/_workflows/__init__.py
@@ -61,6 +61,8 @@
     GroupChatDirective,
     GroupChatStateSnapshot,
     ManagerDirectiveModel,
+    ManagerSelectionRequest,
+    ManagerSelectionResponse,
 )
 from ._handoff import HandoffBuilder, HandoffUserInputRequest
 from ._magentic import (
@@ -147,6 +149,8 @@
     "MagenticPlanReviewReply",
     "MagenticPlanReviewRequest",
     "ManagerDirectiveModel",
+    "ManagerSelectionRequest",
+    "ManagerSelectionResponse",
     "Message",
     "OrchestrationState",
     "RequestInfoEvent",
diff --git a/python/packages/core/agent_framework/_workflows/_group_chat.py b/python/packages/core/agent_framework/_workflows/_group_chat.py
index 84859a4f0c..78ddb5c2eb 100644
--- a/python/packages/core/agent_framework/_workflows/_group_chat.py
+++ b/python/packages/core/agent_framework/_workflows/_group_chat.py
@@ -24,13 +24,12 @@
 from collections.abc import Awaitable, Callable, Mapping, Sequence
 from dataclasses import dataclass, field
 from types import MappingProxyType
-from typing import Any, TypeAlias
+from typing import Any, TypeAlias, cast
 from uuid import uuid4

 from pydantic import BaseModel, Field

-from .._agents import AgentProtocol
-from .._clients import ChatClientProtocol
+from .._agents import AgentProtocol, ChatAgent
 from .._types import ChatMessage, Role
 from ._agent_executor import AgentExecutorRequest, AgentExecutorResponse
 from ._base_group_chat_orchestrator import BaseGroupChatOrchestrator
@@ -87,6 +86,75 @@ class GroupChatDirective:
     final_message: ChatMessage | None = None


+@dataclass
+class ManagerSelectionRequest:
+    """Request sent to the manager agent for next-speaker selection.
+
+    This dataclass packages the full conversation state and task context
+    for the manager agent to analyze and make a speaker selection decision.
+
+    Attributes:
+        task: Original user task message
+        participants: Mapping of participant names to their descriptions
+        conversation: Full conversation history including all messages
+        round_index: Number of manager selection rounds completed so far
+        metadata: Optional metadata for extensibility
+    """
+
+    task: ChatMessage
+    participants: dict[str, str]  # type: ignore
+    conversation: list[ChatMessage]  # type: ignore
+    round_index: int
+    metadata: dict[str, Any] | None = None
+
+    def to_dict(self) -> dict[str, Any]:
+        """Convert to dictionary for serialization."""
+        return {
+            "task": self.task.to_dict(),
+            "participants": dict(self.participants),
+            "conversation": [msg.to_dict() for msg in self.conversation],
+            "round_index": self.round_index,
+            "metadata": self.metadata,
+        }
+
+
+class ManagerSelectionResponse(BaseModel):
+    """Response from the manager agent with its speaker selection decision.
+
+    The manager agent must produce this structure (or a compatible dict/JSON payload)
+    to communicate its decision back to the orchestrator.
+
+    Attributes:
+        selected_participant: Name of participant to speak next (None = finish conversation)
+        instruction: Optional instruction to provide to the selected participant
+        finish: Whether the conversation should be completed
+        final_message: Optional final message string when finishing the conversation (converted to ChatMessage)
+    """
+
+    model_config = {"extra": "forbid"}
+
+    selected_participant: str | None = None
+    instruction: str | None = None
+    finish: bool = False
+    final_message: str | None = Field(default=None, description="Optional text content for final message")
+
+    @staticmethod
+    def from_dict(data: dict[str, Any]) -> "ManagerSelectionResponse":
+        """Create from dictionary representation."""
+        return ManagerSelectionResponse(
+            selected_participant=data.get("selected_participant"),
+            instruction=data.get("instruction"),
+            finish=data.get("finish", False),
+            final_message=data.get("final_message"),
+        )
+
+    def get_final_message_as_chat_message(self) -> ChatMessage | None:
+        """Convert final_message string to ChatMessage if present."""
+        if self.final_message:
+            return ChatMessage(role=Role.ASSISTANT, text=self.final_message)
+        return None
+
+
 # endregion


@@ -112,17 +180,23 @@ class _GroupChatConfig:
     """Internal: Configuration passed to factories during workflow assembly.

     Attributes:
-        manager: Manager instance responsible for orchestration decisions (None when custom factory handles it)
+        manager: Manager callable for orchestration decisions (used by set_select_speakers_func)
+        manager_participant: Manager agent/executor instance (used by set_manager)
         manager_name: Display name for the manager in conversation history
         participants: Mapping of participant names to their specifications
         max_rounds: Optional limit on manager selection rounds to prevent infinite loops
+        termination_condition: Optional callable that halts the conversation when it returns True
         orchestrator: Orchestrator executor instance (populated during build)
+        participant_aliases: Mapping of aliases to executor IDs
+        participant_executors: Mapping of participant names to their executor instances
     """

     manager: _GroupChatManagerFn | None
+    manager_participant: AgentProtocol | Executor | None
     manager_name: str
     participants: Mapping[str, GroupChatParticipantSpec]
     max_rounds: int | None = None
+    termination_condition: Callable[[list[ChatMessage]], bool | Awaitable[bool]] | None = None
     orchestrator: Executor | None = None
     participant_aliases: dict[str, str] = field(default_factory=dict)  # type: ignore[type-arg]
     participant_executors: dict[str, Executor] = field(default_factory=dict)  # type: ignore[type-arg]
@@ -220,6 +294,7 @@ class GroupChatOrchestratorExecutor(BaseGroupChatOrchestrator):
         participants: Mapping of participant names to descriptions (for manager context)
         manager_name: Display name for manager in conversation history
         max_rounds: Optional limit on manager selection rounds (None = unlimited)
+        termination_condition: Optional callable that halts the conversation when it returns True
         executor_id: Optional custom ID for observability (auto-generated if not provided)
     """

@@ -230,6 +305,7 @@ def __init__(
         self,
         participants: Mapping[str, str],
         manager_name: str,
         max_rounds: int | None = None,
+        termination_condition: Callable[[list[ChatMessage]], bool | Awaitable[bool]] | None = None,
         executor_id: str | None = None,
     ) -> None:
         super().__init__(executor_id or f"groupchat_orchestrator_{uuid4().hex[:8]}")
@@ -237,9 +313,11 @@ def __init__(
         self._participants = dict(participants)
         self._manager_name = manager_name
         self._max_rounds = max_rounds
+        self._termination_condition = termination_condition
         self._history: list[_GroupChatTurn] = []
         self._task_message: ChatMessage | None = None
         self._pending_agent: str | None = None
+        self._pending_finalization: bool = False
         # Stashes the initial conversation list until _handle_task_message normalizes it into _conversation.
         self._pending_initial_conversation: list[ChatMessage] | None = None
@@ -317,10 +395,75 @@ def _restore_pattern_metadata(self, metadata: dict[str, Any]) -> None:
             for turn in metadata["history"]
         ]

+    async def _complete_on_termination(
+        self,
+        ctx: WorkflowContext[AgentExecutorRequest | _GroupChatRequestMessage, list[ChatMessage]],
+    ) -> bool:
+        """Finish the conversation early when the termination condition is met."""
+        if not await self._check_termination():
+            return False
+
+        if self._is_manager_agent():
+            if self._pending_finalization:
+                return True
+
+            self._pending_finalization = True
+            termination_prompt = ChatMessage(
+                role=Role.SYSTEM,
+                text="Termination condition met. Provide a final manager summary and finish the conversation.",
+            )
+            manager_conversation = [
+                self._build_manager_context_message(),
+                termination_prompt,
+                *list(self._conversation),
+            ]
+            self._pending_agent = self._manager_name
+            await self._route_to_participant(
+                participant_name=self._manager_name,
+                conversation=manager_conversation,
+                ctx=ctx,
+                instruction="",
+                task=self._task_message,
+                metadata={"termination_condition": True},
+            )
+            return True
+
+        final_message: ChatMessage | None = None
+        if self._manager is not None and not self._is_manager_agent():
+            try:
+                directive = await self._manager(self._build_state())
+            except Exception:
+                logger.warning("Manager finalization failed during termination; using default termination message.")
+            else:
+                if directive.final_message is not None:
+                    final_message = ensure_author(directive.final_message, self._manager_name)
+                elif directive.finish:
+                    final_message = ensure_author(
+                        self._create_completion_message(
+                            text="Conversation completed.",
+                            reason="termination_condition_manager_finish",
+                        ),
+                        self._manager_name,
+                    )
+
+        if final_message is None:
+            final_message = ensure_author(
+                self._create_completion_message(
+                    text="Conversation halted after termination condition was met.",
+                    reason="termination_condition",
+                ),
+                self._manager_name,
+            )
+        self._conversation.append(final_message)
+        self._history.append(_GroupChatTurn(self._manager_name, "manager", final_message))
+        self._pending_agent = None
+        await ctx.yield_output(list(self._conversation))
+        return True
+
     async def _apply_directive(
         self,
         directive: GroupChatDirective,
-        ctx: WorkflowContext[AgentExecutorRequest | _GroupChatRequestMessage, ChatMessage],
+        ctx: WorkflowContext[AgentExecutorRequest | _GroupChatRequestMessage, list[ChatMessage]],
     ) -> None:
         """Execute a manager directive by either finishing the workflow or routing to a participant.
@@ -366,7 +509,7 @@ async def _apply_directive(
             self._conversation.extend((final_message,))
             self._history.append(_GroupChatTurn(self._manager_name, "manager", final_message))
             self._pending_agent = None
-            await ctx.yield_output(final_message)
+            await ctx.yield_output(list(self._conversation))
             return

         agent_name = directive.agent_name
@@ -386,6 +529,9 @@ async def _apply_directive(
             self._conversation.extend((manager_message,))
             self._history.append(_GroupChatTurn(self._manager_name, "manager", manager_message))

+        if await self._complete_on_termination(ctx):
+            return
+
         self._pending_agent = agent_name
         self._increment_round()

@@ -415,7 +561,7 @@ async def _ingest_participant_message(
         self,
         participant_name: str,
         message: ChatMessage,
-        ctx: WorkflowContext[AgentExecutorRequest | _GroupChatRequestMessage, ChatMessage],
+        ctx: WorkflowContext[AgentExecutorRequest | _GroupChatRequestMessage, list[ChatMessage]],
     ) -> None:
         """Common response ingestion logic shared by agent and custom participants."""
         if participant_name not in self._participants:
@@ -426,17 +572,213 @@ async def _ingest_participant_message(
         self._history.append(_GroupChatTurn(participant_name, "agent", message))
         self._pending_agent = None

+        if await self._complete_on_termination(ctx):
+            return
+
         if self._check_round_limit():
-            await ctx.yield_output(
-                self._create_completion_message(
-                    text="Conversation halted after reaching manager round limit.",
-                    reason="max_rounds reached after response",
+            final_message = self._create_completion_message(
+                text="Conversation halted after reaching manager round limit.",
+                reason="max_rounds reached after response",
+            )
+            self._conversation.extend((final_message,))
+            self._history.append(_GroupChatTurn(self._manager_name, "manager", final_message))
+            await ctx.yield_output(list(self._conversation))
+            return
+
+        # Query manager for next speaker selection
+        if self._is_manager_agent():
+            # Agent-based manager: route request through workflow graph
+            # Prepend system message with participant context
+            manager_conversation = [self._build_manager_context_message(), *list(self._conversation)]
+            await self._route_to_participant(
+                participant_name=self._manager_name,
+                conversation=manager_conversation,
+                ctx=ctx,
+                instruction="",
+                task=self._task_message,
+                metadata=None,
+            )
+        else:
+            # Callable manager: invoke directly
+            directive = await self._manager(self._build_state())
+            await self._apply_directive(directive, ctx)
+
+    def _is_manager_agent(self) -> bool:
+        """Check if orchestrator is using an agent-based manager (vs callable manager)."""
+        return self._registry.is_participant_registered(self._manager_name)
+
+    def _build_manager_context_message(self) -> ChatMessage:
+        """Build system message with participant context for manager agent.
+
+        This message is prepended to the conversation when querying the manager
+        to provide up-to-date participant information for selection decisions.
+
+        Returns:
+            System message with participant names and descriptions
+        """
+        participant_list = "\n".join(f"- {name}: {desc}" for name, desc in self._participants.items())
+        context_text = (
+            "Available participants:\n"
+            f"{participant_list}\n\n"
+            "IMPORTANT: Choose only from these exact participant names (case-sensitive)."
+        )
+        return ChatMessage(role=Role.SYSTEM, text=context_text)
+
+    def _parse_manager_selection(self, response: AgentExecutorResponse) -> ManagerSelectionResponse:
+        """Extract manager selection decision from agent response.
+
+        Attempts to parse structured output from the manager agent using multiple strategies:
+        1. response.value (structured output from response_format)
+        2. JSON parsing from message text
+        3. Fallback error handling
+
+        Args:
+            response: AgentExecutor response from manager agent
+
+        Returns:
+            Parsed ManagerSelectionResponse with speaker selection
+
+        Raises:
+            RuntimeError: If manager response cannot be parsed into a valid selection
+        """
+        import json
+
+        # Strategy 1: agent_run_response.value (structured output)
+        agent_value = response.agent_run_response.value
+        if agent_value is not None:
+            if isinstance(agent_value, ManagerSelectionResponse):
+                return agent_value
+            if isinstance(agent_value, dict):
+                return ManagerSelectionResponse.from_dict(cast(dict[str, Any], agent_value))
+            if isinstance(agent_value, str):
+                try:
+                    data = json.loads(agent_value)
+                    return ManagerSelectionResponse.from_dict(data)
+                except (json.JSONDecodeError, TypeError, KeyError) as e:
+                    raise RuntimeError(f"Manager response.value contains invalid JSON: {e}") from e
+
+        # Strategy 2: Parse from message text
+        messages = response.agent_run_response.messages or []
+        if messages:
+            last_msg = messages[-1]
+            text = last_msg.text or ""
+            try:
+                return ManagerSelectionResponse.model_validate_json(text)
+            except (ValueError, TypeError):
+                # Pydantic's ValidationError subclasses ValueError, so this also
+                # covers malformed JSON; fall through to the RuntimeError below.
+                pass

+        # Fallback: Cannot parse manager decision
+        raise RuntimeError(
+            "Manager response did not contain valid selection data. "
+            "Ensure manager agent uses response_format=ManagerSelectionResponse "
+            "or returns compatible JSON structure."
+        )
+
+    async def _handle_manager_response(
+        self,
+        response: AgentExecutorResponse,
+        ctx: WorkflowContext[AgentExecutorRequest | _GroupChatRequestMessage, list[ChatMessage]],
+    ) -> None:
+        """Process the manager agent's speaker selection decision.
+
+        Parses the manager's response and either finishes the conversation or routes
+        to the selected participant. This method implements the core orchestration
+        logic for agent-based managers.
+
+        Args:
+            response: AgentExecutor response from manager agent
+            ctx: Workflow context for routing and output
+
+        Behavior:
+            - Parses manager selection from response
+            - If finish=True: yields final message and completes workflow
+            - If participant selected: routes request to that participant
+            - Validates selected participant exists
+            - Enforces round limits if configured
+
+        Raises:
+            ValueError: If manager selects invalid/unknown participant
+            RuntimeError: If manager response cannot be parsed
+        """
+        selection = self._parse_manager_selection(response)
+
+        if self._pending_finalization:
+            self._pending_finalization = False
+            final_message_obj = selection.get_final_message_as_chat_message()
+            if final_message_obj is None:
+                final_message_obj = self._create_completion_message(
+                    text="Conversation halted after termination condition was met.",
+                    reason="termination_condition_manager",
+                )
+            final_message_obj = ensure_author(final_message_obj, self._manager_name)
+
+            self._conversation.append(final_message_obj)
+            self._history.append(_GroupChatTurn(self._manager_name, "manager", final_message_obj))
+            self._pending_agent = None
+            await ctx.yield_output(list(self._conversation))
+            return
+
+        if selection.finish:
+            # Manager decided to complete conversation
+            final_message_obj = selection.get_final_message_as_chat_message()
+            if final_message_obj is None:
+                final_message_obj = self._create_completion_message(
+                    text="Conversation completed.",
+                    reason="manager_finish",
                )
+            final_message_obj = ensure_author(final_message_obj, self._manager_name)
+
+            self._conversation.append(final_message_obj)
+            self._history.append(_GroupChatTurn(self._manager_name, "manager", final_message_obj))
+            self._pending_agent = None
+            await ctx.yield_output(list(self._conversation))
+            return
+
+        # Manager selected next participant
+        selected = selection.selected_participant
+        if not selected:
+            raise ValueError("Manager selection missing selected_participant when finish=False.")
+        if selected not in self._participants:
+            raise ValueError(f"Manager selected unknown participant: '{selected}'")
+
+        # Route to selected participant
+        instruction = selection.instruction or ""
+        conversation = list(self._conversation)
+        if instruction:
+            manager_message = ensure_author(
+                self._create_completion_message(text=instruction, reason="manager_instruction"),
+                self._manager_name,
            )
+            conversation.append(manager_message)
+            self._conversation.append(manager_message)
+            self._history.append(_GroupChatTurn(self._manager_name, "manager", manager_message))
+
+        if await self._complete_on_termination(ctx):
             return

-        directive = await self._manager(self._build_state())
-        await self._apply_directive(directive, ctx)
+        self._pending_agent = selected
+        self._increment_round()
+
+        await self._route_to_participant(
+            participant_name=selected,
+            conversation=conversation,
+            ctx=ctx,
+            instruction=instruction,
+            task=self._task_message,
+            metadata=None,
+        )
+
+        if self._check_round_limit():
+            await self._apply_directive(
+                GroupChatDirective(
+                    finish=True,
+                    final_message=self._create_completion_message(
+                        text="Conversation halted after reaching manager round limit.",
+                        reason="max_rounds reached after manager selection",
+                    ),
+                ),
+                ctx,
+            )

     @staticmethod
     def _extract_agent_message(response: AgentExecutorResponse, participant_name: str) -> ChatMessage:
@@ -469,7 +811,7 @@ def _extract_agent_message(response: AgentExecutorResponse, participant_name: st
     async def _handle_task_message(
         self,
         task_message: ChatMessage,
-        ctx: WorkflowContext[AgentExecutorRequest | _GroupChatRequestMessage, ChatMessage],
+        ctx: WorkflowContext[AgentExecutorRequest | _GroupChatRequestMessage, list[ChatMessage]],
     ) -> None:
         """Initialize orchestrator state and start the manager-directed conversation loop.

@@ -519,14 +861,33 @@ async def _handle_task_message(
         self._history = [_GroupChatTurn("user", "user", task_message)]
         self._pending_agent = None
         self._round_index = 0
-        directive = await self._manager(self._build_state())
-        await self._apply_directive(directive, ctx)
+
+        if await self._complete_on_termination(ctx):
+            return
+
+        # Query manager for first speaker selection
+        if self._is_manager_agent():
+            # Agent-based manager: route request through workflow graph
+            # Prepend system message with participant context
+            manager_conversation = [self._build_manager_context_message(), *list(self._conversation)]
+            await self._route_to_participant(
+                participant_name=self._manager_name,
+                conversation=manager_conversation,
+                ctx=ctx,
+                instruction="",
+                task=self._task_message,
+                metadata=None,
+            )
+        else:
+            # Callable manager: invoke directly
+            directive = await self._manager(self._build_state())
+            await self._apply_directive(directive, ctx)

     @handler
     async def handle_str(
         self,
         task: str,
-        ctx: WorkflowContext[AgentExecutorRequest | _GroupChatRequestMessage, ChatMessage],
+        ctx: WorkflowContext[AgentExecutorRequest | _GroupChatRequestMessage, list[ChatMessage]],
     ) -> None:
         """Handler for string input as workflow entry point.

@@ -545,7 +906,7 @@ async def handle_str(
     async def handle_chat_message(
         self,
         task_message: ChatMessage,
-        ctx: WorkflowContext[AgentExecutorRequest | _GroupChatRequestMessage, ChatMessage],
+        ctx: WorkflowContext[AgentExecutorRequest | _GroupChatRequestMessage, list[ChatMessage]],
     ) -> None:
         """Handler for ChatMessage input as workflow entry point.

@@ -564,7 +925,7 @@ async def handle_chat_message(
     async def handle_conversation(
         self,
         conversation: list[ChatMessage],
-        ctx: WorkflowContext[AgentExecutorRequest | _GroupChatRequestMessage, ChatMessage],
+        ctx: WorkflowContext[AgentExecutorRequest | _GroupChatRequestMessage, list[ChatMessage]],
     ) -> None:
         """Handler for conversation history as workflow entry point.

@@ -602,7 +963,7 @@ async def handle_conversation(
     async def handle_agent_response(
         self,
         response: _GroupChatResponseMessage,
-        ctx: WorkflowContext[AgentExecutorRequest | _GroupChatRequestMessage, ChatMessage],
+        ctx: WorkflowContext[AgentExecutorRequest | _GroupChatRequestMessage, list[ChatMessage]],
     ) -> None:
         """Handle responses from custom participant executors."""
         await self._ingest_participant_message(response.agent_name, response.message, ctx)
@@ -611,9 +972,14 @@ async def handle_agent_response(
     async def handle_agent_executor_response(
         self,
         response: AgentExecutorResponse,
-        ctx: WorkflowContext[AgentExecutorRequest | _GroupChatRequestMessage, ChatMessage],
+        ctx: WorkflowContext[AgentExecutorRequest | _GroupChatRequestMessage, list[ChatMessage]],
     ) -> None:
-        """Handle direct AgentExecutor responses."""
+        """Handle responses from both manager agent and regular participants.
+
+        Routes responses based on whether they come from the manager or a participant:
+        - Manager responses: parsed for speaker selection decisions
+        - Participant responses: ingested as conversation messages
+        """
         participant_name = self._registry.get_participant_name(response.executor_id)
         if participant_name is None:
             logger.debug(
@@ -621,8 +987,14 @@ async def handle_agent_executor_response(
                 response.executor_id,
             )
             return
-        message = self._extract_agent_message(response, participant_name)
-        await self._ingest_participant_message(participant_name, message, ctx)
+
+        # Check if response is from manager agent
+        if participant_name == self._manager_name and self._is_manager_agent():
+            await self._handle_manager_response(response, ctx)
+        else:
+            # Regular participant response
+            message = self._extract_agent_message(response, participant_name)
+            await self._ingest_participant_message(participant_name, message, ctx)


 def _default_orchestrator_factory(wiring: _GroupChatConfig) -> Executor:
@@ -640,8 +1012,9 @@ def _default_orchestrator_factory(wiring: _GroupChatConfig) -> Executor:

     Behavior:
     - Extracts participant names and descriptions for manager context
-    - Forwards manager instance, manager name, and max_rounds settings
+    - Forwards manager instance, manager name, max_rounds, and termination_condition settings
     - Allows orchestrator to auto-generate its executor ID
+    - Supports both callable managers (set_select_speakers_func) and agent-based managers (set_manager)

     Why descriptions are extracted:
     The manager needs participant descriptions (not full specs) to make informed
@@ -649,16 +1022,30 @@ def _default_orchestrator_factory(wiring: _GroupChatConfig) -> Executor:
     since routing is handled by the workflow graph.

     Raises:
-        RuntimeError: If manager is None (should not happen when using default factory)
+        RuntimeError: If neither manager nor manager_participant is configured
     """
-    if wiring.manager is None:
-        raise RuntimeError("Default orchestrator factory requires a manager to be set")
+    if wiring.manager is None and wiring.manager_participant is None:
+        raise RuntimeError(
+            "Default orchestrator factory requires a manager to be configured. "
+            "Call set_manager(...) or set_select_speakers_func(...) before build()."
+        )
+
+    manager_callable = wiring.manager
+    if manager_callable is None:
+        # Keep orchestrator signature satisfied; agent managers are routed via the workflow graph
+        async def _agent_manager_placeholder(_: GroupChatStateSnapshot) -> GroupChatDirective:  # noqa: RUF029
+            raise RuntimeError(
+                "Manager callable invoked unexpectedly. Agent-based managers should route through the workflow graph."
+            )
+
+        manager_callable = _agent_manager_placeholder

     return GroupChatOrchestratorExecutor(
-        manager=wiring.manager,
+        manager=manager_callable,
         participants={name: spec.description for name, spec in wiring.participants.items()},
         manager_name=wiring.manager_name,
         max_rounds=wiring.max_rounds,
+        termination_condition=wiring.termination_condition,
     )


@@ -684,8 +1071,41 @@ def assemble_group_chat_workflow(
     wiring.orchestrator = orchestrator

     workflow_builder = builder or WorkflowBuilder()
-    workflow_builder = workflow_builder.set_start_executor(orchestrator)
+    start_executor = getattr(workflow_builder, "_start_executor", None)
+    if start_executor is None:
+        workflow_builder = workflow_builder.set_start_executor(orchestrator)
+
+    # Wire manager as participant if agent-based manager is configured
+    if wiring.manager_participant is not None:
+        manager_spec = GroupChatParticipantSpec(
+            name=wiring.manager_name,
+            participant=wiring.manager_participant,
+            description="Coordination manager",
+        )
+        manager_pipeline = list(participant_factory(manager_spec, wiring))
+        if not manager_pipeline:
+            raise ValueError("Participant factory returned empty pipeline for manager.")
+        manager_entry = manager_pipeline[0]
+        manager_exit = manager_pipeline[-1]
+
+        # Register manager with orchestrator
+        register_entry = getattr(orchestrator, "register_participant_entry", None)
+        if callable(register_entry):
+            register_entry(
+                wiring.manager_name,
+                entry_id=manager_entry.id,
+                is_agent=not isinstance(wiring.manager_participant, Executor),
+            )
+
+        # Wire manager edges: Orchestrator ↔ Manager
+        workflow_builder = workflow_builder.add_edge(orchestrator, manager_entry)
+        for upstream, downstream in itertools.pairwise(manager_pipeline):
+            workflow_builder = workflow_builder.add_edge(upstream, downstream)
+        if manager_exit is not orchestrator:
+            workflow_builder = workflow_builder.add_edge(manager_exit, orchestrator)
+
+    # Wire regular participants
     for name, spec in wiring.participants.items():
         pipeline = list(participant_factory(spec, wiring))
         if not pipeline:
@@ -733,12 +1153,14 @@ class GroupChatBuilder:
     r"""High-level builder for manager-directed group chat workflows with dynamic orchestration.

     GroupChat coordinates multi-agent conversations using a manager that selects which participant
-    speaks next. The manager can be a simple Python function (select_speakers) or an LLM-based
-    selector (set_prompt_based_manager). These two approaches are mutually exclusive.
+    speaks next. The manager can be a simple Python function (:py:meth:`GroupChatBuilder.set_select_speakers_func`)
+    or an agent-based selector via :py:meth:`GroupChatBuilder.set_manager`. These two approaches are
+    mutually exclusive.

     **Core Workflow:**

     1. Define participants: list of agents (uses their .name) or dict mapping names to agents
-    2. Configure speaker selection: select_speakers() OR set_prompt_based_manager() (not both)
+    2. Configure speaker selection: :py:meth:`GroupChatBuilder.set_select_speakers_func` OR
+       :py:meth:`GroupChatBuilder.set_manager` (not both)
     3. Optional: set round limits, checkpointing, termination conditions
     4. Build and run the workflow

@@ -748,6 +1170,9 @@ class GroupChatBuilder:

     .. code-block:: python

+        from agent_framework import GroupChatBuilder, GroupChatStateSnapshot
+
+
         def select_next_speaker(state: GroupChatStateSnapshot) -> str | None:
             # state contains: task, participants, conversation, history, round_index
             if state["round_index"] >= 5:
@@ -760,7 +1185,7 @@ def select_next_speaker(state: GroupChatStateSnapshot) -> str | None:

         workflow = (
             GroupChatBuilder()
-            .select_speakers(select_next_speaker)
+            .set_select_speakers_func(select_next_speaker)
             .participants([researcher_agent, writer_agent])  # Uses agent.name
             .build()
         )
@@ -769,11 +1194,20 @@ def select_next_speaker(state: GroupChatStateSnapshot) -> str | None:

     .. code-block:: python

+        from agent_framework import ChatAgent
         from agent_framework.azure import AzureOpenAIChatClient

+        manager_agent = AzureOpenAIChatClient().create_agent(
+            instructions="Coordinate the conversation and pick the next speaker.",
+            name="Coordinator",
+            temperature=0.3,
+            seed=42,
+            max_tokens=500,
+        )
+
         workflow = (
             GroupChatBuilder()
-            .set_prompt_based_manager(chat_client=AzureOpenAIChatClient(), display_name="Coordinator")
+            .set_manager(manager_agent, display_name="Coordinator")
             .participants([researcher, writer])  # Or use dict: researcher=r, writer=w
             .with_max_rounds(10)
             .build()
@@ -782,24 +1216,24 @@ def select_next_speaker(state: GroupChatStateSnapshot) -> str | None:
     **Participant Specification:**

     Two ways to specify participants:
-    - List form: ``[agent1, agent2]`` - uses ``agent.name`` attribute for participant names
-    - Dict form: ``{name1: agent1, name2: agent2}`` - explicit name control
-    - Keyword form: ``participants(name1=agent1, name2=agent2)`` - explicit name control
+    - List form: `[agent1, agent2]` - uses `agent.name` attribute for participant names
+    - Dict form: `{name1: agent1, name2: agent2}` - explicit name control
+    - Keyword form: `participants(name1=agent1, name2=agent2)` - explicit name control

     **State Snapshot Structure:**

-    The GroupChatStateSnapshot passed to select_speakers contains:
-    - ``task``: ChatMessage - Original user task
-    - ``participants``: dict[str, str] - Mapping of participant names to descriptions
-    - ``conversation``: tuple[ChatMessage, ...] - Full conversation history
-    - ``history``: tuple[GroupChatTurn, ...] - Turn-by-turn record with speaker attribution
-    - ``round_index``: int - Number of manager selection rounds so far
-    - ``pending_agent``: str | None - Name of agent currently processing (if any)
+    The GroupChatStateSnapshot passed to set_select_speakers_func contains:
+    - `task`: ChatMessage - Original user task
+    - `participants`: dict[str, str] - Mapping of participant names to descriptions
+    - `conversation`: tuple[ChatMessage, ...] - Full conversation history
+    - `history`: tuple[GroupChatTurn, ...] - Turn-by-turn record with speaker attribution
+    - `round_index`: int - Number of manager selection rounds so far
+    - `pending_agent`: str | None - Name of agent currently processing (if any)

     **Important Constraints:**

-    - Cannot combine select_speakers() and set_prompt_based_manager() - choose one
+    - Cannot combine :py:meth:`GroupChatBuilder.set_select_speakers_func` and :py:meth:`GroupChatBuilder.set_manager`
     - Participant names must be unique
-    - When using list form, agents must have a non-empty ``name`` attribute
+    - When using list form, agents must have a non-empty `name` attribute
     """

     def __init__(
@@ -820,9 +1254,11 @@ def __init__(
         self._participants: dict[str, AgentProtocol | Executor] = {}
         self._participant_metadata: dict[str, Any] | None = None
         self._manager: _GroupChatManagerFn | None = None
+        self._manager_participant: AgentProtocol | Executor | None = None
         self._manager_name: str = "manager"
         self._checkpoint_storage: CheckpointStorage | None = None
         self._max_rounds: int | None = None
+        self._termination_condition: Callable[[list[ChatMessage]], bool | Awaitable[bool]] | None = None
         self._interceptors: list[_InterceptorSpec] = []
         self._orchestrator_factory = group_chat_orchestrator(_orchestrator_factory)
         self._participant_factory = _participant_factory or _default_participant_factory
@@ -832,68 +1268,107 @@ def _set_manager_function(
         manager: _GroupChatManagerFn,
         display_name: str | None,
     ) -> "GroupChatBuilder":
-        if self._manager is not None:
+        if self._manager is not None or self._manager_participant is not None:
             raise ValueError(
                 "GroupChatBuilder already has a manager configured. "
-                "Call select_speakers(...) or set_prompt_based_manager(...) at most once."
+                "Call set_select_speakers_func(...) or set_manager(...) at most once."
             )
         resolved_name = display_name or getattr(manager, "name", None) or "manager"
         self._manager = manager
         self._manager_name = resolved_name
         return self

-    def set_prompt_based_manager(
+    def set_manager(
         self,
-        chat_client: ChatClientProtocol,
+        manager: AgentProtocol | Executor,
         *,
-        instructions: str | None = None,
         display_name: str | None = None,
     ) -> "GroupChatBuilder":
-        r"""Configure the default prompt-based manager driven by an LLM chat client.
+        """Configure the manager/coordinator agent for group chat orchestration.
+
+        The manager coordinates participants by selecting who speaks next based on
+        conversation state and task requirements. The manager is a full workflow
+        participant with access to all agent infrastructure (tools, context, observability).

-        The manager coordinates participants by making selection decisions based on the conversation
-        state, task, and participant descriptions. It uses structured output (ManagerDirectiveModel)
-        to ensure reliable parsing of decisions.
+        The manager agent must produce structured output compatible with ManagerSelectionResponse
+        to communicate its speaker selection decisions. Use response_format for reliable parsing.
+        GroupChatBuilder enforces this when the manager is a ChatAgent and rejects incompatible
+        response formats.

         Args:
-            chat_client: Chat completion client used to run the coordinator LLM.
-            instructions: System instructions to steer the coordinator's decision-making.
-                If not provided, uses DEFAULT_MANAGER_INSTRUCTIONS. These instructions are combined
-                with the task description, participant list, and structured output format to guide
-                the LLM in selecting the next speaker or completing the conversation.
-            display_name: Optional conversational display name for manager messages.
+            manager: Agent or executor responsible for speaker selection and coordination.
+                Must return ManagerSelectionResponse or compatible dict/JSON structure.
+            display_name: Optional name for manager messages in conversation history.
+                If not provided, uses manager.name for AgentProtocol or manager.id for Executor.

         Returns:
             Self for fluent chaining.

-        Note:
-            Calling this method and :meth:`set_speaker_selector` together is not allowed; choose one.
+        Raises:
+            ValueError: If manager is already configured via :py:meth:`GroupChatBuilder.set_select_speakers_func`
+            TypeError: If manager is not an AgentProtocol or Executor instance

         Example:

         .. code-block:: python

-                from agent_framework import GroupChatBuilder, DEFAULT_MANAGER_INSTRUCTIONS
+            from agent_framework import GroupChatBuilder, ChatAgent
+            from agent_framework.openai import OpenAIChatClient
+
+            # Coordinator agent - response_format is enforced to ManagerSelectionResponse
+            coordinator = ChatAgent(
+                name="Coordinator",
+                description="Coordinates multi-agent collaboration",
+                instructions='''
+                You coordinate a team conversation. Review the conversation history
+                and select the next participant to speak.

-                custom_instructions = (
-                    DEFAULT_MANAGER_INSTRUCTIONS + "\\n\\nPrioritize the researcher for data analysis tasks."
+                When ready to finish, set finish=True and provide a summary in final_message.
+                ''',
+                chat_client=OpenAIChatClient(),
                 )

                 workflow = (
                     GroupChatBuilder()
-                    .set_prompt_based_manager(chat_client, instructions=custom_instructions, display_name="Coordinator")
-                    .participants(researcher=researcher, writer=writer)
+                    .set_manager(coordinator, display_name="Orchestrator")
+                    .participants([researcher, writer])
                     .build()
                 )
+
+        Note:
+            The manager agent's response_format must be ManagerSelectionResponse for structured output.
+            Custom response formats raise ValueError instead of being overridden.
         """
-        manager = _PromptBasedGroupChatManager(
-            chat_client,
-            instructions=instructions,
-            name=display_name,
-        )
-        return self._set_manager_function(manager, display_name)
+        if self._manager is not None or self._manager_participant is not None:
+            raise ValueError(
+                "GroupChatBuilder already has a manager configured. "
+                "Call set_select_speakers_func(...) or set_manager(...) at most once."
+            )

-    def select_speakers(
+
+        if not isinstance(manager, (AgentProtocol, Executor)):
+            raise TypeError(f"Manager must be AgentProtocol or Executor instance. Got {type(manager).__name__}.")
+
+        # Infer display name from manager if not provided
+        if display_name is None:
+            display_name = manager.id if isinstance(manager, Executor) else manager.name or "manager"
+
+        # Enforce ManagerSelectionResponse for ChatAgent managers
+        if isinstance(manager, ChatAgent):
+            configured_format = manager.chat_options.response_format
+            if configured_format is None:
+                manager.chat_options.response_format = ManagerSelectionResponse
+            elif configured_format is not ManagerSelectionResponse:
+                configured_format_name = getattr(configured_format, "__name__", str(configured_format))
+                raise ValueError(
+                    "Manager ChatAgent response_format must be ManagerSelectionResponse. "
+                    f"Received '{configured_format_name}' for manager '{display_name}'."
+                )
+
+        self._manager_participant = manager
+        self._manager_name = display_name
+        return self
+
+    def set_select_speakers_func(
         self,
         selector: (
             Callable[[GroupChatStateSnapshot], Awaitable[str | None]] | Callable[[GroupChatStateSnapshot], str | None]
@@ -908,6 +1383,15 @@ def select_next_speaker(state: GroupChatStateSnapshot) -> str | None:
         function receives an immutable snapshot of the current conversation state and returns
         the name of the next participant to speak, or None to finish the conversation.

+        The selector function can implement any logic, including:
+        - Simple round-robin or rule-based selection
+        - LLM-based decision making with custom prompts
+        - Conversation summarization before routing to the next agent
+        - Custom metadata or context passing
+
+        For advanced scenarios, return a GroupChatDirective instead of a string to include
+        custom instructions or metadata for the next participant.
+
         The selector function signature:

             def select_next_speaker(state: GroupChatStateSnapshot) -> str | None:
                 # state contains: task, participants, conversation, history, round_index
@@ -917,6 +1401,7 @@ def select_next_speaker(state: GroupChatStateSnapshot) -> str | None:
         Args:
             selector: Function that takes GroupChatStateSnapshot and returns the next speaker's
                 name (str) to continue the conversation, or None to finish. May be sync or async.
+                Can also return GroupChatDirective for advanced control (instruction, metadata).
             display_name: Optional name shown in conversation history for orchestrator messages
                 (defaults to "manager").
             final_message: Optional final message (or factory) emitted when selector returns None
@@ -925,7 +1410,7 @@ def select_next_speaker(state: GroupChatStateSnapshot) -> str | None:
         Returns:
             Self for fluent chaining.

-        Example:
+        Example (simple):

         .. code-block:: python

@@ -940,13 +1425,37 @@ def select_next_speaker(state: GroupChatStateSnapshot) -> str | None:

                 workflow = (
                     GroupChatBuilder()
-                    .select_speakers(select_next_speaker)
+                    .set_select_speakers_func(select_next_speaker)
                     .participants(researcher=researcher_agent, writer=writer_agent)
                     .build()
                 )

+        Example (with LLM and custom instructions):
+
+        .. code-block:: python
+
+            from agent_framework import GroupChatDirective
+
+
+            async def llm_based_selector(state: GroupChatStateSnapshot) -> GroupChatDirective | None:
+                if state["round_index"] >= 5:
+                    return GroupChatDirective(finish=True)
+
+                # Use LLM to decide next speaker and summarize conversation
+                conversation_summary = await summarize_with_llm(state["conversation"])
+                next_agent = await pick_agent_with_llm(state["participants"], state["task"])
+
+                # Pass custom instruction to the selected agent
+                return GroupChatDirective(
+                    agent_name=next_agent,
+                    instruction=f"Context summary: {conversation_summary}",
+                )
+
+
+            workflow = GroupChatBuilder().set_select_speakers_func(llm_based_selector).participants(...).build()
+
         Note:
-            Cannot be combined with set_prompt_based_manager(). Choose one orchestration strategy.
+            Cannot be combined with :py:meth:`GroupChatBuilder.set_manager`. Choose one orchestration strategy.
""" manager_name = display_name or "manager" adapter = _SpeakerSelectorAdapter( @@ -985,10 +1494,7 @@ def participants( from agent_framework import GroupChatBuilder workflow = ( - GroupChatBuilder() - .set_prompt_based_manager(chat_client) - .participants([writer_agent, reviewer_agent]) - .build() + GroupChatBuilder().set_manager(manager_agent).participants([writer_agent, reviewer_agent]).build() ) """ combined: dict[str, AgentProtocol | Executor] = {} @@ -998,6 +1504,11 @@ def _add(name: str, participant: AgentProtocol | Executor) -> None: raise ValueError("participant names must be non-empty strings") if name in combined or name in self._participants: raise ValueError(f"Duplicate participant name '{name}' supplied.") + if name == self._manager_name: + raise ValueError( + f"Participant name '{name}' conflicts with manager name. " + "Manager is automatically registered as a participant." + ) combined[name] = participant if participants: @@ -1050,7 +1561,7 @@ def with_checkpointing(self, checkpoint_storage: CheckpointStorage) -> "GroupCha storage = MemoryCheckpointStorage() workflow = ( GroupChatBuilder() - .set_prompt_based_manager(chat_client) + .set_manager(manager_agent) .participants(agent1=agent1, agent2=agent2) .with_checkpointing(storage) .build() @@ -1088,6 +1599,40 @@ def _factory(_: _GroupChatConfig) -> Executor: self._interceptors.append((factory, condition)) return self + def with_termination_condition( + self, + condition: Callable[[list[ChatMessage]], bool | Awaitable[bool]], + ) -> "GroupChatBuilder": + """Define a custom termination condition for the group chat workflow. + + The condition receives the full conversation (including manager and agent messages) and may be async. + When it returns True, the orchestrator halts the conversation and emits a completion message authored + by the manager. + + Example: + + .. code-block:: python + + from agent_framework import ChatMessage, GroupChatBuilder, Role + + + def stop_after_two_calls(conversation: list[ChatMessage]) -> bool: + calls = sum(1 for msg in conversation if msg.role == Role.ASSISTANT and msg.author_name == "specialist") + return calls >= 2 + + + specialist_agent = ... + workflow = ( + GroupChatBuilder() + .set_select_speakers_func(lambda _: "specialist") + .participants(specialist=specialist_agent) + .with_termination_condition(stop_after_two_calls) + .build() + ) + """ + self._termination_condition = condition + return self + def with_max_rounds(self, max_rounds: int | None) -> "GroupChatBuilder": """Set a maximum number of manager rounds to prevent infinite conversations. 
@@ -1109,7 +1654,7 @@ def with_max_rounds(self, max_rounds: int | None) -> "GroupChatBuilder":
             # Limit to 15 rounds
             workflow = (
                 GroupChatBuilder()
-                .set_prompt_based_manager(chat_client)
+                .set_manager(manager_agent)
                 .participants(agent1=agent1, agent2=agent2)
                 .with_max_rounds(15)
                 .build()
             )

             # Unlimited rounds
             workflow = (
-                GroupChatBuilder()
-                .set_prompt_based_manager(chat_client)
-                .participants(agent1=agent1)
-                .with_max_rounds(None)
-                .build()
+                GroupChatBuilder().set_manager(manager_agent).participants(agent1=agent1).with_max_rounds(None).build()
             )
         """
         self._max_rounds = max_rounds
@@ -1182,19 +1723,21 @@ def build(self) -> Workflow:
             from agent_framework import GroupChatBuilder

             # Execute the workflow
-            workflow = (
-                GroupChatBuilder()
-                .set_prompt_based_manager(chat_client)
-                .participants(agent1=agent1, agent2=agent2)
-                .build()
-            )
+            workflow = GroupChatBuilder().set_manager(manager_agent).participants(agent1=agent1, agent2=agent2).build()

             async for message in workflow.run("Solve this problem collaboratively"):
                 print(message.text)
         """
         # Manager is only required when using the default orchestrator factory
         # Custom factories (e.g., MagenticBuilder) provide their own orchestrator with embedded manager
-        if self._manager is None and self._orchestrator_factory == _default_orchestrator_factory:
-            raise ValueError("manager must be configured before build() when using default orchestrator")
+        if (
+            self._manager is None
+            and self._manager_participant is None
+            and self._orchestrator_factory == _default_orchestrator_factory
+        ):
+            raise ValueError(
+                "manager must be configured before build() when using default orchestrator. "
+                "Call set_manager(...) or set_select_speakers_func(...) before build()."
+            )

         if not self._participants:
             raise ValueError("participants must be configured before build()")
@@ -1202,9 +1745,11 @@ def build(self) -> Workflow:
         participant_specs = self._build_participant_specs()
         wiring = _GroupChatConfig(
             manager=self._manager,
+            manager_participant=self._manager_participant,
             manager_name=self._manager_name,
             participants=participant_specs,
             max_rounds=self._max_rounds,
+            termination_condition=self._termination_condition,
             participant_aliases=metadata["aliases"],
             participant_executors=metadata["executors"],
         )
@@ -1262,117 +1807,6 @@ class ManagerDirectiveModel(BaseModel):
     )


-class _PromptBasedGroupChatManager:
-    """LLM-backed manager that produces directives via structured output.
-
-    This is the default manager implementation for group chat workflows. It uses an LLM
-    to make speaker selection decisions based on conversation state, participant
-    descriptions, and custom instructions.
-
-    Coordination strategy:
-    - Receives immutable state snapshot with full conversation history
-    - Formats system prompt with instructions, task, and participant descriptions
-    - Appends conversation context and uses structured output (Pydantic model) for reliable parsing
-    - Converts LLM response to GroupChatDirective
-
-    Flexibility:
-    - Custom instructions allow domain-specific coordination strategies
-    - Participant descriptions guide the LLM's selection logic
-    - Structured output ensures reliable parsing (no regex or brittle prompts)
-
-    Example coordination patterns:
-    - Round-robin: "Rotate between participants in order"
-    - Task-based: "Select the participant best suited for the current sub-task"
-    - Dependency-aware: "Only call analyst after researcher provides data"
-
-    Args:
-        chat_client: ChatClientProtocol implementation for LLM inference
-        instructions: Custom system instructions (defaults to DEFAULT_MANAGER_INSTRUCTIONS).
-            These instructions are combined with the task, participant list, and
-            structured output format (ManagerDirectiveModel) to coordinate the conversation.
-        name: Display name for the manager in conversation history
-
-    Raises:
-        RuntimeError: If LLM response cannot be parsed into the directive payload
-            If directive is missing next_agent when finish=False
-            If selected agent is not in participants
-    """
-
-    def __init__(
-        self,
-        chat_client: ChatClientProtocol,
-        *,
-        instructions: str | None = None,
-        name: str | None = None,
-    ) -> None:
-        self._chat_client = chat_client
-        self._instructions = instructions or DEFAULT_MANAGER_INSTRUCTIONS
-        self._name = name or "GroupChatManager"
-
-    @property
-    def name(self) -> str:
-        return self._name
-
-    async def __call__(self, state: GroupChatStateSnapshot) -> GroupChatDirective:
-        participants = state["participants"]
-        task_message = state["task"]
-        conversation = state["conversation"]
-
-        participants_section = "\n".join(f"- {agent}: {description}" for agent, description in participants.items())
-
-        system_message = ChatMessage(
-            role=Role.SYSTEM,
-            text=(
-                f"{self._instructions}\n\n"
-                f"Task:\n{task_message.text}\n\n"
-                f"Participants:\n{participants_section}\n\n"
-                f"{DEFAULT_MANAGER_STRUCTURED_OUTPUT_PROMPT}"
-            ),
-        )
-
-        messages: list[ChatMessage] = [system_message, *conversation]
-
-        response = await self._chat_client.get_response(messages, response_format=ManagerDirectiveModel)
-
-        directive_model: ManagerDirectiveModel
-        if response.value is not None:
-            if isinstance(response.value, ManagerDirectiveModel):
-                directive_model = response.value
-            elif isinstance(response.value, str):
-                directive_model = ManagerDirectiveModel.model_validate_json(response.value)
-            elif isinstance(response.value, dict):
-                directive_model = ManagerDirectiveModel.model_validate(response.value)  # type: ignore[arg-type]
-            else:
-                raise RuntimeError(f"Unexpected response.value type: {type(response.value)}")
-        elif response.messages:
-            text = response.messages[-1].text or "{}"
-            directive_model = ManagerDirectiveModel.model_validate_json(text)
-        else:
-            raise RuntimeError("LLM response did not contain structured output.")
-
-        if directive_model.finish:
-            final_text = directive_model.final_response or ""
-            return GroupChatDirective(
-                finish=True,
-                final_message=ChatMessage(
-                    role=Role.ASSISTANT,
-                    text=final_text,
-                    author_name=self._name,
-                ),
-            )
-
-        next_agent = directive_model.next_agent
-        if not next_agent:
-            raise RuntimeError("Manager directive missing next_agent while finish is False.")
-        if next_agent not in participants:
-            raise RuntimeError(f"Manager selected unknown participant '{next_agent}'.")
-
-        return GroupChatDirective(
-            agent_name=next_agent,
-            instruction=directive_model.message or "",
-        )
-
-
 class _SpeakerSelectorAdapter:
     """Adapter that turns a simple speaker selector into a full manager directive."""
diff --git a/python/packages/core/agent_framework/_workflows/_handoff.py b/python/packages/core/agent_framework/_workflows/_handoff.py
index d18bc59562..054c53f6e3 100644
--- a/python/packages/core/agent_framework/_workflows/_handoff.py
+++ b/python/packages/core/agent_framework/_workflows/_handoff.py
@@ -1424,6 +1424,7 @@ def build(self) -> Workflow:
             prompt=self._request_prompt,
             id="handoff-user-input",
         )
+        builder = WorkflowBuilder(name=self._name, description=self._description).set_start_executor(input_node)

         specialist_aliases = {alias: exec_id for alias, exec_id in self._aliases.items() if exec_id in specialists}
@@ -1440,6 +1441,7 @@ def _handoff_orchestrator_factory(_: _GroupChatConfig) -> Executor:
         wiring = _GroupChatConfig(
             manager=None,
+            manager_participant=None,
             manager_name=self._starting_agent_id,
             participants=participant_specs,
             max_rounds=None,
@@ -1453,14 +1455,13 @@ def _handoff_orchestrator_factory(_: _GroupChatConfig) -> Executor:
             orchestrator_factory=_handoff_orchestrator_factory,
             interceptors=(),
             checkpoint_storage=self._checkpoint_storage,
-            builder=WorkflowBuilder(name=self._name, description=self._description),
+            builder=builder,
             return_builder=True,
         )
         if not isinstance(result, tuple):
             raise TypeError("Expected tuple from assemble_group_chat_workflow with return_builder=True")

         builder, coordinator = result
-        builder = builder.set_start_executor(input_node)
         builder = builder.add_edge(input_node, starting_executor)
         builder = builder.add_edge(coordinator, user_gateway)
         builder = builder.add_edge(user_gateway, coordinator)
diff --git a/python/packages/core/agent_framework/_workflows/_magentic.py b/python/packages/core/agent_framework/_workflows/_magentic.py
index 624c6f50ae..1a6aaf2999 100644
--- a/python/packages/core/agent_framework/_workflows/_magentic.py
+++ b/python/packages/core/agent_framework/_workflows/_magentic.py
@@ -961,7 +961,7 @@ def register_agent_executor(self, name: str, executor: "MagenticAgentExecutor")

     async def _emit_orchestrator_message(
         self,
-        ctx: WorkflowContext[Any, ChatMessage],
+        ctx: WorkflowContext[Any, list[ChatMessage]],
         message: ChatMessage,
         kind: str,
     ) -> None:
@@ -1110,7 +1110,7 @@ async def handle_start_message(
         self,
         message: _MagenticStartMessage,
         context: WorkflowContext[
-            _MagenticResponseMessage | _MagenticRequestMessage | _MagenticPlanReviewRequest, ChatMessage
+            _MagenticResponseMessage | _MagenticRequestMessage | _MagenticPlanReviewRequest, list[ChatMessage]
         ],
     ) -> None:
         """Handle the initial start message to begin orchestration."""
@@ -1145,7 +1145,7 @@ async def handle_start_message(

         # Start the inner loop
         ctx2 = cast(
-            WorkflowContext[_MagenticResponseMessage | _MagenticRequestMessage, ChatMessage],
+            WorkflowContext[_MagenticResponseMessage | _MagenticRequestMessage, list[ChatMessage]],
             context,
         )
         await self._run_inner_loop(ctx2)
@@ -1155,7 +1155,7 @@ async def handle_task_text(
         self,
         task_text: str,
         context: WorkflowContext[
-            _MagenticResponseMessage | _MagenticRequestMessage | _MagenticPlanReviewRequest, ChatMessage
+            _MagenticResponseMessage | _MagenticRequestMessage | _MagenticPlanReviewRequest, list[ChatMessage]
         ],
     ) -> None:
         await self.handle_start_message(_MagenticStartMessage.from_string(task_text), context)
@@ -1165,7 +1165,7 @@ async def handle_task_message(
         self,
         task_message: ChatMessage,
         context: WorkflowContext[
-            _MagenticResponseMessage | _MagenticRequestMessage | _MagenticPlanReviewRequest, ChatMessage
+            _MagenticResponseMessage | _MagenticRequestMessage | _MagenticPlanReviewRequest, list[ChatMessage]
         ],
     ) -> None:
         await self.handle_start_message(_MagenticStartMessage(task_message), context)
@@ -1175,7 +1175,7 @@ async def handle_task_messages(
         self,
         conversation: list[ChatMessage],
         context: WorkflowContext[
-            _MagenticResponseMessage | _MagenticRequestMessage | _MagenticPlanReviewRequest, ChatMessage
+            _MagenticResponseMessage | _MagenticRequestMessage | _MagenticPlanReviewRequest, list[ChatMessage]
        ],
    ) -> None:
         await self.handle_start_message(_MagenticStartMessage(conversation), context)
@@ -1184,7 +1184,7 @@ async def handle_task_messages(
     async def handle_response_message(
         self,
         message: _MagenticResponseMessage,
-        context: WorkflowContext[_MagenticResponseMessage | _MagenticRequestMessage, ChatMessage],
+        context: WorkflowContext[_MagenticResponseMessage | _MagenticRequestMessage, list[ChatMessage]],
     ) -> None:
         """Handle responses from agents."""
         if getattr(self, "_terminated", False):
@@ -1216,7 +1216,7 @@ async def handle_plan_review_response(
         response: _MagenticPlanReviewReply,
         context: WorkflowContext[  # may broadcast ledger next, or ask for another round of review
-            _MagenticResponseMessage | _MagenticRequestMessage | _MagenticPlanReviewRequest, ChatMessage
+            _MagenticResponseMessage | _MagenticRequestMessage | _MagenticPlanReviewRequest, list[ChatMessage]
         ],
     ) -> None:
         if getattr(self, "_terminated", False):
@@ -1262,7 +1262,7 @@ async def handle_plan_review_response(

             # Enter the normal coordination loop
             ctx2 = cast(
-                WorkflowContext[_MagenticResponseMessage | _MagenticRequestMessage, ChatMessage],
+                WorkflowContext[_MagenticResponseMessage | _MagenticRequestMessage, list[ChatMessage]],
                 context,
             )
             await self._run_inner_loop(ctx2)
@@ -1289,7 +1289,7 @@ async def handle_plan_review_response(
             self._context.chat_history.append(self._task_ledger)
             # No further review requests; proceed directly into coordination
             ctx2 = cast(
-                WorkflowContext[_MagenticResponseMessage | _MagenticRequestMessage, ChatMessage],
+                WorkflowContext[_MagenticResponseMessage | _MagenticRequestMessage, list[ChatMessage]],
                 context,
             )
             await self._run_inner_loop(ctx2)
@@ -1324,7 +1324,7 @@ async def handle_plan_review_response(

     async def _run_outer_loop(
         self,
-        context: WorkflowContext[_MagenticResponseMessage | _MagenticRequestMessage, ChatMessage],
+        context: WorkflowContext[_MagenticResponseMessage | _MagenticRequestMessage, list[ChatMessage]],
     ) -> None:
         """Run the outer orchestration loop - planning phase."""
         if self._context is None:
@@ -1347,7 +1347,7 @@ async def _run_outer_loop(

     async def _run_inner_loop(
         self,
-        context: WorkflowContext[_MagenticResponseMessage | _MagenticRequestMessage, ChatMessage],
+        context: WorkflowContext[_MagenticResponseMessage | _MagenticRequestMessage, list[ChatMessage]],
     ) -> None:
         """Run the inner orchestration loop. Coordination phase. Serialized with a lock."""
         if self._context is None or self._task_ledger is None:
@@ -1357,7 +1357,7 @@ async def _run_inner_loop(

     async def _run_inner_loop_helper(
         self,
-        context: WorkflowContext[_MagenticResponseMessage | _MagenticRequestMessage, ChatMessage],
+        context: WorkflowContext[_MagenticResponseMessage | _MagenticRequestMessage, list[ChatMessage]],
     ) -> None:
         """Run inner loop with exclusive access."""
         # Narrow optional context for the remainder of this method
@@ -1442,7 +1442,7 @@ async def _run_inner_loop_helper(

     async def _reset_and_replan(
         self,
-        context: WorkflowContext[_MagenticResponseMessage | _MagenticRequestMessage, ChatMessage],
+        context: WorkflowContext[_MagenticResponseMessage | _MagenticRequestMessage, list[ChatMessage]],
     ) -> None:
         """Reset context and replan."""
         if self._context is None:
@@ -1468,7 +1468,7 @@ async def _reset_and_replan(

     async def _prepare_final_answer(
         self,
-        context: WorkflowContext[_MagenticResponseMessage | _MagenticRequestMessage, ChatMessage],
+        context: WorkflowContext[_MagenticResponseMessage | _MagenticRequestMessage, list[ChatMessage]],
     ) -> None:
         """Prepare the final answer using the manager."""
         if self._context is None:
@@ -1478,11 +1478,11 @@ async def _prepare_final_answer(
         final_answer = await self._manager.prepare_final_answer(self._context.clone(deep=True))

         # Emit a completed event for the workflow
-        await context.yield_output(final_answer)
+        await context.yield_output([final_answer])

     async def _check_within_limits_or_complete(
         self,
-        context: WorkflowContext[_MagenticResponseMessage | _MagenticRequestMessage, ChatMessage],
+        context: WorkflowContext[_MagenticResponseMessage | _MagenticRequestMessage, list[ChatMessage]],
     ) -> bool:
         """Check if orchestrator is within operational limits."""
         if self._context is None:
@@ -1509,7 +1509,7 @@ async def _check_within_limits_or_complete(
             )

             # Yield the partial result and signal completion
-            await context.yield_output(partial_result)
+            await context.yield_output([partial_result])
             return False

         return True
@@ -2283,21 +2283,22 @@ async def _validate_checkpoint_participants(
         return

     # At this point, checkpoint is guaranteed to be WorkflowCheckpoint
-    executor_states: dict[str, Any] = checkpoint.shared_state.get(EXECUTOR_STATE_KEY, {})
+    executor_states = cast(dict[str, Any], checkpoint.shared_state.get(EXECUTOR_STATE_KEY, {}))
     orchestrator_id = getattr(orchestrator, "id", "")
-    orchestrator_state = executor_states.get(orchestrator_id)
+    orchestrator_state = cast(Any, executor_states.get(orchestrator_id))
     if orchestrator_state is None:
-        orchestrator_state = executor_states.get("magentic_orchestrator")
+        orchestrator_state = cast(Any, executor_states.get("magentic_orchestrator"))

     if not isinstance(orchestrator_state, dict):
         return

-    context_payload = orchestrator_state.get("magentic_context")
+    orchestrator_state_dict = cast(dict[str, Any], orchestrator_state)
+    context_payload = cast(Any, orchestrator_state_dict.get("magentic_context"))
     if not isinstance(context_payload, dict):
         return

     context_dict = cast(dict[str, Any], context_payload)
-    restored_participants = context_dict.get("participant_descriptions")
+    restored_participants = cast(Any, context_dict.get("participant_descriptions"))
     if not isinstance(restored_participants, dict):
         return
diff --git a/python/packages/core/agent_framework/_workflows/_orchestrator_helpers.py b/python/packages/core/agent_framework/_workflows/_orchestrator_helpers.py
index 4b17dda414..9da726faf4 100644
--- a/python/packages/core/agent_framework/_workflows/_orchestrator_helpers.py
+++ b/python/packages/core/agent_framework/_workflows/_orchestrator_helpers.py
@@ -186,6 +186,10 @@ def is_registered(self, name: str) -> bool:
         """Check if a participant is registered."""
         return name in self._participant_entry_ids

+    def is_participant_registered(self, name: str) -> bool:
+        """Check if a participant is registered (alias for is_registered for compatibility)."""
+        return self.is_registered(name)
+
     def all_participants(self) -> set[str]:
         """Get all registered participant names."""
         return set(self._participant_entry_ids.keys())
diff --git a/python/packages/core/tests/workflow/test_group_chat.py b/python/packages/core/tests/workflow/test_group_chat.py
index 8d1798a7a4..5d11e64c79 100644
--- a/python/packages/core/tests/workflow/test_group_chat.py
+++ b/python/packages/core/tests/workflow/test_group_chat.py
@@ -1,9 +1,10 @@
 # Copyright (c) Microsoft. All rights reserved.

 from collections.abc import AsyncIterable, Callable
-from typing import Any
+from typing import Any, cast

 import pytest
+from pydantic import BaseModel

 from agent_framework import (
     MAGENTIC_EVENT_TYPE_AGENT_DELTA,
@@ -14,6 +15,7 @@
     AgentThread,
     BaseAgent,
     ChatMessage,
+    Executor,
     GroupChatBuilder,
     GroupChatDirective,
     GroupChatStateSnapshot,
@@ -23,21 +25,27 @@
     Role,
     TextContent,
     Workflow,
+    WorkflowContext,
     WorkflowOutputEvent,
+    handler,
 )
 from agent_framework._workflows._checkpoint import InMemoryCheckpointStorage
 from agent_framework._workflows._group_chat import (
     GroupChatOrchestratorExecutor,
+    ManagerSelectionResponse,
     _default_orchestrator_factory,  # type: ignore
+    _default_participant_factory,  # type: ignore
     _GroupChatConfig,  # type: ignore
-    _PromptBasedGroupChatManager,  # type: ignore
     _SpeakerSelectorAdapter,  # type: ignore
+    assemble_group_chat_workflow,
 )
 from agent_framework._workflows._magentic import (
     _MagenticProgressLedger,  # type: ignore
     _MagenticProgressLedgerItem,  # type: ignore
     _MagenticStartMessage,  # type: ignore
 )
+from agent_framework._workflows._participant_utils import GroupChatParticipantSpec
+from agent_framework._workflows._workflow_builder import WorkflowBuilder


 class StubAgent(BaseAgent):
@@ -70,6 +78,73 @@ async def _stream() -> AsyncIterable[AgentRunResponseUpdate]:
         return _stream()


+class StubManagerAgent(BaseAgent):
+    def __init__(self) -> None:
+        super().__init__(name="manager_agent", description="Stub manager")
+        self._call_count = 0
+
+    async def run(
+        self,
+        messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
+        *,
+        thread: AgentThread | None = None,
+        **kwargs: Any,
+    ) -> AgentRunResponse:  # type: ignore[override]
+        if self._call_count == 0:
+            self._call_count += 1
+            payload = {"selected_participant": "agent", "finish": False, "final_message": None}
+            return AgentRunResponse(
+                messages=[
+                    ChatMessage(
+                        role=Role.ASSISTANT,
+                        text='{"selected_participant": "agent", "finish": false}',
+                        author_name=self.name,
+                    )
+                ],
+                value=payload,
+            )
+
+        payload = {"selected_participant": None, "finish": True, "final_message": "agent manager final"}
+        return AgentRunResponse(
+            messages=[
+                ChatMessage(
+                    role=Role.ASSISTANT,
+                    text='{"finish": true, "final_message": "agent manager final"}',
+                    author_name=self.name,
+                )
+            ],
+            value=payload,
+        )
+
+    def run_stream(
+        self,
+        messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
+        *,
+        thread: AgentThread | None = None,
+        **kwargs: Any,
+    ) -> AsyncIterable[AgentRunResponseUpdate]:  # type: ignore[override]
+        if
self._call_count == 0: + self._call_count += 1 + + async def _stream_initial() -> AsyncIterable[AgentRunResponseUpdate]: + yield AgentRunResponseUpdate( + contents=[TextContent(text='{"selected_participant": "agent", "finish": false}')], + role=Role.ASSISTANT, + author_name=self.name, + ) + + return _stream_initial() + + async def _stream_final() -> AsyncIterable[AgentRunResponseUpdate]: + yield AgentRunResponseUpdate( + contents=[TextContent(text='{"finish": true, "final_message": "agent manager final"}')], + role=Role.ASSISTANT, + author_name=self.name, + ) + + return _stream_final() + + def make_sequence_selector() -> Callable[[GroupChatStateSnapshot], Any]: state_counter = {"value": 0} @@ -123,6 +198,22 @@ async def prepare_final_answer(self, magentic_context: MagenticContext) -> ChatM return ChatMessage(role=Role.ASSISTANT, text="final", author_name="magentic_manager") +class PassthroughExecutor(Executor): + @handler + async def forward(self, message: Any, ctx: WorkflowContext[Any]) -> None: + await ctx.send_message(message) + + +class CountingWorkflowBuilder(WorkflowBuilder): + def __init__(self) -> None: + super().__init__() + self.start_calls = 0 + + def set_start_executor(self, executor: Any) -> "CountingWorkflowBuilder": + self.start_calls += 1 + return cast("CountingWorkflowBuilder", super().set_start_executor(executor)) + + async def test_group_chat_builder_basic_flow() -> None: selector = make_sequence_selector() alpha = StubAgent("alpha", "ack from alpha") @@ -130,21 +221,23 @@ async def test_group_chat_builder_basic_flow() -> None: workflow = ( GroupChatBuilder() - .select_speakers(selector, display_name="manager", final_message="done") + .set_select_speakers_func(selector, display_name="manager", final_message="done") .participants(alpha=alpha, beta=beta) .build() ) - outputs: list[ChatMessage] = [] + outputs: list[list[ChatMessage]] = [] async for event in workflow.run_stream("coordinate task"): if isinstance(event, WorkflowOutputEvent): data = event.data - if isinstance(data, ChatMessage): - outputs.append(data) + if isinstance(data, list): + outputs.append(cast(list[ChatMessage], data)) assert len(outputs) == 1 - assert outputs[0].text == "done" - assert outputs[0].author_name == "manager" + assert len(outputs[0]) >= 1 + # The final message should be "done" from the manager + assert outputs[0][-1].text == "done" + assert outputs[0][-1].author_name == "manager" async def test_magentic_builder_returns_workflow_and_runs() -> None: @@ -169,11 +262,13 @@ async def test_magentic_builder_returns_workflow_and_runs() -> None: agent_event_count += 1 if isinstance(event, WorkflowOutputEvent): msg = event.data - if isinstance(msg, ChatMessage): - outputs.append(msg) + if isinstance(msg, list): + outputs.append(cast(list[ChatMessage], msg)) assert outputs, "Expected a final output message" - final = outputs[-1] + conversation = outputs[-1] + assert len(conversation) >= 1 + final = conversation[-1] assert final.text == "final" assert final.author_name == "magentic_manager" assert orchestrator_event_count > 0, "Expected orchestrator events to be emitted" @@ -187,7 +282,7 @@ async def test_group_chat_as_agent_accepts_conversation() -> None: workflow = ( GroupChatBuilder() - .select_speakers(selector, display_name="manager", final_message="done") + .set_select_speakers_func(selector, display_name="manager", final_message="done") .participants(alpha=alpha, beta=beta) .build() ) @@ -239,7 +334,7 @@ def test_build_without_participants_raises_error(self) -> None: def selector(state: 
GroupChatStateSnapshot) -> str | None: return None - builder = GroupChatBuilder().select_speakers(selector) + builder = GroupChatBuilder().set_select_speakers_func(selector) with pytest.raises(ValueError, match="participants must be configured before build"): builder.build() @@ -250,10 +345,10 @@ def test_duplicate_manager_configuration_raises_error(self) -> None: def selector(state: GroupChatStateSnapshot) -> str | None: return None - builder = GroupChatBuilder().select_speakers(selector) + builder = GroupChatBuilder().set_select_speakers_func(selector) with pytest.raises(ValueError, match="already has a manager configured"): - builder.select_speakers(selector) + builder.set_select_speakers_func(selector) def test_empty_participants_raises_error(self) -> None: """Test that empty participants list raises ValueError.""" @@ -261,7 +356,7 @@ def test_empty_participants_raises_error(self) -> None: def selector(state: GroupChatStateSnapshot) -> str | None: return None - builder = GroupChatBuilder().select_speakers(selector) + builder = GroupChatBuilder().set_select_speakers_func(selector) with pytest.raises(ValueError, match="participants cannot be empty"): builder.participants([]) @@ -274,7 +369,7 @@ def test_duplicate_participant_names_raises_error(self) -> None: def selector(state: GroupChatStateSnapshot) -> str | None: return None - builder = GroupChatBuilder().select_speakers(selector) + builder = GroupChatBuilder().set_select_speakers_func(selector) with pytest.raises(ValueError, match="Duplicate participant name 'test'"): builder.participants([agent1, agent2]) @@ -302,7 +397,7 @@ async def _stream() -> AsyncIterable[AgentRunResponseUpdate]: def selector(state: GroupChatStateSnapshot) -> str | None: return None - builder = GroupChatBuilder().select_speakers(selector) + builder = GroupChatBuilder().set_select_speakers_func(selector) with pytest.raises(ValueError, match="must define a non-empty 'name' attribute"): builder.participants([agent]) @@ -314,11 +409,53 @@ def test_empty_participant_name_raises_error(self) -> None: def selector(state: GroupChatStateSnapshot) -> str | None: return None - builder = GroupChatBuilder().select_speakers(selector) + builder = GroupChatBuilder().set_select_speakers_func(selector) with pytest.raises(ValueError, match="participant names must be non-empty strings"): builder.participants({"": agent}) + def test_assemble_group_chat_respects_existing_start_executor(self) -> None: + """Ensure assemble_group_chat_workflow does not override preconfigured start executor.""" + + async def manager(_: GroupChatStateSnapshot) -> GroupChatDirective: + return GroupChatDirective(finish=True) + + builder = CountingWorkflowBuilder() + entry = PassthroughExecutor(id="entry") + builder = builder.set_start_executor(entry) + + participant = PassthroughExecutor(id="participant") + participant_spec = GroupChatParticipantSpec( + name="participant", + participant=participant, + description="participant", + ) + + wiring = _GroupChatConfig( + manager=manager, + manager_participant=None, + manager_name="manager", + participants={"participant": participant_spec}, + max_rounds=None, + termination_condition=None, + participant_aliases={}, + participant_executors={"participant": participant}, + ) + + result = assemble_group_chat_workflow( + wiring=wiring, + participant_factory=_default_participant_factory, + orchestrator_factory=_default_orchestrator_factory, + builder=builder, + return_builder=True, + ) + + assert isinstance(result, tuple) + assembled_builder, _ = result + assert 
assembled_builder is builder + assert builder.start_calls == 1 + assert assembled_builder._start_executor is entry # type: ignore + class TestGroupChatOrchestrator: """Tests for GroupChatOrchestratorExecutor core functionality.""" @@ -336,25 +473,116 @@ def selector(state: GroupChatStateSnapshot) -> str | None: workflow = ( GroupChatBuilder() - .select_speakers(selector) + .set_select_speakers_func(selector) .participants([agent]) .with_max_rounds(2) # Limit to 2 rounds .build() ) - outputs: list[ChatMessage] = [] + outputs: list[list[ChatMessage]] = [] async for event in workflow.run_stream("test task"): if isinstance(event, WorkflowOutputEvent): data = event.data - if isinstance(data, ChatMessage): - outputs.append(data) + if isinstance(data, list): + outputs.append(cast(list[ChatMessage], data)) # Should have terminated due to max_rounds, expect at least one output assert len(outputs) >= 1 - # The final message should be about round limit - final_output = outputs[-1] + # The final message in the conversation should be about round limit + conversation = outputs[-1] + assert len(conversation) >= 1 + final_output = conversation[-1] assert "round limit" in final_output.text.lower() + async def test_termination_condition_halts_conversation(self) -> None: + """Test that a custom termination condition stops the workflow.""" + + def selector(state: GroupChatStateSnapshot) -> str | None: + return "agent" + + def termination_condition(conversation: list[ChatMessage]) -> bool: + replies = [msg for msg in conversation if msg.role == Role.ASSISTANT and msg.author_name == "agent"] + return len(replies) >= 2 + + agent = StubAgent("agent", "response") + + workflow = ( + GroupChatBuilder() + .set_select_speakers_func(selector) + .participants([agent]) + .with_termination_condition(termination_condition) + .build() + ) + + outputs: list[list[ChatMessage]] = [] + async for event in workflow.run_stream("test task"): + if isinstance(event, WorkflowOutputEvent): + data = event.data + if isinstance(data, list): + outputs.append(cast(list[ChatMessage], data)) + + assert outputs, "Expected termination to yield output" + conversation = outputs[-1] + agent_replies = [msg for msg in conversation if msg.author_name == "agent" and msg.role == Role.ASSISTANT] + assert len(agent_replies) == 2 + final_output = conversation[-1] + assert final_output.author_name == "manager" + assert "termination condition" in final_output.text.lower() + + async def test_termination_condition_uses_manager_final_message(self) -> None: + """Test that manager-provided final message is used on termination.""" + + async def selector(state: GroupChatStateSnapshot) -> str | None: + return None + + agent = StubAgent("agent", "response") + final_text = "manager summary on termination" + + workflow = ( + GroupChatBuilder() + .set_select_speakers_func(selector, final_message=final_text) + .participants([agent]) + .with_termination_condition(lambda _: True) + .build() + ) + + outputs: list[list[ChatMessage]] = [] + async for event in workflow.run_stream("test task"): + if isinstance(event, WorkflowOutputEvent): + data = event.data + if isinstance(data, list): + outputs.append(cast(list[ChatMessage], data)) + + assert outputs, "Expected termination to yield output" + conversation = outputs[-1] + assert conversation[-1].text == final_text + assert conversation[-1].author_name == "manager" + + async def test_termination_condition_agent_manager_finalizes(self) -> None: + """Test that agent-based manager can provide final message on termination.""" + 
manager = StubManagerAgent() + worker = StubAgent("agent", "response") + + workflow = ( + GroupChatBuilder() + .set_manager(manager, display_name="Manager") + .participants([worker]) + .with_termination_condition(lambda conv: any(msg.author_name == "agent" for msg in conv)) + .build() + ) + + outputs: list[list[ChatMessage]] = [] + async for event in workflow.run_stream("test task"): + if isinstance(event, WorkflowOutputEvent): + data = event.data + if isinstance(data, list): + outputs.append(cast(list[ChatMessage], data)) + + assert outputs, "Expected termination to yield output" + conversation = outputs[-1] + assert conversation[-1].text == "agent manager final" + assert conversation[-1].author_name == "Manager" + async def test_unknown_participant_error(self) -> None: """Test that _apply_directive raises error for unknown participants.""" @@ -363,7 +591,7 @@ def selector(state: GroupChatStateSnapshot) -> str | None: agent = StubAgent("agent", "response") - workflow = GroupChatBuilder().select_speakers(selector).participants([agent]).build() + workflow = GroupChatBuilder().set_select_speakers_func(selector).participants([agent]).build() with pytest.raises(ValueError, match="Manager selected unknown participant 'unknown_agent'"): async for _ in workflow.run_stream("test task"): @@ -379,7 +607,7 @@ def bad_selector(state: GroupChatStateSnapshot) -> GroupChatDirective: agent = StubAgent("agent", "response") # The _SpeakerSelectorAdapter will catch this and raise TypeError - workflow = GroupChatBuilder().select_speakers(bad_selector).participants([agent]).build() # type: ignore + workflow = GroupChatBuilder().set_select_speakers_func(bad_selector).participants([agent]).build() # type: ignore # This should raise a TypeError because selector doesn't return str or None with pytest.raises(TypeError, match="must return a participant name \\(str\\) or None"): @@ -394,7 +622,7 @@ def selector(state: GroupChatStateSnapshot) -> str | None: agent = StubAgent("agent", "response") - workflow = GroupChatBuilder().select_speakers(selector).participants([agent]).build() + workflow = GroupChatBuilder().set_select_speakers_func(selector).participants([agent]).build() with pytest.raises(ValueError, match="requires at least one chat message"): async for _ in workflow.run_stream([]): @@ -529,69 +757,76 @@ def selector(state: GroupChatStateSnapshot) -> str | None: storage = InMemoryCheckpointStorage() workflow = ( - GroupChatBuilder().select_speakers(selector).participants([agent]).with_checkpointing(storage).build() + GroupChatBuilder() + .set_select_speakers_func(selector) + .participants([agent]) + .with_checkpointing(storage) + .build() ) - outputs: list[ChatMessage] = [] + outputs: list[list[ChatMessage]] = [] async for event in workflow.run_stream("test task"): if isinstance(event, WorkflowOutputEvent): data = event.data - if isinstance(data, ChatMessage): - outputs.append(data) + if isinstance(data, list): + outputs.append(cast(list[ChatMessage], data)) assert len(outputs) == 1 # Should complete normally -class TestPromptBasedManager: - """Tests for _PromptBasedGroupChatManager.""" +class TestAgentManagerConfiguration: + """Tests for agent-based manager configuration.""" - async def test_manager_with_missing_next_agent_raises_error(self) -> None: - """Test that manager directive without next_agent raises RuntimeError.""" + async def test_set_manager_configures_response_format(self) -> None: + """Ensure ChatAgent managers receive default ManagerSelectionResponse formatting.""" + from unittest.mock import 
MagicMock - class MockChatClient: - async def get_response(self, messages: Any, response_format: Any = None) -> Any: - # Return response that has finish=False but no next_agent - class MockResponse: - def __init__(self) -> None: - self.value = {"finish": False, "next_agent": None} - self.messages: list[Any] = [] + from agent_framework import ChatAgent - return MockResponse() + chat_client = MagicMock() + manager_agent = ChatAgent(chat_client=chat_client, name="Coordinator") + assert manager_agent.chat_options.response_format is None - manager = _PromptBasedGroupChatManager(MockChatClient()) # type: ignore + worker = StubAgent("worker", "response") - state = { - "participants": {"agent": "desc"}, - "task": ChatMessage(role=Role.USER, text="test"), - "conversation": (), - } + builder = GroupChatBuilder().set_manager(manager_agent).participants([worker]) - with pytest.raises(RuntimeError, match="missing next_agent while finish is False"): - await manager(state) + assert manager_agent.chat_options.response_format is ManagerSelectionResponse + assert builder._manager_participant is manager_agent # type: ignore[attr-defined] - async def test_manager_with_unknown_participant_raises_error(self) -> None: - """Test that manager selecting unknown participant raises RuntimeError.""" + async def test_set_manager_accepts_agent_manager(self) -> None: + """Verify agent-based manager can be set and workflow builds.""" + from unittest.mock import MagicMock - class MockChatClient: - async def get_response(self, messages: Any, response_format: Any = None) -> Any: - # Return response selecting unknown participant - class MockResponse: - def __init__(self) -> None: - self.value = {"finish": False, "next_agent": "unknown"} - self.messages: list[Any] = [] + from agent_framework import ChatAgent - return MockResponse() + chat_client = MagicMock() + manager_agent = ChatAgent(chat_client=chat_client, name="Coordinator") + worker = StubAgent("worker", "response") - manager = _PromptBasedGroupChatManager(MockChatClient()) # type: ignore + builder = GroupChatBuilder().set_manager(manager_agent, display_name="Orchestrator") + builder = builder.participants([worker]).with_max_rounds(1) - state = { - "participants": {"agent": "desc"}, - "task": ChatMessage(role=Role.USER, text="test"), - "conversation": (), - } + assert builder._manager_participant is manager_agent # type: ignore[attr-defined] + assert "worker" in builder._participants # type: ignore[attr-defined] + + async def test_set_manager_rejects_custom_response_format(self) -> None: + """Reject custom response_format on ChatAgent managers.""" + from unittest.mock import MagicMock + + from agent_framework import ChatAgent + + class CustomResponse(BaseModel): + value: str - with pytest.raises(RuntimeError, match="Manager selected unknown participant 'unknown'"): - await manager(state) + chat_client = MagicMock() + manager_agent = ChatAgent(chat_client=chat_client, name="Coordinator", response_format=CustomResponse) + worker = StubAgent("worker", "response") + + with pytest.raises(ValueError, match="response_format must be ManagerSelectionResponse"): + GroupChatBuilder().set_manager(manager_agent).participants([worker]) + + assert manager_agent.chat_options.response_format is CustomResponse class TestFactoryFunctions: @@ -599,9 +834,9 @@ class TestFactoryFunctions: def test_default_orchestrator_factory_without_manager_raises_error(self) -> None: """Test that default factory requires manager to be set.""" - config = _GroupChatConfig(manager=None, manager_name="test", 
participants={}) + config = _GroupChatConfig(manager=None, manager_participant=None, manager_name="test", participants={}) - with pytest.raises(RuntimeError, match="requires a manager to be set"): + with pytest.raises(RuntimeError, match="requires a manager to be configured"): _default_orchestrator_factory(config) @@ -619,14 +854,14 @@ def selector(state: GroupChatStateSnapshot) -> str | None: agent = StubAgent("agent", "response") - workflow = GroupChatBuilder().select_speakers(selector).participants([agent]).build() + workflow = GroupChatBuilder().set_select_speakers_func(selector).participants([agent]).build() - outputs: list[ChatMessage] = [] + outputs: list[list[ChatMessage]] = [] async for event in workflow.run_stream("test string"): if isinstance(event, WorkflowOutputEvent): data = event.data - if isinstance(data, ChatMessage): - outputs.append(data) + if isinstance(data, list): + outputs.append(cast(list[ChatMessage], data)) assert len(outputs) == 1 @@ -641,14 +876,14 @@ def selector(state: GroupChatStateSnapshot) -> str | None: agent = StubAgent("agent", "response") - workflow = GroupChatBuilder().select_speakers(selector).participants([agent]).build() + workflow = GroupChatBuilder().set_select_speakers_func(selector).participants([agent]).build() - outputs: list[ChatMessage] = [] + outputs: list[list[ChatMessage]] = [] async for event in workflow.run_stream(task_message): if isinstance(event, WorkflowOutputEvent): data = event.data - if isinstance(data, ChatMessage): - outputs.append(data) + if isinstance(data, list): + outputs.append(cast(list[ChatMessage], data)) assert len(outputs) == 1 @@ -667,14 +902,14 @@ def selector(state: GroupChatStateSnapshot) -> str | None: agent = StubAgent("agent", "response") - workflow = GroupChatBuilder().select_speakers(selector).participants([agent]).build() + workflow = GroupChatBuilder().set_select_speakers_func(selector).participants([agent]).build() - outputs: list[ChatMessage] = [] + outputs: list[list[ChatMessage]] = [] async for event in workflow.run_stream(conversation): if isinstance(event, WorkflowOutputEvent): data = event.data - if isinstance(data, ChatMessage): - outputs.append(data) + if isinstance(data, list): + outputs.append(cast(list[ChatMessage], data)) assert len(outputs) == 1 @@ -695,23 +930,25 @@ def selector(state: GroupChatStateSnapshot) -> str | None: workflow = ( GroupChatBuilder() - .select_speakers(selector) + .set_select_speakers_func(selector) .participants([agent]) .with_max_rounds(1) # Very low limit .build() ) - outputs: list[ChatMessage] = [] + outputs: list[list[ChatMessage]] = [] async for event in workflow.run_stream("test"): if isinstance(event, WorkflowOutputEvent): data = event.data - if isinstance(data, ChatMessage): - outputs.append(data) + if isinstance(data, list): + outputs.append(cast(list[ChatMessage], data)) # Should have at least one output (the round limit message) assert len(outputs) >= 1 - # The last message should be about round limit - final_output = outputs[-1] + # The last message in the conversation should be about round limit + conversation = outputs[-1] + assert len(conversation) >= 1 + final_output = conversation[-1] assert "round limit" in final_output.text.lower() async def test_round_limit_in_ingest_participant_message(self) -> None: @@ -728,23 +965,25 @@ def selector(state: GroupChatStateSnapshot) -> str | None: workflow = ( GroupChatBuilder() - .select_speakers(selector) + .set_select_speakers_func(selector) .participants([agent]) .with_max_rounds(1) # Hit limit after first 
response .build() ) - outputs: list[ChatMessage] = [] + outputs: list[list[ChatMessage]] = [] async for event in workflow.run_stream("test"): if isinstance(event, WorkflowOutputEvent): data = event.data - if isinstance(data, ChatMessage): - outputs.append(data) + if isinstance(data, list): + outputs.append(cast(list[ChatMessage], data)) # Should have at least one output (the round limit message) assert len(outputs) >= 1 - # The last message should be about round limit - final_output = outputs[-1] + # The last message in the conversation should be about round limit + conversation = outputs[-1] + assert len(conversation) >= 1 + final_output = conversation[-1] assert "round limit" in final_output.text.lower() @@ -758,12 +997,12 @@ async def test_group_chat_checkpoint_runtime_only() -> None: agent_b = StubAgent("agentB", "Reply from B") selector = make_sequence_selector() - wf = GroupChatBuilder().participants([agent_a, agent_b]).select_speakers(selector).build() + wf = GroupChatBuilder().participants([agent_a, agent_b]).set_select_speakers_func(selector).build() baseline_output: list[ChatMessage] | None = None async for ev in wf.run_stream("runtime checkpoint test", checkpoint_storage=storage): if isinstance(ev, WorkflowOutputEvent): - baseline_output = ev.data # type: ignore[assignment] + baseline_output = cast(list[ChatMessage], ev.data) if isinstance(ev.data, list) else None if isinstance(ev, WorkflowStatusEvent) and ev.state in ( WorkflowRunState.IDLE, WorkflowRunState.IDLE_WITH_PENDING_REQUESTS, @@ -794,7 +1033,7 @@ async def test_group_chat_checkpoint_runtime_overrides_buildtime() -> None: wf = ( GroupChatBuilder() .participants([agent_a, agent_b]) - .select_speakers(selector) + .set_select_speakers_func(selector) .with_checkpointing(buildtime_storage) .build() ) @@ -802,7 +1041,7 @@ async def test_group_chat_checkpoint_runtime_overrides_buildtime() -> None: baseline_output: list[ChatMessage] | None = None async for ev in wf.run_stream("override test", checkpoint_storage=runtime_storage): if isinstance(ev, WorkflowOutputEvent): - baseline_output = ev.data # type: ignore[assignment] + baseline_output = cast(list[ChatMessage], ev.data) if isinstance(ev.data, list) else None if isinstance(ev, WorkflowStatusEvent) and ev.state in ( WorkflowRunState.IDLE, WorkflowRunState.IDLE_WITH_PENDING_REQUESTS, @@ -816,3 +1055,30 @@ async def test_group_chat_checkpoint_runtime_overrides_buildtime() -> None: assert len(runtime_checkpoints) > 0, "Runtime storage should have checkpoints" assert len(buildtime_checkpoints) == 0, "Build-time storage should have no checkpoints when overridden" + + +class _StubExecutor(Executor): + """Minimal executor used to satisfy workflow wiring in tests.""" + + def __init__(self, id: str) -> None: + super().__init__(id=id) + + @handler + async def handle(self, message: object, ctx: WorkflowContext[ChatMessage]) -> None: + await ctx.yield_output(message) + + +def test_set_manager_builds_with_agent_manager() -> None: + """GroupChatBuilder should build when using an agent-based manager.""" + + manager = _StubExecutor("manager_executor") + participant = _StubExecutor("participant_executor") + + workflow = ( + GroupChatBuilder().set_manager(manager, display_name="Moderator").participants({"worker": participant}).build() + ) + + orchestrator = workflow.get_start_executor() + + assert isinstance(orchestrator, GroupChatOrchestratorExecutor) + assert orchestrator._is_manager_agent() diff --git a/python/packages/core/tests/workflow/test_handoff.py 
b/python/packages/core/tests/workflow/test_handoff.py index 5dfd7522df..3bbed7681e 100644 --- a/python/packages/core/tests/workflow/test_handoff.py +++ b/python/packages/core/tests/workflow/test_handoff.py @@ -23,7 +23,22 @@ WorkflowOutputEvent, ) from agent_framework._mcp import MCPTool +from agent_framework._workflows import _handoff as handoff_module # type: ignore from agent_framework._workflows._handoff import _clone_chat_agent # type: ignore[reportPrivateUsage] +from agent_framework._workflows._workflow_builder import WorkflowBuilder + + +class _CountingWorkflowBuilder(WorkflowBuilder): + created: list["_CountingWorkflowBuilder"] = [] + + def __init__(self, *args: Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) + self.start_calls = 0 + _CountingWorkflowBuilder.created.append(self) + + def set_start_executor(self, executor: Any) -> "_CountingWorkflowBuilder": # type: ignore[override] + self.start_calls += 1 + return cast("_CountingWorkflowBuilder", super().set_start_executor(executor)) @dataclass @@ -478,6 +493,27 @@ async def test_return_to_previous_enabled(): assert len(specialist_a.calls) == 2, "Specialist A should handle follow-up with return_to_previous enabled" +def test_handoff_builder_sets_start_executor_once(monkeypatch: pytest.MonkeyPatch) -> None: + """Ensure HandoffBuilder.build sets the start executor only once when assembling the workflow.""" + _CountingWorkflowBuilder.created.clear() + monkeypatch.setattr(handoff_module, "WorkflowBuilder", _CountingWorkflowBuilder) + + coordinator = _RecordingAgent(name="coordinator") + specialist = _RecordingAgent(name="specialist") + + workflow = ( + HandoffBuilder(participants=[coordinator, specialist]) + .set_coordinator("coordinator") + .with_termination_condition(lambda conv: len(conv) > 0) + .build() + ) + + assert workflow is not None + assert _CountingWorkflowBuilder.created, "Expected CountingWorkflowBuilder to be instantiated" + builder = _CountingWorkflowBuilder.created[-1] + assert builder.start_calls == 1, "set_start_executor should be invoked exactly once" + + async def test_tool_choice_preserved_from_agent_config(): """Verify that agent-level tool_choice configuration is preserved and not overridden.""" from unittest.mock import AsyncMock diff --git a/python/packages/core/tests/workflow/test_magentic.py b/python/packages/core/tests/workflow/test_magentic.py index 7dc8a2c471..b41d243a3e 100644 --- a/python/packages/core/tests/workflow/test_magentic.py +++ b/python/packages/core/tests/workflow/test_magentic.py @@ -33,6 +33,7 @@ WorkflowStatusEvent, handler, ) +from agent_framework._workflows import _group_chat as group_chat_module # type: ignore from agent_framework._workflows._checkpoint import InMemoryCheckpointStorage from agent_framework._workflows._magentic import ( # type: ignore[reportPrivateUsage] MagenticAgentExecutor, @@ -42,6 +43,7 @@ _MagenticProgressLedgerItem, # type: ignore _MagenticStartMessage, # type: ignore ) +from agent_framework._workflows._workflow_builder import WorkflowBuilder if sys.version_info >= (3, 12): from typing import override @@ -162,6 +164,19 @@ async def prepare_final_answer(self, magentic_context: MagenticContext) -> ChatM return ChatMessage(role=Role.ASSISTANT, text="FINAL", author_name="magentic_manager") +class _CountingWorkflowBuilder(WorkflowBuilder): + created: list["_CountingWorkflowBuilder"] = [] + + def __init__(self, *args: Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) + self.start_calls = 0 + _CountingWorkflowBuilder.created.append(self) + 
+ def set_start_executor(self, executor: Any) -> "_CountingWorkflowBuilder": # type: ignore[override] + self.start_calls += 1 + return cast("_CountingWorkflowBuilder", super().set_start_executor(executor)) + + async def test_standard_manager_plan_and_replan_combined_ledger(): manager = FakeManager(max_round_count=10, max_stall_count=3, max_reset_count=2) ctx = MagenticContext( @@ -210,7 +225,7 @@ async def test_magentic_workflow_plan_review_approval_to_completion(): assert req_event is not None completed = False - output: ChatMessage | None = None + output: list[ChatMessage] | None = None async for ev in wf.send_responses_streaming( responses={req_event.request_id: MagenticPlanReviewReply(decision=MagenticPlanReviewDecision.APPROVE)} ): @@ -222,7 +237,8 @@ async def test_magentic_workflow_plan_review_approval_to_completion(): break assert completed assert output is not None - assert isinstance(output, ChatMessage) + assert isinstance(output, list) + assert all(isinstance(msg, ChatMessage) for msg in output) async def test_magentic_plan_review_approve_with_comments_replans_and_proceeds(): @@ -300,8 +316,10 @@ async def test_magentic_orchestrator_round_limit_produces_partial_result(): output_event = next((e for e in events if isinstance(e, WorkflowOutputEvent)), None) assert output_event is not None data = output_event.data - assert isinstance(data, ChatMessage) - assert data.role == Role.ASSISTANT + assert isinstance(data, list) + assert all(isinstance(msg, ChatMessage) for msg in data) + assert len(data) > 0 + assert data[-1].role == Role.ASSISTANT async def test_magentic_checkpoint_resume_round_trip(): @@ -374,6 +392,23 @@ async def _noop(self, message: object, ctx: WorkflowContext[object]) -> None: # pass +def test_magentic_builder_sets_start_executor_once(monkeypatch: pytest.MonkeyPatch) -> None: + """Ensure MagenticBuilder wiring sets the start executor only once.""" + _CountingWorkflowBuilder.created.clear() + monkeypatch.setattr(group_chat_module, "WorkflowBuilder", _CountingWorkflowBuilder) + + manager = FakeManager() + + workflow = ( + MagenticBuilder().participants(agentA=_DummyExec("agentA")).with_standard_manager(manager=manager).build() + ) + + assert workflow is not None + assert _CountingWorkflowBuilder.created, "Expected CountingWorkflowBuilder to be instantiated" + builder = _CountingWorkflowBuilder.created[-1] + assert builder.start_calls == 1, "set_start_executor should be called exactly once" + + async def test_magentic_agent_executor_on_checkpoint_save_and_restore_roundtrip(): backing_executor = _DummyExec("backing") agent_exec = MagenticAgentExecutor(backing_executor, "agentA") @@ -746,9 +781,11 @@ async def test_magentic_stall_and_reset_successfully(): assert idle_status is not None output_event = next((e for e in events if isinstance(e, WorkflowOutputEvent)), None) assert output_event is not None - assert isinstance(output_event.data, ChatMessage) - assert output_event.data.text is not None - assert output_event.data.text == "re-ledger" + assert isinstance(output_event.data, list) + assert all(isinstance(msg, ChatMessage) for msg in output_event.data) + assert len(output_event.data) > 0 + assert output_event.data[-1].text is not None + assert output_event.data[-1].text == "re-ledger" async def test_magentic_checkpoint_runtime_only() -> None: diff --git a/python/samples/README.md b/python/samples/README.md index bb7d01527a..3434fa639d 100644 --- a/python/samples/README.md +++ b/python/samples/README.md @@ -268,97 +268,7 @@ This directory contains samples demonstrating 
the capabilities of Microsoft Agen ## Workflows -### Start Here - -| File | Description | -|------|-------------| -| [`getting_started/workflows/_start-here/step1_executors_and_edges.py`](./getting_started/workflows/_start-here/step1_executors_and_edges.py) | Step 1: Foundational patterns: Executors and edges | -| [`getting_started/workflows/_start-here/step2_agents_in_a_workflow.py`](./getting_started/workflows/_start-here/step2_agents_in_a_workflow.py) | Step 2: Agents in a Workflow non-streaming | -| [`getting_started/workflows/_start-here/step3_streaming.py`](./getting_started/workflows/_start-here/step3_streaming.py) | Step 3: Agents in a workflow with streaming | - -### Agents in Workflows - -| File | Description | -|------|-------------| -| [`getting_started/workflows/agents/azure_ai_agents_streaming.py`](./getting_started/workflows/agents/azure_ai_agents_streaming.py) | Sample: Agents in a workflow with streaming | -| [`getting_started/workflows/agents/azure_chat_agents_function_bridge.py`](./getting_started/workflows/agents/azure_chat_agents_function_bridge.py) | Sample: Two agents connected by a function executor bridge | -| [`getting_started/workflows/agents/azure_chat_agents_streaming.py`](./getting_started/workflows/agents/azure_chat_agents_streaming.py) | Sample: Agents in a workflow with streaming | -| [`getting_started/workflows/agents/azure_chat_agents_tool_calls_with_feedback.py`](./getting_started/workflows/agents/azure_chat_agents_tool_calls_with_feedback.py) | Sample: Tool-enabled agents with human feedback | -| [`getting_started/workflows/agents/custom_agent_executors.py`](./getting_started/workflows/agents/custom_agent_executors.py) | Step 2: Agents in a Workflow non-streaming | -| [`getting_started/workflows/agents/workflow_as_agent_human_in_the_loop.py`](./getting_started/workflows/agents/workflow_as_agent_human_in_the_loop.py) | Sample: Workflow Agent with Human-in-the-Loop | -| [`getting_started/workflows/agents/workflow_as_agent_reflection_pattern.py`](./getting_started/workflows/agents/workflow_as_agent_reflection_pattern.py) | Sample: Workflow as Agent with Reflection and Retry Pattern | - -### Checkpoint - -| File | Description | -|------|-------------| -| [`getting_started/workflows/checkpoint/checkpoint_with_human_in_the_loop.py`](./getting_started/workflows/checkpoint/checkpoint_with_human_in_the_loop.py) | Sample: Checkpoint + human-in-the-loop quickstart | -| [`getting_started/workflows/checkpoint/checkpoint_with_resume.py`](./getting_started/workflows/checkpoint/checkpoint_with_resume.py) | Sample: Checkpointing and Resuming a Workflow (with an Agent stage) | -| [`getting_started/workflows/checkpoint/sub_workflow_checkpoint.py`](./getting_started/workflows/checkpoint/sub_workflow_checkpoint.py) | Sample: Checkpointing for workflows that embed sub-workflows | - -### Composition - -| File | Description | -|------|-------------| -| [`getting_started/workflows/composition/sub_workflow_basics.py`](./getting_started/workflows/composition/sub_workflow_basics.py) | Sample: Sub-Workflows (Basics) | -| [`getting_started/workflows/composition/sub_workflow_parallel_requests.py`](./getting_started/workflows/composition/sub_workflow_parallel_requests.py) | Sample: Sub-workflow with parallel request handling by specialized interceptors | -| [`getting_started/workflows/composition/sub_workflow_request_interception.py`](./getting_started/workflows/composition/sub_workflow_request_interception.py) | Sample: Sub-Workflows with Request Interception | - -### Control Flow - 
-| File | Description | -|------|-------------| -| [`getting_started/workflows/control-flow/edge_condition.py`](./getting_started/workflows/control-flow/edge_condition.py) | Sample: Conditional routing with structured outputs | -| [`getting_started/workflows/control-flow/multi_selection_edge_group.py`](./getting_started/workflows/control-flow/multi_selection_edge_group.py) | Step 06b — Multi-Selection Edge Group sample | -| [`getting_started/workflows/control-flow/sequential_executors.py`](./getting_started/workflows/control-flow/sequential_executors.py) | Sample: Sequential workflow with streaming | -| [`getting_started/workflows/control-flow/sequential_streaming.py`](./getting_started/workflows/control-flow/sequential_streaming.py) | Sample: Foundational sequential workflow with streaming using function-style executors | -| [`getting_started/workflows/control-flow/simple_loop.py`](./getting_started/workflows/control-flow/simple_loop.py) | Sample: Simple Loop (with an Agent Judge) | -| [`getting_started/workflows/control-flow/switch_case_edge_group.py`](./getting_started/workflows/control-flow/switch_case_edge_group.py) | Sample: Switch-Case Edge Group with an explicit Uncertain branch | - -### Human-in-the-Loop - -| File | Description | -|------|-------------| -| [`getting_started/workflows/human-in-the-loop/guessing_game_with_human_input.py`](./getting_started/workflows/human-in-the-loop/guessing_game_with_human_input.py) | Sample: Human in the loop guessing game | -| [`getting_started/workflows/human-in-the-loop/agents_with_approval_requests.py`](./getting_started/workflows/human-in-the-loop/agents_with_approval_requests.py) | Sample: Agents with Approval Requests in Workflows | - -### Orchestration - -| File | Description | -|------|-------------| -| [`getting_started/workflows/orchestration/concurrent_agents.py`](./getting_started/workflows/orchestration/concurrent_agents.py) | Sample: Concurrent fan-out/fan-in (agent-only API) with default aggregator | -| [`getting_started/workflows/orchestration/concurrent_custom_agent_executors.py`](./getting_started/workflows/orchestration/concurrent_custom_agent_executors.py) | Sample: Concurrent Orchestration with Custom Agent Executors | -| [`getting_started/workflows/orchestration/concurrent_custom_aggregator.py`](./getting_started/workflows/orchestration/concurrent_custom_aggregator.py) | Sample: Concurrent Orchestration with Custom Aggregator | -| [`getting_started/workflows/orchestration/group_chat_prompt_based_manager.py`](./getting_started/workflows/orchestration/group_chat_prompt_based_manager.py) | Sample: Group Chat Orchestration with LLM-based manager | -| [`getting_started/workflows/orchestration/group_chat_simple_selector.py`](./getting_started/workflows/orchestration/group_chat_simple_selector.py) | Sample: Group Chat Orchestration with function-based speaker selector | -| [`getting_started/workflows/orchestration/handoff_simple.py`](./getting_started/workflows/orchestration/handoff_simple.py) | Sample: Handoff Orchestration with simple agent handoff pattern | -| [`getting_started/workflows/orchestration/handoff_specialist_to_specialist.py`](./getting_started/workflows/orchestration/handoff_specialist_to_specialist.py) | Sample: Handoff Orchestration with specialist-to-specialist routing | -| [`getting_started/workflows/orchestration/handoff_return_to_previous`](./getting_started/workflows/orchestration/handoff_return_to_previous.py) | Return-to-previous routing: after user input, routes back to the previous specialist instead of 
coordinator using `.enable_return_to_previous()` | -| [`getting_started/workflows/orchestration/magentic.py`](./getting_started/workflows/orchestration/magentic.py) | Sample: Magentic Orchestration (agentic task planning with multi-agent execution) | -| [`getting_started/workflows/orchestration/magentic_checkpoint.py`](./getting_started/workflows/orchestration/magentic_checkpoint.py) | Sample: Magentic Orchestration with Checkpointing | -| [`getting_started/workflows/orchestration/magentic_human_plan_update.py`](./getting_started/workflows/orchestration/magentic_human_plan_update.py) | Sample: Magentic Orchestration with Human Plan Review | -| [`getting_started/workflows/orchestration/sequential_agents.py`](./getting_started/workflows/orchestration/sequential_agents.py) | Sample: Sequential workflow (agent-focused API) with shared conversation context | -| [`getting_started/workflows/orchestration/sequential_custom_executors.py`](./getting_started/workflows/orchestration/sequential_custom_executors.py) | Sample: Sequential workflow mixing agents and a custom summarizer executor | - -### Parallelism - -| File | Description | -|------|-------------| -| [`getting_started/workflows/parallelism/aggregate_results_of_different_types.py`](./getting_started/workflows/parallelism/aggregate_results_of_different_types.py) | Sample: Concurrent fan out and fan in with two different tasks that output results of different types | -| [`getting_started/workflows/parallelism/fan_out_fan_in_edges.py`](./getting_started/workflows/parallelism/fan_out_fan_in_edges.py) | Sample: Concurrent fan out and fan in with three domain agents | -| [`getting_started/workflows/parallelism/map_reduce_and_visualization.py`](./getting_started/workflows/parallelism/map_reduce_and_visualization.py) | Sample: Map reduce word count with fan out and fan in over file backed intermediate results | - -### State Management - -| File | Description | -|------|-------------| -| [`getting_started/workflows/state-management/shared_states_with_agents.py`](./getting_started/workflows/state-management/shared_states_with_agents.py) | Sample: Shared state with agents and conditional routing | - -### Visualization - -| File | Description | -|------|-------------| -| [`getting_started/workflows/visualization/concurrent_with_visualization.py`](./getting_started/workflows/visualization/concurrent_with_visualization.py) | Sample: Concurrent (Fan-out/Fan-in) with Agents + Visualization | +View the list of Workflows samples [here](./getting_started/workflows/README.md). 
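Two behavioral changes run through the test updates in this diff: `GroupChatBuilder.select_speakers(...)` is renamed to `set_select_speakers_func(...)`, and workflows now yield the full conversation as `list[ChatMessage]` instead of a single `ChatMessage`. A minimal consumer sketch, assuming only the public API the tests exercise (the `alpha`/`beta` participants are placeholders for any agents):

```python
from typing import Any, cast

from agent_framework import (
    ChatMessage,
    GroupChatBuilder,
    GroupChatStateSnapshot,
    WorkflowOutputEvent,
)


def selector(state: GroupChatStateSnapshot) -> str | None:
    # Return the next speaker's name, or None to let the manager finish
    # the conversation with its configured final_message.
    return None


async def run_group_chat(alpha: Any, beta: Any) -> list[ChatMessage]:
    workflow = (
        GroupChatBuilder()
        .set_select_speakers_func(selector, display_name="manager", final_message="done")
        .participants(alpha=alpha, beta=beta)
        .build()
    )
    conversation: list[ChatMessage] = []
    async for event in workflow.run_stream("coordinate task"):
        if isinstance(event, WorkflowOutputEvent) and isinstance(event.data, list):
            # Outputs now carry the whole conversation; the manager's
            # final message is the last entry.
            conversation = cast(list[ChatMessage], event.data)
    return conversation
```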
## Sample Guidelines diff --git a/python/samples/getting_started/workflows/README.md b/python/samples/getting_started/workflows/README.md index c7d1622577..4dbeeb6071 100644 --- a/python/samples/getting_started/workflows/README.md +++ b/python/samples/getting_started/workflows/README.md @@ -92,7 +92,8 @@ For observability samples in Agent Framework, see the [observability getting sta | Concurrent Orchestration (Default Aggregator) | [orchestration/concurrent_agents.py](./orchestration/concurrent_agents.py) | Fan-out to multiple agents; fan-in with default aggregator returning combined ChatMessages | | Concurrent Orchestration (Custom Aggregator) | [orchestration/concurrent_custom_aggregator.py](./orchestration/concurrent_custom_aggregator.py) | Override aggregator via callback; summarize results with an LLM | | Concurrent Orchestration (Custom Agent Executors) | [orchestration/concurrent_custom_agent_executors.py](./orchestration/concurrent_custom_agent_executors.py) | Child executors own ChatAgents; concurrent fan-out/fan-in via ConcurrentBuilder | -| Group Chat Orchestration with Prompt Based Manager | [orchestration/group_chat_prompt_based_manager.py](./orchestration/group_chat_prompt_based_manager.py) | LLM Manager-directed conversation using GroupChatBuilder | +| Group Chat with Agent Manager | [orchestration/group_chat_agent_manager.py](./orchestration/group_chat_agent_manager.py) | Agent-based manager using `set_manager()` to select next speaker | +| Group Chat Philosophical Debate | [orchestration/group_chat_philosophical_debate.py](./orchestration/group_chat_philosophical_debate.py) | Agent manager moderates long-form, multi-round debate across diverse participants | | Group Chat with Simple Function Selector | [orchestration/group_chat_simple_selector.py](./orchestration/group_chat_simple_selector.py) | Group chat with a simple function selector for next speaker | | Handoff (Simple) | [orchestration/handoff_simple.py](./orchestration/handoff_simple.py) | Single-tier routing: triage agent routes to specialists, control returns to user after each specialist response | | Handoff (Specialist-to-Specialist) | [orchestration/handoff_specialist_to_specialist.py](./orchestration/handoff_specialist_to_specialist.py) | Multi-tier routing: specialists can hand off to other specialists using `.add_handoff()` fluent API | diff --git a/python/samples/getting_started/workflows/orchestration/group_chat_agent_manager.py b/python/samples/getting_started/workflows/orchestration/group_chat_agent_manager.py new file mode 100644 index 0000000000..3bc79fcddc --- /dev/null +++ b/python/samples/getting_started/workflows/orchestration/group_chat_agent_manager.py @@ -0,0 +1,112 @@ +# Copyright (c) Microsoft. All rights reserved. 
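The new sample below exercises `set_manager()`. Going by the builder tests above, a sketch of the contract it enforces (the `MagicMock` chat client is a stand-in, so the agent is configurable but not runnable):

```python
from unittest.mock import MagicMock

from agent_framework import ChatAgent, GroupChatBuilder
from agent_framework._workflows._group_chat import ManagerSelectionResponse

manager = ChatAgent(chat_client=MagicMock(), name="Coordinator")
assert manager.chat_options.response_format is None

# set_manager() installs ManagerSelectionResponse as the manager's structured
# output format; a pre-set custom response_format raises ValueError instead.
GroupChatBuilder().set_manager(manager)
assert manager.chat_options.response_format is ManagerSelectionResponse
```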
+
+import asyncio
+import logging
+from typing import cast
+
+from agent_framework import (
+    AgentRunUpdateEvent,
+    ChatAgent,
+    ChatMessage,
+    GroupChatBuilder,
+    Role,
+    WorkflowOutputEvent,
+)
+from agent_framework.azure import AzureOpenAIChatClient
+from azure.identity import AzureCliCredential
+
+logging.basicConfig(level=logging.INFO)
+
+"""
+Sample: Group Chat with Agent-Based Manager
+
+What it does:
+- Demonstrates the new set_manager() API for agent-based coordination
+- Manager is a full ChatAgent with access to tools, context, and observability
+- Coordinates researcher and writer agents to solve tasks collaboratively
+
+Prerequisites:
+- Azure OpenAI environment variables configured for AzureOpenAIChatClient, plus `az login` for AzureCliCredential
+"""
+
+
+def _get_chat_client() -> AzureOpenAIChatClient:
+    return AzureOpenAIChatClient(credential=AzureCliCredential())
+
+
+async def main() -> None:
+    # Create coordinator agent with structured output for speaker selection
+    # Note: response_format is enforced to ManagerSelectionResponse by set_manager()
+    coordinator = ChatAgent(
+        name="Coordinator",
+        description="Coordinates multi-agent collaboration by selecting speakers",
+        instructions="""
+You coordinate a team conversation to solve the user's task.
+
+Review the conversation history and select the next participant to speak.
+
+Guidelines:
+- Start with Researcher to gather information
+- Then have Writer synthesize the final answer
+- Only finish after both have contributed meaningfully
+- Allow for multiple rounds of information gathering if needed
+""",
+        chat_client=_get_chat_client(),
+    )
+
+    researcher = ChatAgent(
+        name="Researcher",
+        description="Collects relevant background information",
+        instructions="Gather concise facts that help a teammate answer the question.",
+        chat_client=_get_chat_client(),
+    )
+
+    writer = ChatAgent(
+        name="Writer",
+        description="Synthesizes polished answers from gathered information",
+        instructions="Compose clear and structured answers using any notes provided.",
+        chat_client=_get_chat_client(),
+    )
+
+    workflow = (
+        GroupChatBuilder()
+        .set_manager(coordinator, display_name="Orchestrator")
+        .with_termination_condition(lambda messages: sum(1 for msg in messages if msg.role == Role.ASSISTANT) >= 2)
+        .participants([researcher, writer])
+        .build()
+    )
+
+    task = "What are the key benefits of using async/await in Python? Provide a concise summary."
+ + print("\nStarting Group Chat with Agent-Based Manager...\n") + print(f"TASK: {task}\n") + print("=" * 80) + + final_conversation: list[ChatMessage] = [] + last_executor_id: str | None = None + async for event in workflow.run_stream(task): + if isinstance(event, AgentRunUpdateEvent): + eid = event.executor_id + if eid != last_executor_id: + if last_executor_id is not None: + print() + print(f"{eid}:", end=" ", flush=True) + last_executor_id = eid + print(event.data, end="", flush=True) + elif isinstance(event, WorkflowOutputEvent): + final_conversation = cast(list[ChatMessage], event.data) + + if final_conversation and isinstance(final_conversation, list): + print("\n\n" + "=" * 80) + print("FINAL CONVERSATION") + print("=" * 80) + for msg in final_conversation: + author = getattr(msg, "author_name", "Unknown") + text = getattr(msg, "text", str(msg)) + print(f"\n[{author}]") + print(text) + print("-" * 80) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/python/samples/getting_started/workflows/orchestration/group_chat_philosophical_debate.py b/python/samples/getting_started/workflows/orchestration/group_chat_philosophical_debate.py new file mode 100644 index 0000000000..7059a84e32 --- /dev/null +++ b/python/samples/getting_started/workflows/orchestration/group_chat_philosophical_debate.py @@ -0,0 +1,408 @@ +# Copyright (c) Microsoft. All rights reserved. + +import asyncio +import logging +from typing import cast + +from agent_framework import ( + AgentRunUpdateEvent, + ChatAgent, + ChatMessage, + GroupChatBuilder, + Role, + WorkflowOutputEvent, +) +from agent_framework.azure import AzureOpenAIChatClient +from azure.identity import AzureCliCredential + +logging.basicConfig(level=logging.WARNING) + +""" +Sample: Philosophical Debate with Agent-Based Manager + +What it does: +- Creates a diverse group of agents representing different global perspectives +- Uses an agent-based manager to guide a philosophical discussion +- Demonstrates longer, multi-round discourse with natural conversation flow +- Manager decides when discussion has reached meaningful conclusion + +Topic: "What does a good life mean to you personally?" + +Participants represent: +- Farmer from Southeast Asia (tradition, sustainability, land connection) +- Software Developer from United States (innovation, technology, work-life balance) +- History Teacher from Eastern Europe (legacy, learning, cultural continuity) +- Activist from South America (social justice, environmental rights) +- Spiritual Leader from Middle East (morality, community service) +- Artist from Africa (creative expression, storytelling) +- Immigrant Entrepreneur from Asia in Canada (tradition + adaptation) +- Doctor from Scandinavia (public health, equity, societal support) + +Prerequisites: +- OpenAI environment variables configured for OpenAIChatClient +""" + + +def _get_chat_client() -> AzureOpenAIChatClient: + return AzureOpenAIChatClient(credential=AzureCliCredential()) + + +async def main() -> None: + # Create debate moderator with structured output for speaker selection + # Note: Participant names and descriptions are automatically injected by the orchestrator + moderator = ChatAgent( + name="Moderator", + description="Guides philosophical discussion by selecting next speaker", + instructions=""" +You are a thoughtful moderator guiding a philosophical discussion on the topic handed to you by the user. + +Your participants bring diverse global perspectives. 
Select speakers strategically to: +- Create natural conversation flow and responses to previous points +- Ensure all voices are heard throughout the discussion +- Build on themes and contrasts that emerge +- Allow for respectful challenges and counterpoints +- Guide toward meaningful conclusions + +Select speakers who can: +1. Respond directly to points just made +2. Introduce fresh perspectives when needed +3. Bridge or contrast different viewpoints +4. Deepen the philosophical exploration + +Finish when: +- Multiple rounds have occurred (at least 6-8 exchanges) +- Key themes have been explored from different angles +- Natural conclusion or synthesis has emerged +- Diminishing returns in new insights + +In your final_message, provide a brief synthesis highlighting key themes that emerged. +""", + chat_client=_get_chat_client(), + ) + + farmer = ChatAgent( + name="Farmer", + description="A rural farmer from Southeast Asia", + instructions=""" +You're a farmer from Southeast Asia. Your life is deeply connected to land and family. +You value tradition and sustainability. You are in a philosophical debate. + +Share your perspective authentically. Feel free to: +- Challenge other participants respectfully +- Build on points others have made +- Use concrete examples from your experience +- Keep responses thoughtful but concise (2-4 sentences) +""", + chat_client=_get_chat_client(), + ) + + developer = ChatAgent( + name="Developer", + description="An urban software developer from the United States", + instructions=""" +You're a software developer from the United States. Your life is fast-paced and technology-driven. +You value innovation, freedom, and work-life balance. You are in a philosophical debate. + +Share your perspective authentically. Feel free to: +- Challenge other participants respectfully +- Build on points others have made +- Use concrete examples from your experience +- Keep responses thoughtful but concise (2-4 sentences) +""", + chat_client=_get_chat_client(), + ) + + teacher = ChatAgent( + name="Teacher", + description="A retired history teacher from Eastern Europe", + instructions=""" +You're a retired history teacher from Eastern Europe. You bring historical and philosophical +perspectives to discussions. You value legacy, learning, and cultural continuity. +You are in a philosophical debate. + +Share your perspective authentically. Feel free to: +- Challenge other participants respectfully +- Build on points others have made +- Use concrete examples from history or your teaching experience +- Keep responses thoughtful but concise (2-4 sentences) +""", + chat_client=_get_chat_client(), + ) + + activist = ChatAgent( + name="Activist", + description="A young activist from South America", + instructions=""" +You're a young activist from South America. You focus on social justice, environmental rights, +and generational change. You are in a philosophical debate. + +Share your perspective authentically. Feel free to: +- Challenge other participants respectfully +- Build on points others have made +- Use concrete examples from your activism +- Keep responses thoughtful but concise (2-4 sentences) +""", + chat_client=_get_chat_client(), + ) + + spiritual_leader = ChatAgent( + name="SpiritualLeader", + description="A spiritual leader from the Middle East", + instructions=""" +You're a spiritual leader from the Middle East. You provide insights grounded in religion, +morality, and community service. You are in a philosophical debate. + +Share your perspective authentically. 
Feel free to: +- Challenge other participants respectfully +- Build on points others have made +- Use examples from spiritual teachings or community work +- Keep responses thoughtful but concise (2-4 sentences) +""", + chat_client=_get_chat_client(), + ) + + artist = ChatAgent( + name="Artist", + description="An artist from Africa", + instructions=""" +You're an artist from Africa. You view life through creative expression, storytelling, +and collective memory. You are in a philosophical debate. + +Share your perspective authentically. Feel free to: +- Challenge other participants respectfully +- Build on points others have made +- Use examples from your art or cultural traditions +- Keep responses thoughtful but concise (2-4 sentences) +""", + chat_client=_get_chat_client(), + ) + + immigrant = ChatAgent( + name="Immigrant", + description="An immigrant entrepreneur from Asia living in Canada", + instructions=""" +You're an immigrant entrepreneur from Asia living in Canada. You balance tradition with adaptation. +You focus on family success, risk, and opportunity. You are in a philosophical debate. + +Share your perspective authentically. Feel free to: +- Challenge other participants respectfully +- Build on points others have made +- Use examples from your immigrant and entrepreneurial journey +- Keep responses thoughtful but concise (2-4 sentences) +""", + chat_client=_get_chat_client(), + ) + + doctor = ChatAgent( + name="Doctor", + description="A doctor from Scandinavia", + instructions=""" +You're a doctor from Scandinavia. Your perspective is shaped by public health, equity, +and structured societal support. You are in a philosophical debate. + +Share your perspective authentically. Feel free to: +- Challenge other participants respectfully +- Build on points others have made +- Use examples from healthcare and societal systems +- Keep responses thoughtful but concise (2-4 sentences) +""", + chat_client=_get_chat_client(), + ) + + workflow = ( + GroupChatBuilder() + .set_manager(moderator, display_name="Moderator") + .participants([farmer, developer, teacher, activist, spiritual_leader, artist, immigrant, doctor]) + .with_termination_condition(lambda messages: sum(1 for msg in messages if msg.role == Role.ASSISTANT) >= 10) + .build() + ) + + topic = "What does a good life mean to you personally?" 
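+
+    # Each Moderator turn streams as ManagerSelectionResponse JSON with the
+    # fields selected_participant, instruction, finish, and final_message;
+    # participant turns stream as plain prose (see the Sample Output below).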
+
+    print("\n" + "=" * 80)
+    print("PHILOSOPHICAL DEBATE: Perspectives on a Good Life")
+    print("=" * 80)
+    print(f"\nTopic: {topic}")
+    print("\nParticipants:")
+    print("  - Farmer (Southeast Asia)")
+    print("  - Developer (United States)")
+    print("  - Teacher (Eastern Europe)")
+    print("  - Activist (South America)")
+    print("  - SpiritualLeader (Middle East)")
+    print("  - Artist (Africa)")
+    print("  - Immigrant (Asia → Canada)")
+    print("  - Doctor (Scandinavia)")
+    print("\n" + "=" * 80)
+    print("DISCUSSION BEGINS")
+    print("=" * 80 + "\n")
+
+    final_conversation: list[ChatMessage] = []
+    current_speaker: str | None = None
+
+    async for event in workflow.run_stream(f"Please begin the discussion on: {topic}"):
+        if isinstance(event, AgentRunUpdateEvent):
+            speaker_id = event.executor_id.replace("groupchat_agent:", "")
+
+            if speaker_id != current_speaker:
+                if current_speaker is not None:
+                    print("\n")
+                print(f"[{speaker_id}]", flush=True)
+                current_speaker = speaker_id
+
+            print(event.data, end="", flush=True)
+
+        elif isinstance(event, WorkflowOutputEvent):
+            final_conversation = cast(list[ChatMessage], event.data)
+
+    print("\n\n" + "=" * 80)
+    print("DISCUSSION SUMMARY")
+    print("=" * 80)
+
+    if isinstance(final_conversation, list) and final_conversation:
+        final_msg = final_conversation[-1]
+        if hasattr(final_msg, "author_name") and final_msg.author_name == "Moderator":
+            print(f"\n{final_msg.text}")
+
+    """
+    Sample Output:
+
+    ================================================================================
+    PHILOSOPHICAL DEBATE: Perspectives on a Good Life
+    ================================================================================
+
+    Topic: What does a good life mean to you personally?
+
+    Participants:
+      - Farmer (Southeast Asia)
+      - Developer (United States)
+      - Teacher (Eastern Europe)
+      - Activist (South America)
+      - SpiritualLeader (Middle East)
+      - Artist (Africa)
+      - Immigrant (Asia → Canada)
+      - Doctor (Scandinavia)
+
+    ================================================================================
+    DISCUSSION BEGINS
+    ================================================================================
+
+    [Moderator]
+    {"selected_participant":"Farmer","instruction":"Please start by sharing what living a good life means to you,
+    especially from your perspective living in a rural area in Southeast Asia.","finish":false,"final_message":null}
+
+    [Farmer]
+    To me, a good life is deeply intertwined with the rhythm of the land and the nurturing of relationships with my
+    family and community. It means cultivating crops that respect our environment, ensuring sustainability for future
+    generations, and sharing meals made from our harvests around the dinner table. The joy found in everyday
+    tasks—planting rice or tending to our livestock—creates a sense of fulfillment that cannot be measured by material
+    wealth. It's the simple moments, like sharing stories with my children under the stars, that truly define a good
+    life. What good is progress if it isolates us from those we love and the land that sustains us?
+ + [Moderator] + {"selected_participant":"Developer","instruction":"Given the insights shared by the Farmer, please discuss what a + good life means to you as a software developer in an urban setting in the United States and how it might contrast + with or complement the Farmer's view.","finish":false,"final_message":null} + + [Developer] + As a software developer in an urban environment, a good life for me hinges on the intersection of innovation, + creativity, and balance. It's about having the freedom to explore new technologies that can solve real-world + problems while ensuring that my work doesn't encroach on my personal life. For instance, I value remote work + flexibility, which allows me to maintain connections with family and friends, similar to how the Farmer values + community. While our lifestyles may differ markedly, both of us seek fulfillment—whether through meaningful work or + rich personal experiences. The challenge is finding harmony between technological progress and preserving the + intimate human connections that truly enrich our lives. + + [Moderator] + {"selected_participant":"SpiritualLeader","instruction":"Reflect on both the Farmer's and Developer's perspectives + and share your view of what constitutes a good life, particularly from your spiritual and cultural standpoint in + the Middle East.","finish":false,"final_message":null} + + [SpiritualLeader] + From my spiritual perspective, a good life embodies a balance between personal fulfillment and service to others, + rooted in compassion and community. In our teachings, we emphasize that true happiness comes from helping those in + need and fostering strong connections with our families and neighbors. Whether it's the Farmer nurturing the earth + or the Developer creating tools to enhance lives, both contribute to the greater good. The essence of a good life + lies in our intentions and actions—finding ways to serve our communities, spread kindness, and live harmoniously + with those around us. Ultimately, as we align our personal beliefs with our communal responsibilities, we cultivate + a richness that transcends material wealth. + + [Moderator] + {"selected_participant":"Activist","instruction":"Add to the discussion by sharing your perspective on what a good + life entails, particularly from your background as a young activist in South America.","finish":false, + "final_message":null} + + [Activist] + As a young activist in South America, a good life for me is about advocating for social justice and environmental + sustainability. It means living in a society where everyone's rights are respected and where marginalized voices, + particularly those of Indigenous communities, are amplified. I see a good life as one where we work collectively to + dismantle oppressive systems—such as deforestation and inequality—while nurturing our planet. For instance, through + my activism, I've witnessed the transformative power of community organizing, where collective efforts lead to real + change, like resisting destructive mining practices that threaten our rivers and lands. A good life, therefore, is + not just lived for oneself but is deeply tied to the well-being of our communities and the health of our + environment. How can we, regardless of our backgrounds, collaborate to foster these essential changes? 
+ + [Moderator] + {"selected_participant":"Teacher","instruction":"Considering the views shared so far, tell us how your experience + as a retired history teacher from Eastern Europe shapes your understanding of a good life, perhaps reflecting on + lessons from the past and their impact on present-day life choices.","finish":false,"final_message":null} + + [Teacher] + As a retired history teacher from Eastern Europe, my understanding of a good life is deeply rooted in the lessons + drawn from history and the struggle for freedom and dignity. Historical events, such as the fall of the Iron + Curtain, remind us of the profound importance of liberty and collective resilience. A good life, therefore, is about + cherishing our freedoms and working towards a society where everyone has a voice, much as my students and I + discussed the impacts of totalitarian regimes. Additionally, I believe it involves fostering cultural continuity, + where we honor our heritage while embracing progressive values. We must learn from the past—especially the + consequences of neglecting empathy and solidarity—so that we can cultivate a future that values every individual's + contributions to the rich tapestry of our shared humanity. How can we ensure that the lessons of history inform a + more compassionate and just society moving forward? + + [Moderator] + {"selected_participant":"Artist","instruction":"Expound on the themes and perspectives discussed so far by sharing + how, as an artist from Africa, you define a good life and how art plays a role in that vision.","finish":false, + "final_message":null} + + [Artist] + As an artist from Africa, I define a good life as one steeped in cultural expression, storytelling, and the + celebration of our collective memories. Art is a powerful medium through which we capture our histories, struggles, + and triumphs, creating a tapestry that connects generations. For instance, in my work, I often draw from folktales + and traditional music, weaving narratives that reflect the human experience, much like how the retired teacher + emphasizes learning from history. A good life involves not only personal fulfillment but also the responsibility to + share our narratives and use our creativity to inspire change, whether addressing social injustices or environmental + issues. It's in this interplay of art and activism that we can transcend individual existence and contribute to a + collective good, fostering empathy and understanding among diverse communities. How can we harness art to bridge + differences and amplify marginalized voices in our pursuit of a good life? + + [Moderator] + {"selected_participant":null,"instruction":null,"finish":true,"final_message":"As our discussion unfolds, several + key themes have gracefully emerged, reflecting the richness of diverse perspectives on what constitutes a good life. + From the rural farmer's integration with the land to the developer's search for balance between technology and + personal connection, each viewpoint validates that fulfillment, at its core, transcends material wealth. The + spiritual leader and the activist highlight the importance of community and social justice, while the history + teacher and the artist remind us of the lessons and narratives that shape our cultural and personal identities. + + Ultimately, the good life seems to revolve around meaningful relationships, honoring our legacies while striving for + progress, and nurturing both our inner selves and external communities. 
This dialogue demonstrates that despite our + varied backgrounds and experiences, the quest for a good life binds us together, urging cooperation and empathy in + our shared human journey."} + + ================================================================================ + DISCUSSION SUMMARY + ================================================================================ + + As our discussion unfolds, several key themes have gracefully emerged, reflecting the richness of diverse + perspectives on what constitutes a good life. From the rural farmer's integration with the land to the developer's + search for balance between technology and personal connection, each viewpoint validates that fulfillment, at its + core, transcends material wealth. The spiritual leader and the activist highlight the importance of community and + social justice, while the history teacher and the artist remind us of the lessons and narratives that shape our + cultural and personal identities. + + Ultimately, the good life seems to revolve around meaningful relationships, honoring our legacies while striving for + progress, and nurturing both our inner selves and external communities. This dialogue demonstrates that despite our + varied backgrounds and experiences, the quest for a good life binds us together, urging cooperation and empathy in + our shared human journey. + """ + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/python/samples/getting_started/workflows/orchestration/group_chat_prompt_based_manager.py b/python/samples/getting_started/workflows/orchestration/group_chat_prompt_based_manager.py deleted file mode 100644 index 6a6d3a5e22..0000000000 --- a/python/samples/getting_started/workflows/orchestration/group_chat_prompt_based_manager.py +++ /dev/null @@ -1,75 +0,0 @@ -# Copyright (c) Microsoft. All rights reserved. - -import asyncio -import logging - -from agent_framework import AgentRunUpdateEvent, ChatAgent, GroupChatBuilder, WorkflowOutputEvent -from agent_framework.openai import OpenAIChatClient, OpenAIResponsesClient - -logging.basicConfig(level=logging.INFO) - -""" -Sample: Group Chat Orchestration (manager-directed) - -What it does: -- Demonstrates the generic GroupChatBuilder with a language-model manager directing two agents. -- The manager coordinates a researcher (chat completions) and a writer (responses API) to solve a task. -- Uses the default group chat orchestration pipeline shared with Magentic. - -Prerequisites: -- OpenAI environment variables configured for `OpenAIChatClient` and `OpenAIResponsesClient`. -""" - - -async def main() -> None: - researcher = ChatAgent( - name="Researcher", - description="Collects relevant background information.", - instructions="Gather concise facts that help a teammate answer the question.", - chat_client=OpenAIChatClient(model_id="gpt-4o-mini"), - ) - - writer = ChatAgent( - name="Writer", - description="Synthesizes a polished answer using the gathered notes.", - instructions="Compose clear and structured answers using any notes provided.", - chat_client=OpenAIResponsesClient(), - ) - - workflow = ( - GroupChatBuilder() - .set_prompt_based_manager(chat_client=OpenAIChatClient(), display_name="Coordinator") - .participants(researcher=researcher, writer=writer) - .build() - ) - - task = "Outline the core considerations for planning a community hackathon, and finish with a concise action plan." 
- - print("\nStarting Group Chat Workflow...\n") - print(f"TASK: {task}\n") - - final_response = None - last_executor_id: str | None = None - async for event in workflow.run_stream(task): - if isinstance(event, AgentRunUpdateEvent): - # Handle the streaming agent update as it's produced - eid = event.executor_id - if eid != last_executor_id: - if last_executor_id is not None: - print() - print(f"{eid}:", end=" ", flush=True) - last_executor_id = eid - print(event.data, end="", flush=True) - elif isinstance(event, WorkflowOutputEvent): - final_response = getattr(event.data, "text", str(event.data)) - - if final_response: - print("=" * 60) - print("FINAL RESPONSE") - print("=" * 60) - print(final_response) - print("=" * 60) - - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/python/samples/getting_started/workflows/orchestration/group_chat_simple_selector.py b/python/samples/getting_started/workflows/orchestration/group_chat_simple_selector.py index ba4d16accb..1fd074ca4d 100644 --- a/python/samples/getting_started/workflows/orchestration/group_chat_simple_selector.py +++ b/python/samples/getting_started/workflows/orchestration/group_chat_simple_selector.py @@ -2,8 +2,9 @@ import asyncio import logging +from typing import cast -from agent_framework import ChatAgent, GroupChatBuilder, GroupChatStateSnapshot, WorkflowOutputEvent +from agent_framework import ChatAgent, ChatMessage, GroupChatBuilder, GroupChatStateSnapshot, WorkflowOutputEvent from agent_framework.openai import OpenAIChatClient logging.basicConfig(level=logging.INFO) @@ -12,7 +13,7 @@ Sample: Group Chat with Simple Speaker Selector Function What it does: -- Demonstrates the select_speakers() API for GroupChat orchestration +- Demonstrates the set_select_speakers_func() API for GroupChat orchestration - Uses a pure Python function to control speaker selection based on conversation state - Alternates between researcher and writer agents in a simple round-robin pattern - Shows how to access conversation history, round index, and participant metadata @@ -84,7 +85,7 @@ async def main() -> None: # 2. 
Dict form - explicit names: .participants(researcher=researcher, writer=writer) workflow = ( GroupChatBuilder() - .select_speakers(select_next_speaker, display_name="Orchestrator") + .set_select_speakers_func(select_next_speaker, display_name="Orchestrator") .participants([researcher, writer]) # Uses agent.name for participant names .build() ) @@ -97,11 +98,14 @@ async def main() -> None: async for event in workflow.run_stream(task): if isinstance(event, WorkflowOutputEvent): - final_message = event.data - author = getattr(final_message, "author_name", "Unknown") - text = getattr(final_message, "text", str(final_message)) - print(f"\n[{author}]\n{text}\n") - print("-" * 80) + conversation = cast(list[ChatMessage], event.data) + if isinstance(conversation, list): + print("\n===== Final Conversation =====\n") + for msg in conversation: + author = getattr(msg, "author_name", "Unknown") + text = getattr(msg, "text", str(msg)) + print(f"[{author}]\n{text}\n") + print("-" * 80) print("\nWorkflow completed.") diff --git a/python/samples/getting_started/workflows/orchestration/magentic.py b/python/samples/getting_started/workflows/orchestration/magentic.py index 5010172e2b..0e265cb931 100644 --- a/python/samples/getting_started/workflows/orchestration/magentic.py +++ b/python/samples/getting_started/workflows/orchestration/magentic.py @@ -2,20 +2,21 @@ import asyncio import logging +from typing import cast from agent_framework import ( + MAGENTIC_EVENT_TYPE_AGENT_DELTA, + MAGENTIC_EVENT_TYPE_ORCHESTRATOR, + AgentRunUpdateEvent, ChatAgent, + ChatMessage, HostedCodeInterpreterTool, - MagenticAgentDeltaEvent, - MagenticAgentMessageEvent, MagenticBuilder, - MagenticFinalResultEvent, - MagenticOrchestratorMessageEvent, WorkflowOutputEvent, ) from agent_framework.openai import OpenAIChatClient, OpenAIResponsesClient -logging.basicConfig(level=logging.DEBUG) +logging.basicConfig(level=logging.WARNING) logger = logging.getLogger(__name__) """ @@ -97,35 +98,30 @@ async def main() -> None: try: output: str | None = None async for event in workflow.run_stream(task): - if isinstance(event, MagenticOrchestratorMessageEvent): - print(f"\n[ORCH:{event.kind}]\n\n{getattr(event.message, 'text', '')}\n{'-' * 26}") - elif isinstance(event, MagenticAgentDeltaEvent): - if last_stream_agent_id != event.agent_id or not stream_line_open: - if stream_line_open: - print() - print(f"\n[STREAM:{event.agent_id}]: ", end="", flush=True) - last_stream_agent_id = event.agent_id - stream_line_open = True - if event.text: - print(event.text, end="", flush=True) - elif isinstance(event, MagenticAgentMessageEvent): - if stream_line_open: - print(" (final)") - stream_line_open = False - print() - msg = event.message - if msg is not None: - response_text = (msg.text or "").replace("\n", " ") - print(f"\n[AGENT:{event.agent_id}] {msg.role.value}\n\n{response_text}\n{'-' * 26}") - elif isinstance(event, MagenticFinalResultEvent): - print("\n" + "=" * 50) - print("FINAL RESULT:") - print("=" * 50) - if event.message is not None: - print(event.message.text) - print("=" * 50) + if isinstance(event, AgentRunUpdateEvent): + props = event.data.additional_properties if event.data else None + event_type = props.get("magentic_event_type") if props else None + + if event_type == MAGENTIC_EVENT_TYPE_ORCHESTRATOR: + kind = props.get("orchestrator_message_kind", "") if props else "" + text = event.data.text if event.data else "" + print(f"\n[ORCH:{kind}]\n\n{text}\n{'-' * 26}") + elif event_type == MAGENTIC_EVENT_TYPE_AGENT_DELTA: + agent_id = 
props.get("agent_id", event.executor_id) if props else event.executor_id + if last_stream_agent_id != agent_id or not stream_line_open: + if stream_line_open: + print() + print(f"\n[STREAM:{agent_id}]: ", end="", flush=True) + last_stream_agent_id = agent_id + stream_line_open = True + if event.data and event.data.text: + print(event.data.text, end="", flush=True) + elif event.data and event.data.text: + print(event.data.text, end="", flush=True) elif isinstance(event, WorkflowOutputEvent): - output = str(event.data) if event.data is not None else None + output_messages = cast(list[ChatMessage], event.data) + if output_messages: + output = output_messages[-1].text if stream_line_open: print() diff --git a/python/samples/semantic-kernel-migration/orchestrations/group_chat.py b/python/samples/semantic-kernel-migration/orchestrations/group_chat.py index 42142b5363..8ea37e8f5e 100644 --- a/python/samples/semantic-kernel-migration/orchestrations/group_chat.py +++ b/python/samples/semantic-kernel-migration/orchestrations/group_chat.py @@ -233,8 +233,8 @@ async def run_agent_framework_example(task: str) -> str: workflow = ( GroupChatBuilder() - .set_prompt_based_manager( - chat_client=AzureOpenAIChatClient(credential=credential), + .set_manager( + manager=AzureOpenAIChatClient(credential=credential).create_agent(), display_name="Coordinator", ) .participants(researcher=researcher, planner=planner) @@ -245,7 +245,12 @@ async def run_agent_framework_example(task: str) -> str: async for event in workflow.run_stream(task): if isinstance(event, WorkflowOutputEvent): data = event.data - final_response = data.text or "" if isinstance(data, ChatMessage) else str(data) + if isinstance(data, list) and len(data) > 0: + # Get the final message from the conversation + final_message = data[-1] + final_response = final_message.text or "" if isinstance(final_message, ChatMessage) else str(data) + else: + final_response = str(data) return final_response diff --git a/python/uv.lock b/python/uv.lock index 767d0b70a2..87081e2bc5 100644 --- a/python/uv.lock +++ b/python/uv.lock @@ -280,7 +280,7 @@ dependencies = [ [package.metadata] requires-dist = [ { name = "agent-framework-core", editable = "packages/core" }, - { name = "openai-chatkit", specifier = ">=1.1.0,<2.0.0" }, + { name = "openai-chatkit", specifier = ">=1.4.0,<2.0.0" }, ] [[package]] @@ -798,7 +798,7 @@ wheels = [ [[package]] name = "anthropic" -version = "0.74.1" +version = "0.75.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "anyio", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, @@ -810,9 +810,9 @@ dependencies = [ { name = "sniffio", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, { name = "typing-extensions", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/d7/7b/609eea5c54ae69b1a4a94169d4b0c86dc5c41b43509989913f6cdc61b81d/anthropic-0.74.1.tar.gz", hash = "sha256:04c087b2751385c524f6d332d066a913870e4de8b3e335fb0a0c595f1f88dc6e", size = 428981, upload-time = "2025-11-19T22:17:31.533Z" } +sdist = { url = "https://files.pythonhosted.org/packages/04/1f/08e95f4b7e2d35205ae5dcbb4ae97e7d477fc521c275c02609e2931ece2d/anthropic-0.75.0.tar.gz", hash = "sha256:e8607422f4ab616db2ea5baacc215dd5f028da99ce2f022e33c7c535b29f3dfb", size = 439565, upload-time = "2025-11-24T20:41:45.28Z" } wheels = [ - { url = 
"https://files.pythonhosted.org/packages/dd/45/6b18d0692302b8cbc01a10c35b43953d3c4172fbd4f83337b8ed21a8eaa4/anthropic-0.74.1-py3-none-any.whl", hash = "sha256:b07b998d1cee7f41d9f02530597d7411672b362cc2417760a40c0167b81c6e65", size = 371473, upload-time = "2025-11-19T22:17:29.998Z" }, + { url = "https://files.pythonhosted.org/packages/60/1c/1cd02b7ae64302a6e06724bf80a96401d5313708651d277b1458504a1730/anthropic-0.75.0-py3-none-any.whl", hash = "sha256:ea8317271b6c15d80225a9f3c670152746e88805a7a61e14d4a374577164965b", size = 388164, upload-time = "2025-11-24T20:41:43.587Z" }, ] [[package]] @@ -1799,7 +1799,7 @@ name = "exceptiongroup" version = "1.3.1" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "typing-extensions", marker = "(python_full_version < '3.13' and sys_platform == 'darwin') or (python_full_version < '3.13' and sys_platform == 'linux') or (python_full_version < '3.13' and sys_platform == 'win32')" }, + { name = "typing-extensions", marker = "(python_full_version < '3.11' and sys_platform == 'darwin') or (python_full_version < '3.11' and sys_platform == 'linux') or (python_full_version < '3.11' and sys_platform == 'win32')" }, ] sdist = { url = "https://files.pythonhosted.org/packages/50/79/66800aadf48771f6b62f7eb014e352e5d06856655206165d775e675a02c9/exceptiongroup-1.3.1.tar.gz", hash = "sha256:8b412432c6055b0b7d14c310000ae93352ed6754f70fa8f7c34141f91c4e3219", size = 30371, upload-time = "2025-11-21T23:01:54.787Z" } wheels = [ @@ -1817,7 +1817,7 @@ wheels = [ [[package]] name = "fastapi" -version = "0.121.3" +version = "0.122.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "annotated-doc", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, @@ -1825,9 +1825,9 @@ dependencies = [ { name = "starlette", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, { name = "typing-extensions", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/80/f0/086c442c6516195786131b8ca70488c6ef11d2f2e33c9a893576b2b0d3f7/fastapi-0.121.3.tar.gz", hash = "sha256:0055bc24fe53e56a40e9e0ad1ae2baa81622c406e548e501e717634e2dfbc40b", size = 344501, upload-time = "2025-11-19T16:53:39.243Z" } +sdist = { url = "https://files.pythonhosted.org/packages/b2/de/3ee97a4f6ffef1fb70bf20561e4f88531633bb5045dc6cebc0f8471f764d/fastapi-0.122.0.tar.gz", hash = "sha256:cd9b5352031f93773228af8b4c443eedc2ac2aa74b27780387b853c3726fb94b", size = 346436, upload-time = "2025-11-24T19:17:47.95Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/98/b6/4f620d7720fc0a754c8c1b7501d73777f6ba43b57c8ab99671f4d7441eb8/fastapi-0.121.3-py3-none-any.whl", hash = "sha256:0c78fc87587fcd910ca1bbf5bc8ba37b80e119b388a7206b39f0ecc95ebf53e9", size = 109801, upload-time = "2025-11-19T16:53:37.918Z" }, + { url = "https://files.pythonhosted.org/packages/7a/93/aa8072af4ff37b795f6bbf43dcaf61115f40f49935c7dbb180c9afc3f421/fastapi-0.122.0-py3-none-any.whl", hash = "sha256:a456e8915dfc6c8914a50d9651133bd47ec96d331c5b44600baa635538a30d67", size = 110671, upload-time = "2025-11-24T19:17:45.96Z" }, ] [[package]] @@ -3730,17 +3730,18 @@ wheels = [ [[package]] name = "openai-chatkit" -version = "1.3.1" +version = "1.4.0" source = { registry = "https://pypi.org/simple" } dependencies = [ + { name = "jinja2", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, { name = 
"openai", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, { name = "openai-agents", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, { name = "pydantic", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, { name = "uvicorn", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/f7/81/906844005976afb3c415e27f6506ed0f2b8b1040f27b4bc9ef118a256986/openai_chatkit-1.3.1.tar.gz", hash = "sha256:91f39b04584f969642a6c3b4099fdad74c2a357e25d8f746f9709046304a06cf", size = 50730, upload-time = "2025-11-21T21:22:11.62Z" } +sdist = { url = "https://files.pythonhosted.org/packages/c5/89/bf2f094997c8e5cad5334e8a02e05fc458823e65fb2675f45b56b6d1ab73/openai_chatkit-1.4.0.tar.gz", hash = "sha256:e2527dffc3794a05596ad75efa66bdc4efb4ded5a77a013a55496cc989bcf2e6", size = 55269, upload-time = "2025-11-25T21:02:58.503Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/fb/61/235e5f49bd068bbe5f7278bc7d7c4bd92226858698fcac53ab92090baf04/openai_chatkit-1.3.1-py3-none-any.whl", hash = "sha256:5626492e5752879e66b2b6d4fbac51994407d46429de99b91515a77c2e0c6148", size = 35899, upload-time = "2025-11-21T21:22:10.37Z" }, + { url = "https://files.pythonhosted.org/packages/90/bf/68d42561dd8a674b6f8541d879dd165b5ac4d81fcf1027462e154de66a4f/openai_chatkit-1.4.0-py3-none-any.whl", hash = "sha256:35d00ca8398908bd70d63e2284adcd836641cc11746f68d7cfa91d276e3dad3d", size = 39077, upload-time = "2025-11-25T21:02:57.288Z" }, ] [[package]] @@ -5075,7 +5076,7 @@ wheels = [ [[package]] name = "qdrant-client" -version = "1.16.0" +version = "1.16.1" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "grpcio", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, @@ -5087,9 +5088,9 @@ dependencies = [ { name = "pydantic", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, { name = "urllib3", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/fa/16/366541897d270ee3f9c3f87da145baa8a5c9cc5190e0e53e8bbec1267cff/qdrant_client-1.16.0.tar.gz", hash = "sha256:0716aa0b7cca39745829c2e8ea0beb275fe2990e743ad803eabd6218e4b35c1b", size = 284128, upload-time = "2025-11-17T13:19:52.726Z" } +sdist = { url = "https://files.pythonhosted.org/packages/d9/68/fec3816a223c0b73b0e0036460be45c61ce2770ffb9197ac371e4f615ddc/qdrant_client-1.16.1.tar.gz", hash = "sha256:676c7c10fd4d4cb2981b8fcb32fd764f5f661b04b7334d024034d07212f971fd", size = 332130, upload-time = "2025-11-25T04:31:54.212Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/a5/ff/3a69bb56835c4b2e9fa780655790937011ac389b0408b9a1147eaa2cee22/qdrant_client-1.16.0-py3-none-any.whl", hash = "sha256:6b932393e84e4c0233e5b2eb96b0918e968725855adae4d9c541761f4c50cf11", size = 328579, upload-time = "2025-11-17T13:19:51.092Z" }, + { url = "https://files.pythonhosted.org/packages/60/e2/60a20d04b0595c641516463168909c5bbcc192d3d6eacb637c1677109c6a/qdrant_client-1.16.1-py3-none-any.whl", hash = "sha256:1eefe89f66e8a468ba0de1680e28b441e69825cfb62e8fb2e457c15e24ce5e3b", size = 378481, upload-time = "2025-11-25T04:31:52.629Z" }, ] [[package]]