Skip to content

Commit 14496d3

Browse files
committed
refactor(server): migrate from AgentResult to AgentEvent for protocol agnostic design
This change migrates the core event handling system from AgentResult to AgentEvent, providing a protocol-agnostic foundation that supports both OpenAI and AG-UI protocols. The new AgentEvent system automatically handles boundary events (START/END) at the protocol layer, simplifying user-facing event definitions and improving consistency across different protocol implementations. The migration includes: - Replacing AgentResult with AgentEvent as the primary event type - Introducing simplified event types (TEXT, TOOL_CALL, TOOL_RESULT) - Removing manual boundary event handling from user code - Adding automatic boundary event generation in protocol handlers - Updating all integrations and tests to use the new event system BREAKING CHANGE: AgentResult has been replaced with AgentEvent. Users must update their event handling code to use the new event types and structure. 将服务器从 AgentResult 迁移到 AgentEvent,实现协议无关的设计 此次更改将核心事件处理系统从 AgentResult 迁移到 AgentEvent, 提供了支持 OpenAI 和 AG-UI 协议的协议无关基础。 新的 AgentEvent 系统在协议层自动处理边界事件(START/END), 简化了面向用户的事件定义并提高不同协议实现之间的一致性。 迁移包括: - 将 AgentResult 替换为 AgentEvent 作为主要事件类型 - 引入简化的事件类型(TEXT、TOOL_CALL、TOOL_RESULT) - 从用户代码中移除手动边界事件处理 - 在协议处理器中添加自动边界事件生成 - 更新所有集成和测试以使用新的事件系统 重大变更:AgentResult 已被 AgentEvent 替换。用户必须更新 他们的事件处理代码以使用新的事件类型和结构。 perf(langgraph): optimize tool call event conversion with chunk-based streaming Improves the tool call event conversion process in LangGraph integration by switching from multiple discrete events (START, ARGS, END) to a single chunk-based approach (TOOL_CALL_CHUNK). This reduces event overhead and simplifies the event flow while maintaining proper tool call ID consistency and argument streaming capabilities. The optimization includes: - Consolidating tool call events into single chunks with complete args - Maintaining proper tool call ID tracking across streaming chunks - Supporting both complete and incremental argument transmission - Preserving compatibility with existing LangGraph event formats 性能优化(langgraph): 使用基于块的流优化工具调用事件转换 通过从多个离散事件(START、ARGS、END)切换到单一块方法(TOOL_CALL_CHUNK), 改进 LangGraph 集成中的工具调用事件转换过程。 这减少了事件开销并简化了事件流,同时保持适当的工具调用 ID 一致性和 参数流功能。 优化包括: - 将工具调用事件整合到包含完整参数的单个块中 - 在流块间保持适当的工具调用 ID 跟踪 - 支持完整和增量参数传输 - 保持与现有 LangGraph 事件格式的兼容性 test: add comprehensive test coverage for event conversion and protocol handling Adds extensive test coverage for the new AgentEvent system and event conversion functionality. Includes unit tests for LangGraph event conversion, protocol handling, and edge cases. The test suite ensures proper behavior across different event formats and verifies correct event ordering and ID consistency. The new tests cover: - LangGraph event conversion for different stream modes - AG-UI event normalizer functionality - Server protocol handling for both OpenAI and AG-UI - Tool call ID consistency across streaming chunks - Error handling and edge cases 测试: 为事件转换和协议处理添加全面的测试覆盖 为新的 AgentEvent 系统和事件转换功能添加广泛的测试覆盖。 包括 LangGraph 事件转换、协议处理和边缘情况的单元测试。 测试套件确保不同事件格式的正确行为,并验证正确的事件排序和 ID 一致性。 新测试覆盖: - 不同流模式下的 LangGraph 事件转换 - AG-UI 事件规范化器功能 - 服务器协议处理(OpenAI 和 AG-UI) - 流块间工具调用 ID 的一致性 - 错误处理和边缘情况 Change-Id: I92fca1758866344bd34486b853d177e7d8f9fdf4 Signed-off-by: OhYee <[email protected]>
1 parent 8a19135 commit 14496d3

File tree

14 files changed

+3259
-1657
lines changed

14 files changed

+3259
-1657
lines changed

agentrun/integration/langgraph/agent_converter.py

