Skip to content
1 change: 1 addition & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ gh run rerun <RUN_ID> --repo <OWNER>/<REPO> --failed
- Avoid getattr/hasattr guards and instead enforce type correctness by relying on explicit type assertions and proper object usage, ensuring functions only receive the expected Pydantic models or typed inputs. Prefer type hints and validated models over runtime shape checks.
- Prefer accessing typed attributes directly. If necessary, convert inputs up front into a canonical shape; avoid purely hypothetical fallbacks.
- Use real newlines in commit messages; do not write literal "\n".
- When logging is needed, use the SDK's logger from `openhands.sdk.logger` instead of `logging.getLogger(__name__)`.

## Event Type Deprecation Policy

Expand Down
68 changes: 64 additions & 4 deletions openhands-sdk/openhands/sdk/event/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@


if TYPE_CHECKING:
from openhands.sdk.event.llm_convertible import ActionEvent
from openhands.sdk.event.llm_convertible import ActionEvent, ObservationBaseEvent

N_CHAR_PREVIEW = 500

Expand Down Expand Up @@ -89,9 +89,19 @@ def __str__(self) -> str:

@staticmethod
def events_to_messages(events: list["LLMConvertibleEvent"]) -> list[Message]:
"""Convert event stream to LLM message stream, handling multi-action batches"""
# TODO: We should add extensive tests for this
from openhands.sdk.event.llm_convertible import ActionEvent
"""Convert event stream to LLM message stream, handling multi-action batches.

This method handles two key batching scenarios:
1. ActionEvents with the same llm_response_id are combined into a single
message (parallel tool calls from the same LLM response)
2. Consecutive ObservationBaseEvents with the same tool_call_id are
merged into a single message with combined content (handles race
conditions when a tool completes after a restart creates an error)
"""
from openhands.sdk.event.llm_convertible import (
ActionEvent,
ObservationBaseEvent,
)

messages = []
i = 0
Expand All @@ -118,6 +128,30 @@ def events_to_messages(events: list["LLMConvertibleEvent"]) -> list[Message]:
# Create combined message for the response
messages.append(_combine_action_events(batch_events))
i = j

elif isinstance(event, ObservationBaseEvent):
# Collect consecutive observations with the same tool_call_id
# This handles the case where AgentErrorEvent and ObservationEvent
# are both created for the same tool_call_id (e.g., after a restart)
batch_observations: list[ObservationBaseEvent] = [event]
tool_call_id = event.tool_call_id

# Look ahead for consecutive observations with same tool_call_id
j = i + 1
while j < len(events) and isinstance(events[j], ObservationBaseEvent):
next_event = events[j]
assert isinstance(next_event, ObservationBaseEvent)
if next_event.tool_call_id != tool_call_id:
break
batch_observations.append(next_event)
j += 1

# Merge all observations by combining their content
if len(batch_observations) > 1:
messages.append(_merge_observation_messages(batch_observations))
else:
messages.append(batch_observations[0].to_llm_message())
i = j
else:
# Regular event - direct conversion
messages.append(event.to_llm_message())
Expand All @@ -126,6 +160,32 @@ def events_to_messages(events: list["LLMConvertibleEvent"]) -> list[Message]:
return messages


def _merge_observation_messages(
    observations: list["ObservationBaseEvent"],
) -> Message:
    """Merge observations sharing one tool_call_id into a single tool message.

    This handles the race condition where a runtime restart creates an
    AgentErrorEvent for an "unmatched" action, but the tool then completes and
    creates an ObservationEvent. Instead of selecting one, the content of every
    observation is concatenated, in order, into one tool result message.

    Args:
        observations: Observations to merge, in the order their content should
            appear. The caller is expected to pass observations that all refer
            to the same tool call; this function does not re-check that.

    Returns:
        A ``role="tool"`` Message whose content is the concatenation of each
        observation's message content, and whose ``tool_call_id`` and ``name``
        are taken from the first observation's message.

    Raises:
        IndexError: If ``observations`` is empty.
    """
    # Convert every observation exactly once; the first converted message also
    # supplies the tool_call_id/name, so it must not be converted a second time.
    converted = [obs.to_llm_message() for obs in observations]
    first_message = converted[0]

    merged_content: list[TextContent | ImageContent] = [
        item for msg in converted for item in msg.content
    ]

    return Message(
        role="tool",
        content=merged_content,
        tool_call_id=first_message.tool_call_id,
        name=first_message.name,
    )


