Skip to content

Commit b99b0bd

Browse files
Merge duplicate observation content instead of selecting best
Changed the approach for handling multiple observations with the same tool_call_id from selecting the "best" one to merging all of them. Now, when consecutive observations share a tool_call_id (e.g., when a runtime restart creates an error but the tool also completes), their content is combined into a single tool result message. This ensures the LLM receives all relevant information from both the error and the actual result, rather than losing one of them.

Co-authored-by: openhands <openhands@all-hands.dev>
1 parent b75c6e3 commit b99b0bd

File tree

2 files changed

+71
-62
lines changed

2 files changed

+71
-62
lines changed

openhands-sdk/openhands/sdk/event/base.py

Lines changed: 24 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -95,11 +95,8 @@ def events_to_messages(events: list["LLMConvertibleEvent"]) -> list[Message]:
9595
1. ActionEvents with the same llm_response_id are combined into a single
9696
message (parallel tool calls from the same LLM response)
9797
2. Consecutive ObservationBaseEvents with the same tool_call_id are
98-
deduplicated (handles race conditions when a tool completes after a
99-
restart creates an error)
100-
101-
For duplicate observations, priority is:
102-
ObservationEvent > others > AgentErrorEvent.
98+
merged into a single message with combined content (handles race
99+
conditions when a tool completes after a restart creates an error)
103100
"""
104101
from openhands.sdk.event.llm_convertible import (
105102
ActionEvent,
@@ -149,13 +146,11 @@ def events_to_messages(events: list["LLMConvertibleEvent"]) -> list[Message]:
149146
batch_observations.append(next_event)
150147
j += 1
151148

152-
# Select the best observation to use
149+
# Merge all observations by combining their content
153150
if len(batch_observations) > 1:
154-
selected = _select_best_observation(batch_observations)
151+
messages.append(_merge_observation_messages(batch_observations))
155152
else:
156-
selected = batch_observations[0]
157-
158-
messages.append(selected.to_llm_message())
153+
messages.append(batch_observations[0].to_llm_message())
159154
i = j
160155
else:
161156
# Regular event - direct conversion
@@ -165,35 +160,30 @@ def events_to_messages(events: list["LLMConvertibleEvent"]) -> list[Message]:
165160
return messages
166161

167162

168-
def _select_best_observation(
163+
def _merge_observation_messages(
169164
observations: list["ObservationBaseEvent"],
170-
) -> "ObservationBaseEvent":
171-
"""Select the best observation when multiple exist for the same tool_call_id.
172-
173-
Priority: ObservationEvent > other types > AgentErrorEvent
174-
If same priority, prefer the later one (more recent).
165+
) -> Message:
166+
"""Merge multiple observations with the same tool_call_id into a single message.
175167
176168
This handles the race condition where a runtime restart creates an
177169
AgentErrorEvent for an "unmatched" action, but the tool then completes and
178-
creates an ObservationEvent. The ObservationEvent has the actual result,
179-
so it takes priority.
170+
creates an ObservationEvent. Instead of selecting one, we merge all
171+
observation contents into a single tool result message.
180172
"""
181-
from openhands.sdk.event.llm_convertible import AgentErrorEvent, ObservationEvent
182-
183-
obs_events = [o for o in observations if isinstance(o, ObservationEvent)]
184-
error_events = [o for o in observations if isinstance(o, AgentErrorEvent)]
185-
other_events = [
186-
o
187-
for o in observations
188-
if not isinstance(o, (ObservationEvent, AgentErrorEvent))
189-
]
190-
191-
# Return by priority - later events within each category are preferred
192-
if obs_events:
193-
return obs_events[-1]
194-
if other_events:
195-
return other_events[-1]
196-
return error_events[-1]
173+
# Convert all observations to messages and merge their content
174+
merged_content: list[TextContent | ImageContent] = []
175+
first_message = observations[0].to_llm_message()
176+
177+
for obs in observations:
178+
msg = obs.to_llm_message()
179+
merged_content.extend(msg.content)
180+
181+
return Message(
182+
role="tool",
183+
content=merged_content,
184+
tool_call_id=first_message.tool_call_id,
185+
name=first_message.name,
186+
)
197187

198188