Lines changed: 96 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -385,40 +385,35 @@ def _convert_stream_updates_event(
385385
tc_args = tc.get("args", {})
386386

387387
if tc_id:
388-
yield AgentResult(
389-
event=EventType.TOOL_CALL_START,
390-
data={
391-
"tool_call_id": tc_id,
392-
"tool_call_name": tc_name,
393-
},
394-
)
388+
# 发送带有完整参数的 TOOL_CALL_CHUNK
389+
args_str = ""
395390
if tc_args:
396391
args_str = (
397392
_safe_json_dumps(tc_args)
398393
if isinstance(tc_args, dict)
399394
else str(tc_args)
400395
)
401-
yield AgentResult(
402-
event=EventType.TOOL_CALL_ARGS,
403-
data={"tool_call_id": tc_id, "delta": args_str},
404-
)
396+
yield AgentResult(
397+
event=EventType.TOOL_CALL_CHUNK,
398+
data={
399+
"id": tc_id,
400+
"name": tc_name,
401+
"args_delta": args_str,
402+
},
403+
)
405404

406405
elif msg_type == "tool":
407-
# 工具结果(发送 RESULT 和 END)
406+
# 工具结果
408407
tool_call_id = _get_tool_call_id(msg)
409408
if tool_call_id:
410409
tool_content = _get_message_content(msg)
411410
yield AgentResult(
412-
event=EventType.TOOL_CALL_RESULT,
411+
event=EventType.TOOL_RESULT,
413412
data={
414-
"tool_call_id": tool_call_id,
413+
"id": tool_call_id,
415414
"result": str(tool_content) if tool_content else "",
416415
},
417416
)
418-
yield AgentResult(
419-
event=EventType.TOOL_CALL_END,
420-
data={"tool_call_id": tool_call_id},
421-
)
422417

423418

424419
def _convert_stream_values_event(
@@ -454,46 +449,41 @@ def _convert_stream_values_event(
454449
if content:
455450
yield content
456451

457-
# 工具调用(仅发送 START 和 ARGS)
452+
# 工具调用
458453
for tc in _get_message_tool_calls(last_msg):
459454
tc_id = tc.get("id", "")
460455
tc_name = tc.get("name", "")
461456
tc_args = tc.get("args", {})
462457

463458
if tc_id:
464-
yield AgentResult(
465-
event=EventType.TOOL_CALL_START,
466-
data={
467-
"tool_call_id": tc_id,
468-
"tool_call_name": tc_name,
469-
},
470-
)
459+
# 发送带有完整参数的 TOOL_CALL_CHUNK
460+
args_str = ""
471461
if tc_args:
472462
args_str = (
473463
_safe_json_dumps(tc_args)
474464
if isinstance(tc_args, dict)
475465
else str(tc_args)
476466
)
477-
yield AgentResult(
478-
event=EventType.TOOL_CALL_ARGS,
479-
data={"tool_call_id": tc_id, "delta": args_str},
480-
)
467+
yield AgentResult(
468+
event=EventType.TOOL_CALL_CHUNK,
469+
data={
470+
"id": tc_id,
471+
"name": tc_name,
472+
"args_delta": args_str,
473+
},
474+
)
481475

482476
elif msg_type == "tool":
483477
tool_call_id = _get_tool_call_id(last_msg)
484478
if tool_call_id:
485479
tool_content = _get_message_content(last_msg)
486480
yield AgentResult(
487-
event=EventType.TOOL_CALL_RESULT,
481+
event=EventType.TOOL_RESULT,
488482
data={
489-
"tool_call_id": tool_call_id,
483+
"id": tool_call_id,
490484
"result": str(tool_content) if tool_content else "",
491485
},
492486
)
493-
yield AgentResult(
494-
event=EventType.TOOL_CALL_END,
495-
data={"tool_call_id": tool_call_id},
496-
)
497487

498488

499489
def _convert_astream_events_event(
@@ -559,30 +549,49 @@ def _convert_astream_events_event(
559549
if not tc_id:
560550
continue
561551

562-
# AG-UI 协议要求:先发送 TOOL_CALL_START,再发送 TOOL_CALL_ARGS
563-
# 第一次遇到某个工具调用时(有 id 和 name),先发送 TOOL_CALL_START
564-
if tc_raw_id and tc_name:
565-
if (
552+
# 流式工具调用:第一个 chunk 包含 id 和 name,后续只有 args_delta
553+
# 协议层会自动处理 START/END 边界事件
554+
is_first_chunk = (
555+
tc_raw_id
556+
and tc_name
557+
and (
566558
tool_call_started_set is None
567559
or tc_id not in tool_call_started_set
568-
):
569-
yield AgentResult(
570-
event=EventType.TOOL_CALL_START,
571-
data={
572-
"tool_call_id": tc_id,
573-
"tool_call_name": tc_name,
574-
},
575-
)
576-
if tool_call_started_set is not None:
577-
tool_call_started_set.add(tc_id)
560+
)
561+
)
578562

579-
# 只有有 args 时才生成 TOOL_CALL_ARGS 事件
580-
if tc_args:
581-
if isinstance(tc_args, (dict, list)):
582-
tc_args = _safe_json_dumps(tc_args)
563+
if is_first_chunk:
564+
if tool_call_started_set is not None:
565+
tool_call_started_set.add(tc_id)
566+
# 第一个 chunk 包含 id 和 name
567+
args_delta = ""
568+
if tc_args:
569+
args_delta = (
570+
_safe_json_dumps(tc_args)
571+
if isinstance(tc_args, (dict, list))
572+
else str(tc_args)
573+
)
583574
yield AgentResult(
584-
event=EventType.TOOL_CALL_ARGS,
585-
data={"tool_call_id": tc_id, "delta": tc_args},
575+
event=EventType.TOOL_CALL_CHUNK,
576+
data={
577+
"id": tc_id,
578+
"name": tc_name,
579+
"args_delta": args_delta,
580+
},
581+
)
582+
elif tc_args:
583+
# 后续 chunk 只有 args_delta
584+
args_delta = (
585+
_safe_json_dumps(tc_args)
586+
if isinstance(tc_args, (dict, list))
587+
else str(tc_args)
588+
)
589+
yield AgentResult(
590+
event=EventType.TOOL_CALL_CHUNK,
591+
data={
592+
"id": tc_id,
593+
"args_delta": args_delta,
594+
},
586595
)
587596

588597
# 2. LangChain 格式: on_chain_stream
@@ -598,17 +607,24 @@ def _convert_astream_events_event(
598607

599608
for tc in _get_message_tool_calls(msg):
600609
tc_id = tc.get("id", "")
610+
tc_name = tc.get("name", "")
601611
tc_args = tc.get("args", {})
602612

603-
if tc_id and tc_args:
604-
args_str = (
605-
_safe_json_dumps(tc_args)
606-
if isinstance(tc_args, dict)
607-
else str(tc_args)
608-
)
613+
if tc_id:
614+
args_delta = ""
615+
if tc_args:
616+
args_delta = (
617+
_safe_json_dumps(tc_args)
618+
if isinstance(tc_args, dict)
619+
else str(tc_args)
620+
)
609621
yield AgentResult(
610-
event=EventType.TOOL_CALL_ARGS,
611-
data={"tool_call_id": tc_id, "delta": args_str},
622+
event=EventType.TOOL_CALL_CHUNK,
623+
data={
624+
"id": tc_id,
625+
"name": tc_name,
626+
"args_delta": args_delta,
627+
},
612628
)
613629

614630
# 3. 工具开始
@@ -622,41 +638,33 @@ def _convert_astream_events_event(
622638
tool_input = _filter_tool_input(tool_input_raw)
623639

624640
if tool_call_id:
625-
# 检查是否已在 on_chat_model_stream 中发送过 TOOL_CALL_START
641+
# 检查是否已在 on_chat_model_stream 中发送过
626642
already_started = (
627643
tool_call_started_set is not None
628644
and tool_call_id in tool_call_started_set
629645
)
630646

631647
if not already_started:
632-
# 非流式场景或未收到流式事件,需要发送 TOOL_CALL_START
633-
yield AgentResult(
634-
event=EventType.TOOL_CALL_START,
635-
data={
636-
"tool_call_id": tool_call_id,
637-
"tool_call_name": tool_name,
638-
},
639-
)
648+
# 非流式场景或未收到流式事件,发送完整的 TOOL_CALL_CHUNK
640649
if tool_call_started_set is not None:
641650
tool_call_started_set.add(tool_call_id)
642651

643-
# 非流式场景下,在 START 后发送完整参数
652+
args_delta = ""
644653
if tool_input:
645-
args_str = (
654+
args_delta = (
646655
_safe_json_dumps(tool_input)
647656
if isinstance(tool_input, dict)
648657
else str(tool_input)
649658
)
650-
yield AgentResult(
651-
event=EventType.TOOL_CALL_ARGS,
652-
data={"tool_call_id": tool_call_id, "delta": args_str},
653-
)
654-
655-
# AG-UI 协议:TOOL_CALL_END 表示参数传输完成,在工具执行前发送
656-
yield AgentResult(
657-
event=EventType.TOOL_CALL_END,
658-
data={"tool_call_id": tool_call_id},
659-
)
659+
yield AgentResult(
660+
event=EventType.TOOL_CALL_CHUNK,
661+
data={
662+
"id": tool_call_id,
663+
"name": tool_name,
664+
"args_delta": args_delta,
665+
},
666+
)
667+
# 协议层会自动处理边界事件,无需手动发送 TOOL_CALL_END
660668

661669
# 4. 工具结束
662670
elif event_type == "on_tool_end":
@@ -667,12 +675,11 @@ def _convert_astream_events_event(
667675
tool_call_id = _extract_tool_call_id(tool_input_raw) or run_id
668676

669677
if tool_call_id:
670-
# AG-UI 协议:TOOL_CALL_RESULT 在工具执行完成后发送
671-
# 注意:TOOL_CALL_END 已在 on_tool_start 中发送(表示参数传输完成)
678+
# 工具执行完成后发送结果
672679
yield AgentResult(
673-
event=EventType.TOOL_CALL_RESULT,
680+
event=EventType.TOOL_RESULT,
674681
data={
675-
"tool_call_id": tool_call_id,
682+
"id": tool_call_id,
676683
"result": _format_tool_output(output),
677684
},
678685
)

0 commit comments

Comments
 (0)