Skip to content

Commit 958a488

Browse files
authored
Python: Fix context duplication in handoff workflows when restoring from checkpoint (#2867)
* Fix context duplication in handoff workflows when restoring from checkpoint
* Address Copilot PR review
1 parent 11d6dcf commit 958a488

File tree

3 files changed

+492
-23
lines changed

3 files changed

+492
-23
lines changed

python/packages/core/agent_framework/_workflows/_handoff.py

Lines changed: 75 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -130,19 +130,57 @@ def _clone_chat_agent(agent: ChatAgent) -> ChatAgent:
130130

131131
@dataclass
132132
class HandoffUserInputRequest:
133-
"""Request message emitted when the workflow needs fresh user input."""
133+
"""Request message emitted when the workflow needs fresh user input.
134+
135+
Note: The conversation field is intentionally excluded from checkpoint serialization
136+
to prevent duplication. The conversation is preserved in the coordinator's state
137+
and will be reconstructed on restore. See issue #2667.
138+
"""
134139

135140
conversation: list[ChatMessage]
136141
awaiting_agent_id: str
137142
prompt: str
138143
source_executor_id: str
139144

145+
def to_dict(self) -> dict[str, Any]:
146+
"""Serialize to dict, excluding conversation to prevent checkpoint duplication.
147+
148+
The conversation is already preserved in the workflow coordinator's state.
149+
Including it here would cause duplicate messages when restoring from checkpoint.
150+
"""
151+
return {
152+
"awaiting_agent_id": self.awaiting_agent_id,
153+
"prompt": self.prompt,
154+
"source_executor_id": self.source_executor_id,
155+
}
156+
157+
@classmethod
158+
def from_dict(cls, data: dict[str, Any]) -> "HandoffUserInputRequest":
159+
"""Deserialize from dict, initializing conversation as empty.
160+
161+
The conversation will be reconstructed from the coordinator's state on restore.
162+
"""
163+
return cls(
164+
conversation=[],
165+
awaiting_agent_id=data["awaiting_agent_id"],
166+
prompt=data["prompt"],
167+
source_executor_id=data["source_executor_id"],
168+
)
169+
140170

141171
@dataclass
142172
class _ConversationWithUserInput:
143-
"""Internal message carrying full conversation + new user messages from gateway to coordinator."""
173+
"""Internal message carrying full conversation + new user messages from gateway to coordinator.
174+
175+
Attributes:
176+
full_conversation: The conversation messages to process.
177+
is_post_restore: If True, indicates this message was created after a checkpoint restore.
178+
The coordinator should append these messages to its existing conversation rather
179+
than replacing it. This prevents duplicate messages (see issue #2667).
180+
"""
144181

145182
full_conversation: list[ChatMessage] = field(default_factory=lambda: []) # type: ignore[misc]
183+
is_post_restore: bool = False
146184

147185

148186
@dataclass
@@ -439,9 +477,25 @@ async def handle_user_input(
439477
message: _ConversationWithUserInput,
440478
ctx: WorkflowContext[AgentExecutorRequest, list[ChatMessage]],
441479
) -> None:
442-
"""Receive full conversation with new user input from gateway, update history, trim for agent."""
443-
# Update authoritative conversation
444-
self._conversation = list(message.full_conversation)
480+
"""Receive user input from gateway, update history, and route to agent.
481+
482+
The message.full_conversation may contain:
483+
- Full conversation history + new user messages (normal flow)
484+
- Only new user messages (post-checkpoint-restore flow, see issue #2667)
485+
486+
The gateway sets message.is_post_restore=True when resuming after a checkpoint
487+
restore. In that case, we append the new messages to the existing conversation
488+
rather than replacing it.
489+
"""
490+
incoming = message.full_conversation
491+
492+
if message.is_post_restore and self._conversation:
493+
# Post-restore: append new user messages to existing conversation
494+
# The coordinator already has its conversation restored from checkpoint
495+
self._conversation.extend(incoming)
496+
else:
497+
# Normal flow: replace with full conversation
498+
self._conversation = list(incoming) if incoming else self._conversation
445499

446500
# Reset autonomous turn counter on new user input
447501
self._autonomous_turns = 0
@@ -626,15 +680,24 @@ async def resume_from_user(
626680
response: object,
627681
ctx: WorkflowContext[_ConversationWithUserInput],
628682
) -> None:
629-
"""Convert user input responses back into chat messages and resume the workflow."""
630-
# Reconstruct full conversation with new user input
631-
conversation = list(original_request.conversation)
683+
"""Convert user input responses back into chat messages and resume the workflow.
684+
685+
After checkpoint restore, original_request.conversation will be empty (not serialized
686+
to prevent duplication - see issue #2667). In this case, we send only the new user
687+
messages and let the coordinator append them to its already-restored conversation.
688+
"""
632689
user_messages = _as_user_messages(response)
633-
conversation.extend(user_messages)
634690

635-
# Send full conversation back to coordinator (not trimmed)
636-
# Coordinator will update its authoritative history and trim for agent
637-
message = _ConversationWithUserInput(full_conversation=conversation)
691+
if original_request.conversation:
692+
# Normal flow: have conversation history from the original request
693+
conversation = list(original_request.conversation)
694+
conversation.extend(user_messages)
695+
message = _ConversationWithUserInput(full_conversation=conversation, is_post_restore=False)
696+
else:
697+
# Post-restore flow: conversation was not serialized, send only new user messages
698+
# The coordinator will append these to its already-restored conversation
699+
message = _ConversationWithUserInput(full_conversation=user_messages, is_post_restore=True)
700+
638701
await ctx.send_message(message, target_id="handoff-coordinator")
639702

640703

0 commit comments

Comments
 (0)