199189
def _combine_action_events(events: list["ActionEvent"]) -> Message:

tests/sdk/event/test_events_to_messages.py

Lines changed: 47 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -455,20 +455,21 @@ def test_action_event_with_none_action_round_trip_and_observation_match(self):
455455
assert msgs[1].tool_call_id == "call_ne"
456456

457457

458-
class TestDuplicateObservationDeduplication:
459-
"""Tests for deduplication of consecutive observations with the same tool_call_id.
458+
class TestDuplicateObservationMerging:
459+
"""Tests for merging consecutive observations with the same tool_call_id.
460460
461461
This handles the scenario where:
462462
1. Agent invokes a tool (ActionEvent with tool_call_id)
463463
2. Runtime restarts, creating AgentErrorEvent for "unmatched" action
464464
3. Tool completes, creating ObservationEvent with same tool_call_id
465-
4. Both have the same tool_call_id, but only one message should be sent
465+
4. Both have the same tool_call_id, so their content is merged into one message
466466
467-
The deduplication prioritizes: ObservationEvent > other types > AgentErrorEvent
467+
All consecutive observations with the same tool_call_id are merged by combining
468+
their content fields into a single tool result message.
468469
"""
469470

470-
def test_consecutive_error_and_observation_keeps_observation(self):
471-
"""Test that ObservationEvent is kept over AgentErrorEvent when consecutive."""
471+
def test_consecutive_error_and_observation_merges_content(self):
472+
"""Test that consecutive observations with same tool_call_id are merged."""
472473
error = AgentErrorEvent(
473474
error="A restart occurred while this tool was in progress.",
474475
tool_call_id="call_1",
@@ -485,15 +486,18 @@ def test_consecutive_error_and_observation_keeps_observation(self):
485486
events = cast(list[LLMConvertibleEvent], [error, obs])
486487
messages = LLMConvertibleEvent.events_to_messages(events)
487488

488-
# Only one message should be produced (the ObservationEvent)
489+
# Only one message should be produced (merged content)
489490
assert len(messages) == 1
490491
assert messages[0].role == "tool"
491492
assert messages[0].tool_call_id == "call_1"
492-
# Should contain the actual result, not the error
493-
assert "Command succeeded" in messages[0].content[0].text # type: ignore
494-
495-
def test_consecutive_observation_and_error_keeps_observation(self):
496-
"""Test deduplication regardless of order - observation still preferred."""
493+
# Should contain BOTH the error AND the result (merged)
494+
assert len(messages[0].content) == 2
495+
content_texts = [c.text for c in messages[0].content] # type: ignore
496+
assert any("restart occurred" in t for t in content_texts)
497+
assert any("Command succeeded" in t for t in content_texts)
498+
499+
def test_consecutive_observation_and_error_merges_content(self):
500+
"""Test merging works regardless of order."""
497501
obs = ObservationEvent(
498502
source="environment",
499503
observation=EventsToMessagesMockObservation(result="Command succeeded"),
@@ -510,9 +514,12 @@ def test_consecutive_observation_and_error_keeps_observation(self):
510514
events = cast(list[LLMConvertibleEvent], [obs, error])
511515
messages = LLMConvertibleEvent.events_to_messages(events)
512516

513-
# Only one message - ObservationEvent is preferred
517+
# Only one message with merged content
514518
assert len(messages) == 1
515-
assert "Command succeeded" in messages[0].content[0].text # type: ignore
519+
assert len(messages[0].content) == 2
520+
content_texts = [c.text for c in messages[0].content] # type: ignore
521+
assert any("Command succeeded" in t for t in content_texts)
522+
assert any("restart occurred" in t for t in content_texts)
516523

517524
def test_non_consecutive_duplicates_not_deduplicated(self):
518525
"""Test that non-consecutive duplicates are NOT deduplicated.
@@ -548,8 +555,8 @@ def test_non_consecutive_duplicates_not_deduplicated(self):
548555
assert messages[1].role == "user" # message
549556
assert messages[2].role == "tool" # observation
550557