def _combine_action_events(events: list["ActionEvent"]) -> Message:
"""Combine multiple ActionEvents into single LLM message.

Expand Down
252 changes: 252 additions & 0 deletions tests/sdk/event/test_events_to_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -453,3 +453,255 @@ def test_action_event_with_none_action_round_trip_and_observation_match(self):
assert msgs[0].role == "assistant"
assert msgs[1].role == "tool"
assert msgs[1].tool_call_id == "call_ne"


class TestDuplicateObservationMerging:
    """Tests for merging consecutive observations with the same tool_call_id.

    This handles the scenario where:
    1. Agent invokes a tool (ActionEvent with tool_call_id)
    2. Runtime restarts, creating AgentErrorEvent for "unmatched" action
    3. Tool completes, creating ObservationEvent with same tool_call_id
    4. Both have the same tool_call_id, so their content is merged into one message

    All consecutive observations with the same tool_call_id are merged by combining
    their content fields into a single tool result message.
    """

    # ------------------------------------------------------------------
    # Private builders: keep each test focused on the event ordering it
    # exercises rather than on constructor boilerplate.
    # ------------------------------------------------------------------

    @staticmethod
    def _error(text: str, tool_call_id: str = "call_1") -> AgentErrorEvent:
        """Build an AgentErrorEvent for the ``terminal`` tool."""
        return AgentErrorEvent(
            error=text,
            tool_call_id=tool_call_id,
            tool_name="terminal",
        )

    @staticmethod
    def _observation(
        result: str,
        tool_call_id: str = "call_1",
        action_id: str = "action_1",
    ) -> ObservationEvent:
        """Build an ObservationEvent for the ``terminal`` tool."""
        return ObservationEvent(
            source="environment",
            observation=EventsToMessagesMockObservation(result=result),
            action_id=action_id,
            tool_name="terminal",
            tool_call_id=tool_call_id,
        )

    @staticmethod
    def _user_message(text: str) -> MessageEvent:
        """Build a user MessageEvent carrying a single text item."""
        return MessageEvent(
            source="user",
            llm_message=Message(role="user", content=[TextContent(text=text)]),
        )

    @staticmethod
    def _convert(*events) -> list[Message]:
        """Run the given events, in order, through events_to_messages."""
        return LLMConvertibleEvent.events_to_messages(
            cast(list[LLMConvertibleEvent], list(events))
        )

    @staticmethod
    def _texts(message: Message) -> list[str]:
        """Return the text of every content item, asserting each is text.

        Narrowing with isinstance replaces a blanket ``# type: ignore`` so a
        non-text item fails the test loudly instead of being masked.
        """
        texts: list[str] = []
        for item in message.content:
            assert isinstance(item, TextContent)
            texts.append(item.text)
        return texts

    # ------------------------------------------------------------------
    # Tests
    # ------------------------------------------------------------------

    def test_consecutive_error_and_observation_merges_content(self):
        """Test that consecutive observations with same tool_call_id are merged."""
        messages = self._convert(
            self._error("A restart occurred while this tool was in progress."),
            self._observation("Command succeeded"),
        )

        # Only one message should be produced (merged content)
        assert len(messages) == 1
        assert messages[0].role == "tool"
        assert messages[0].tool_call_id == "call_1"
        # Should contain BOTH the error AND the result (merged)
        assert len(messages[0].content) == 2
        content_texts = self._texts(messages[0])
        assert any("restart occurred" in t for t in content_texts)
        assert any("Command succeeded" in t for t in content_texts)

    def test_consecutive_observation_and_error_merges_content(self):
        """Test merging works regardless of order."""
        messages = self._convert(
            self._observation("Command succeeded"),
            self._error("A restart occurred while this tool was in progress."),
        )

        # Only one message with merged content
        assert len(messages) == 1
        assert len(messages[0].content) == 2
        content_texts = self._texts(messages[0])
        assert any("Command succeeded" in t for t in content_texts)
        assert any("restart occurred" in t for t in content_texts)

    def test_non_consecutive_duplicates_not_deduplicated(self):
        """Test that non-consecutive duplicates are NOT deduplicated.

        If observations with the same tool_call_id are separated by other events,
        this indicates a bug that should be exposed rather than hidden.
        """
        messages = self._convert(
            self._error("Restart error"),
            self._user_message("Some interruption"),
            self._observation("Actual result"),
        )

        # All three messages should be produced (non-consecutive not deduplicated)
        assert len(messages) == 3
        assert messages[0].role == "tool"  # error
        assert messages[1].role == "user"  # message
        assert messages[2].role == "tool"  # observation

    def test_multiple_consecutive_errors_merges_all(self):
        """Test that when multiple errors exist, they are all merged."""
        messages = self._convert(
            self._error("First error"),
            self._error("Second error"),
        )

        # Only one message with merged content from both errors
        assert len(messages) == 1
        assert len(messages[0].content) == 2
        content_texts = self._texts(messages[0])
        assert any("First error" in t for t in content_texts)
        assert any("Second error" in t for t in content_texts)

    def test_multiple_consecutive_observations_merges_all(self):
        """Test that when multiple observations exist, they are all merged."""
        messages = self._convert(
            self._observation("First result"),
            self._observation("Second result"),
        )

        # Only one message with merged content from both observations
        assert len(messages) == 1
        assert len(messages[0].content) == 2
        content_texts = self._texts(messages[0])
        assert any("First result" in t for t in content_texts)
        assert any("Second result" in t for t in content_texts)

    def test_mixed_scenario_multiple_tool_calls(self):
        """Test with multiple tool calls, some having consecutive duplicates."""
        messages = self._convert(
            # Tool call 1: has consecutive duplicate (error + observation)
            self._error("Restart error for call_1"),
            self._observation("Result 1"),
            # Some other message between tool calls
            self._user_message("Continue"),
            # Tool call 2: single observation (no duplicate)
            self._observation(
                "Result 2", tool_call_id="call_2", action_id="action_2"
            ),
        )

        # 3 messages: merged tool_1 result + user message + tool_2 result
        assert len(messages) == 3
        assert messages[0].role == "tool"
        assert messages[0].tool_call_id == "call_1"
        # First message has merged content from error and observation
        assert len(messages[0].content) == 2
        content_texts = self._texts(messages[0])
        assert any("Restart error" in t for t in content_texts)
        assert any("Result 1" in t for t in content_texts)
        assert messages[1].role == "user"
        assert messages[2].role == "tool"
        assert messages[2].tool_call_id == "call_2"

    def test_restart_scenario_full_conversation(self):
        """Test the full restart scenario in a realistic conversation."""
        # 1. User asks to run a command
        user_msg = self._user_message("Run ls -la")

        # 2. Agent calls terminal tool
        action = create_action_event(
            thought_text="I'll run the command",
            tool_name="terminal",
            tool_call_id="call_ls",
            llm_response_id="resp_1",
            action_args={"command": "ls -la"},
        )

        # 3. Runtime restarts while tool is running - creates error
        restart_error = self._error(
            "A restart occurred while this tool was in progress.",
            tool_call_id="call_ls",
        )

        # 4. Tool actually completes - creates observation
        result = self._observation(
            "total 0\ndrwxr-xr-x 2 user user 40 Jan 1 00:00 .",
            tool_call_id="call_ls",
            action_id="action_ls",
        )

        messages = self._convert(user_msg, action, restart_error, result)

        # Should have 3 messages: user, assistant (tool call), merged tool result
        assert len(messages) == 3
        assert messages[0].role == "user"
        assert messages[1].role == "assistant"
        assert messages[1].tool_calls is not None
        assert messages[1].tool_calls[0].id == "call_ls"
        assert messages[2].role == "tool"
        assert messages[2].tool_call_id == "call_ls"
        # Should have merged content from both error and result
        assert len(messages[2].content) == 2
        content_texts = self._texts(messages[2])
        assert any("restart occurred" in t for t in content_texts)
        assert any("drwxr-xr-x" in t for t in content_texts)
Loading