Skip to content

Commit a729875

Browse files
authored
Python: Fix WorkflowAgent to emit yield_output as agent response (#2866)
* Fix WorkflowAgent to emit yield_output as agent response * use raw_representation * Raw representation handling
1 parent 0dcebc6 commit a729875

File tree

2 files changed

+180
-3
lines changed

2 files changed

+180
-3
lines changed

python/packages/core/agent_framework/_workflows/_agent.py

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,15 @@
1313
AgentRunResponseUpdate,
1414
AgentThread,
1515
BaseAgent,
16+
BaseContent,
1617
ChatMessage,
18+
Contents,
1719
FunctionApprovalRequestContent,
1820
FunctionApprovalResponseContent,
1921
FunctionCallContent,
2022
FunctionResultContent,
2123
Role,
24+
TextContent,
2225
UsageDetails,
2326
)
2427

@@ -28,6 +31,7 @@
2831
AgentRunUpdateEvent,
2932
RequestInfoEvent,
3033
WorkflowEvent,
34+
WorkflowOutputEvent,
3135
)
3236
from ._message_utils import normalize_messages_input
3337
from ._typing_utils import is_type_compatible
@@ -280,9 +284,8 @@ def _convert_workflow_event_to_agent_update(
280284
) -> AgentRunResponseUpdate | None:
281285
"""Convert a workflow event to an AgentRunResponseUpdate.
282286
283-
Only AgentRunUpdateEvent and RequestInfoEvent are processed.
284-
Other workflow events are ignored as they are workflow-internal and should
285-
have corresponding AgentRunUpdateEvent emissions if relevant to agent consumers.
287+
AgentRunUpdateEvent, RequestInfoEvent, and WorkflowOutputEvent are processed.
288+
Other workflow events are ignored as they are workflow-internal.
286289
"""
287290
match event:
288291
case AgentRunUpdateEvent(data=update):
@@ -291,6 +294,42 @@ def _convert_workflow_event_to_agent_update(
291294
return update
292295
return None
293296

297+
case WorkflowOutputEvent(data=data, source_executor_id=source_executor_id):
298+
# Convert workflow output to an agent response update.
299+
# Handle different data types appropriately.
300+
if isinstance(data, AgentRunResponseUpdate):
301+
# Already an update, pass through
302+
return data
303+
if isinstance(data, ChatMessage):
304+
# Convert ChatMessage to update
305+
return AgentRunResponseUpdate(
306+
contents=list(data.contents),
307+
role=data.role,
308+
author_name=data.author_name or source_executor_id,
309+
response_id=response_id,
310+
message_id=str(uuid.uuid4()),
311+
created_at=datetime.now(tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
312+
raw_representation=data,
313+
)
314+
# Determine contents based on data type
315+
if isinstance(data, BaseContent):
316+
# Already a content type (TextContent, ImageContent, etc.)
317+
contents: list[Contents] = [cast(Contents, data)]
318+
elif isinstance(data, str):
319+
contents = [TextContent(text=data)]
320+
else:
321+
# Fallback: convert to string representation
322+
contents = [TextContent(text=str(data))]
323+
return AgentRunResponseUpdate(
324+
contents=contents,
325+
role=Role.ASSISTANT,
326+
author_name=source_executor_id,
327+
response_id=response_id,
328+
message_id=str(uuid.uuid4()),
329+
created_at=datetime.now(tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
330+
raw_representation=data,
331+
)
332+
294333
case RequestInfoEvent(request_id=request_id):
295334
# Store the pending request for later correlation
296335
self.pending_requests[request_id] = event

python/packages/core/tests/workflow/test_workflow_agent.py

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,20 @@
1212
AgentThread,
1313
ChatMessage,
1414
ChatMessageStore,
15+
DataContent,
1516
Executor,
1617
FunctionApprovalRequestContent,
1718
FunctionApprovalResponseContent,
1819
FunctionCallContent,
1920
Role,
2021
TextContent,
22+
UriContent,
2123
UsageContent,
2224
UsageDetails,
2325
WorkflowAgent,
2426
WorkflowBuilder,
2527
WorkflowContext,
28+
executor,
2629
handler,
2730
response_handler,
2831
)
@@ -284,6 +287,141 @@ async def handle_bool(self, message: bool, context: WorkflowContext[Any]) -> Non
284287
with pytest.raises(ValueError, match="Workflow's start executor cannot handle list\\[ChatMessage\\]"):
285288
workflow.as_agent()
286289

290+
async def test_workflow_as_agent_yield_output_surfaces_as_agent_response(self) -> None:
291+
"""Test that ctx.yield_output() in a workflow executor surfaces as agent output when using .as_agent().
292+
293+
This validates the fix for issue #2813: WorkflowOutputEvent should be converted to
294+
AgentRunResponseUpdate when the workflow is wrapped via .as_agent().
295+
"""
296+
297+
@executor
298+
async def yielding_executor(messages: list[ChatMessage], ctx: WorkflowContext) -> None:
299+
# Extract text from input for demonstration
300+
input_text = messages[0].text if messages else "no input"
301+
await ctx.yield_output(f"processed: {input_text}")
302+
303+
workflow = WorkflowBuilder().set_start_executor(yielding_executor).build()
304+
305+
# Run directly - should return WorkflowOutputEvent in result
306+
direct_result = await workflow.run([ChatMessage(role=Role.USER, contents=[TextContent(text="hello")])])
307+
direct_outputs = direct_result.get_outputs()
308+
assert len(direct_outputs) == 1
309+
assert direct_outputs[0] == "processed: hello"
310+
311+
# Run as agent - yield_output should surface as agent response message
312+
agent = workflow.as_agent("test-agent")
313+
agent_result = await agent.run("hello")
314+
315+
assert isinstance(agent_result, AgentRunResponse)
316+
assert len(agent_result.messages) == 1
317+
assert agent_result.messages[0].text == "processed: hello"
318+
319+
async def test_workflow_as_agent_yield_output_surfaces_in_run_stream(self) -> None:
320+
"""Test that ctx.yield_output() surfaces as AgentRunResponseUpdate when streaming."""
321+
322+
@executor
323+
async def yielding_executor(messages: list[ChatMessage], ctx: WorkflowContext) -> None:
324+
await ctx.yield_output("first output")
325+
await ctx.yield_output("second output")
326+
327+
workflow = WorkflowBuilder().set_start_executor(yielding_executor).build()
328+
agent = workflow.as_agent("test-agent")
329+
330+
updates: list[AgentRunResponseUpdate] = []
331+
async for update in agent.run_stream("hello"):
332+
updates.append(update)
333+
334+
# Should have received updates for both yield_output calls
335+
texts = [u.text for u in updates if u.text]
336+
assert "first output" in texts
337+
assert "second output" in texts
338+
339+
async def test_workflow_as_agent_yield_output_with_content_types(self) -> None:
340+
"""Test that yield_output preserves different content types (TextContent, DataContent, etc.)."""
341+
342+
@executor
343+
async def content_yielding_executor(messages: list[ChatMessage], ctx: WorkflowContext) -> None:
344+
# Yield different content types
345+
await ctx.yield_output(TextContent(text="text content"))
346+
await ctx.yield_output(DataContent(data=b"binary data", media_type="application/octet-stream"))
347+
await ctx.yield_output(UriContent(uri="https://example.com/image.png", media_type="image/png"))
348+
349+
workflow = WorkflowBuilder().set_start_executor(content_yielding_executor).build()
350+
agent = workflow.as_agent("content-test-agent")
351+
352+
result = await agent.run("test")
353+
354+
assert isinstance(result, AgentRunResponse)
355+
assert len(result.messages) == 3
356+
357+
# Verify each content type is preserved
358+
assert isinstance(result.messages[0].contents[0], TextContent)
359+
assert result.messages[0].contents[0].text == "text content"
360+
361+
assert isinstance(result.messages[1].contents[0], DataContent)
362+
assert result.messages[1].contents[0].media_type == "application/octet-stream"
363+
364+
assert isinstance(result.messages[2].contents[0], UriContent)
365+
assert result.messages[2].contents[0].uri == "https://example.com/image.png"
366+
367+
async def test_workflow_as_agent_yield_output_with_chat_message(self) -> None:
368+
"""Test that yield_output with ChatMessage preserves the message structure."""
369+
370+
@executor
371+
async def chat_message_executor(messages: list[ChatMessage], ctx: WorkflowContext) -> None:
372+
msg = ChatMessage(
373+
role=Role.ASSISTANT,
374+
contents=[TextContent(text="response text")],
375+
author_name="custom-author",
376+
)
377+
await ctx.yield_output(msg)
378+
379+
workflow = WorkflowBuilder().set_start_executor(chat_message_executor).build()
380+
agent = workflow.as_agent("chat-msg-agent")
381+
382+
result = await agent.run("test")
383+
384+
assert len(result.messages) == 1
385+
assert result.messages[0].role == Role.ASSISTANT
386+
assert result.messages[0].text == "response text"
387+
assert result.messages[0].author_name == "custom-author"
388+
389+
async def test_workflow_as_agent_yield_output_sets_raw_representation(self) -> None:
390+
"""Test that yield_output sets raw_representation with the original data."""
391+
392+
# A custom object to verify raw_representation preserves the original data
393+
class CustomData:
394+
def __init__(self, value: int):
395+
self.value = value
396+
397+
def __str__(self) -> str:
398+
return f"CustomData({self.value})"
399+
400+
@executor
401+
async def raw_yielding_executor(messages: list[ChatMessage], ctx: WorkflowContext) -> None:
402+
# Yield different types of data
403+
await ctx.yield_output("simple string")
404+
await ctx.yield_output(TextContent(text="text content"))
405+
custom = CustomData(42)
406+
await ctx.yield_output(custom)
407+
408+
workflow = WorkflowBuilder().set_start_executor(raw_yielding_executor).build()
409+
agent = workflow.as_agent("raw-test-agent")
410+
411+
updates: list[AgentRunResponseUpdate] = []
412+
async for update in agent.run_stream("test"):
413+
updates.append(update)
414+
415+
# Should have 3 updates
416+
assert len(updates) == 3
417+
418+
# Verify raw_representation is set for each update
419+
assert updates[0].raw_representation == "simple string"
420+
assert isinstance(updates[1].raw_representation, TextContent)
421+
assert updates[1].raw_representation.text == "text content"
422+
assert isinstance(updates[2].raw_representation, CustomData)
423+
assert updates[2].raw_representation.value == 42
424+
287425
async def test_thread_conversation_history_included_in_workflow_run(self) -> None:
288426
"""Test that conversation history from thread is included when running WorkflowAgent.
289427

0 commit comments

Comments
 (0)