551-
def test_multiple_consecutive_errors_keeps_last(self):
552-
"""Test that when only errors exist, the last one is kept."""
558+
def test_multiple_consecutive_errors_merges_all(self):
559+
"""Test that when multiple errors exist, they are all merged."""
553560
error1 = AgentErrorEvent(
554561
error="First error",
555562
tool_call_id="call_1",
@@ -564,12 +571,15 @@ def test_multiple_consecutive_errors_keeps_last(self):
564571
events = cast(list[LLMConvertibleEvent], [error1, error2])
565572
messages = LLMConvertibleEvent.events_to_messages(events)
566573

567-
# Only one message - the last error
574+
# Only one message with merged content from both errors
568575
assert len(messages) == 1
569-
assert "Second error" in messages[0].content[0].text # type: ignore
576+
assert len(messages[0].content) == 2
577+
content_texts = [c.text for c in messages[0].content] # type: ignore
578+
assert any("First error" in t for t in content_texts)
579+
assert any("Second error" in t for t in content_texts)
570580

571-
def test_multiple_consecutive_observations_keeps_last(self):
572-
"""Test that when multiple observations exist, the last one is kept."""
581+
def test_multiple_consecutive_observations_merges_all(self):
582+
"""Test that when multiple observations exist, they are all merged."""
573583
obs1 = ObservationEvent(
574584
source="environment",
575585
observation=EventsToMessagesMockObservation(result="First result"),
@@ -588,9 +598,12 @@ def test_multiple_consecutive_observations_keeps_last(self):
588598
events = cast(list[LLMConvertibleEvent], [obs1, obs2])
589599
messages = LLMConvertibleEvent.events_to_messages(events)
590600

591-
# Only one message - the last observation
601+
# Only one message with merged content from both observations
592602
assert len(messages) == 1
593-
assert "Second result" in messages[0].content[0].text # type: ignore
603+
assert len(messages[0].content) == 2
604+
content_texts = [c.text for c in messages[0].content] # type: ignore
605+
assert any("First result" in t for t in content_texts)
606+
assert any("Second result" in t for t in content_texts)
594607

595608
def test_mixed_scenario_multiple_tool_calls(self):
596609
"""Test with multiple tool calls, some having consecutive duplicates."""
@@ -626,11 +639,15 @@ def test_mixed_scenario_multiple_tool_calls(self):
626639
events = cast(list[LLMConvertibleEvent], [error1, obs1, user_msg, obs2])
627640
messages = LLMConvertibleEvent.events_to_messages(events)
628641

629-
# 3 messages: deduplicated tool_1 result + user message + tool_2 result
642+
# 3 messages: merged tool_1 result + user message + tool_2 result
630643
assert len(messages) == 3
631644
assert messages[0].role == "tool"
632645
assert messages[0].tool_call_id == "call_1"
633-
assert "Result 1" in messages[0].content[0].text # type: ignore
646+
# First message has merged content from error and observation
647+
assert len(messages[0].content) == 2
648+
content_texts = [c.text for c in messages[0].content] # type: ignore
649+
assert any("Restart error" in t for t in content_texts)
650+
assert any("Result 1" in t for t in content_texts)
634651
assert messages[1].role == "user"
635652
assert messages[2].role == "tool"
636653
assert messages[2].tool_call_id == "call_2"
@@ -675,14 +692,16 @@ def test_restart_scenario_full_conversation(self):
675692
)
676693
messages = LLMConvertibleEvent.events_to_messages(events)
677694

678-
# Should have 3 messages: user, assistant (tool call), tool result
679-
# The restart error should be deduplicated
695+
# Should have 3 messages: user, assistant (tool call), merged tool result
680696
assert len(messages) == 3
681697
assert messages[0].role == "user"
682698
assert messages[1].role == "assistant"
683699
assert messages[1].tool_calls is not None
684700
assert messages[1].tool_calls[0].id == "call_ls"
685701
assert messages[2].role == "tool"
686702
assert messages[2].tool_call_id == "call_ls"
687-
# Should have the actual result, not the error
688-
assert "drwxr-xr-x" in messages[2].content[0].text # type: ignore
703+
# Should have merged content from both error and result
704+
assert len(messages[2].content) == 2
705+
content_texts = [c.text for c in messages[2].content] # type: ignore
706+
assert any("restart occurred" in t for t in content_texts)
707+
assert any("drwxr-xr-x" in t for t in content_texts)

0 commit comments

Comments
 (0)