Skip to content

Commit bda77a6

Browse files
feat(acp): add retry logic for transient connection errors
Add automatic retry for ACP prompt failures caused by transient connection errors (OSError, ConnectionError, BrokenPipeError, EOFError).

Changes:
- Wrap prompt() call in retry loop for connection exception types
- Retry up to 3 times by default, with escalating backoff delays (5s, 15s, 30s)
- Retry count is configurable via the ACP_PROMPT_MAX_RETRIES env var
- Reset client accumulators between retries
- Timeout errors are NOT retried (handled separately)

This preserves session state when connection errors occur, avoiding the need to restart instances from scratch in the evaluation framework.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 3081956 commit bda77a6

File tree

2 files changed

+196
-4
lines changed

2 files changed

+196
-4
lines changed

openhands-sdk/openhands/sdk/agent/acp_agent.py

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,15 @@
7878
os.environ.get("ACP_NOTIFICATION_DRAIN_DELAY", "0.1")
7979
)
8080

81+
# Retry configuration for transient ACP connection errors.
# These errors can occur when the connection drops mid-conversation but the
# session state is still valid on the server side.
#
# ACP_PROMPT_MAX_RETRIES is the number of *additional* attempts made after the
# first one (the retry loop runs ``range(max_retries + 1)`` times). Negative
# values are clamped to 0: otherwise that range would be empty and the prompt
# would never be sent at all.
_ACP_PROMPT_MAX_RETRIES: int = max(
    0, int(os.environ.get("ACP_PROMPT_MAX_RETRIES", "3"))
)
# Delay in seconds before each retry, indexed by the 0-based attempt number.
# When more retries are configured than entries exist, the retry loop reuses
# the last entry.
_ACP_PROMPT_RETRY_DELAYS: tuple[float, ...] = (5.0, 15.0, 30.0)

# Exception types that indicate transient connection issues worth retrying.
# ConnectionError and BrokenPipeError are subclasses of OSError, so listing
# OSError already covers them. NOTE: the builtin TimeoutError is an OSError
# subclass as well, so any ``except TimeoutError`` clause that must not retry
# has to come before the clause that catches this tuple.
_RETRIABLE_CONNECTION_ERRORS: tuple[type[BaseException], ...] = (OSError, EOFError)
89+
8190
# Limit for asyncio.StreamReader buffers used by the ACP subprocess pipes.
8291
# The default (64 KiB) is too small for session_update notifications that
8392
# carry large tool-call outputs (e.g. file contents, test results). When
@@ -704,15 +713,48 @@ async def _prompt() -> PromptResponse:
704713
await _drain_notifications()
705714
return response
706715

707-
# Send prompt to ACP server (with timeout to prevent indefinite hangs)
716+
# Send prompt to ACP server with retry logic for connection errors.
717+
# Transient connection failures (network blips, server restarts) are
718+
# retried to preserve session state and avoid losing progress.
708719
logger.info(
709720
"Sending ACP prompt (timeout=%.0fs, msg=%d chars)",
710721
self.acp_prompt_timeout,
711722
len(user_message),
712723
)
713-
response = self._executor.run_async(
714-
_prompt, timeout=self.acp_prompt_timeout
715-
)
724+
725+
response: PromptResponse | None = None
726+
max_retries = _ACP_PROMPT_MAX_RETRIES
727+
728+
for attempt in range(max_retries + 1):
729+
try:
730+
response = self._executor.run_async(
731+
_prompt, timeout=self.acp_prompt_timeout
732+
)
733+
break # Success, exit retry loop
734+
except TimeoutError:
735+
# Timeout is handled separately below, don't retry
736+
raise
737+
except _RETRIABLE_CONNECTION_ERRORS as e:
738+
if attempt < max_retries:
739+
delay = _ACP_PROMPT_RETRY_DELAYS[
740+
min(attempt, len(_ACP_PROMPT_RETRY_DELAYS) - 1)
741+
]
742+
logger.warning(
743+
"ACP prompt failed with retriable error (attempt %d/%d), "
744+
"retrying in %.0fs: %s",
745+
attempt + 1,
746+
max_retries + 1,
747+
delay,
748+
e,
749+
)
750+
time.sleep(delay)
751+
# Reset accumulators for retry (partial state may be stale)
752+
self._client.reset()
753+
self._client.on_token = on_token
754+
else:
755+
# Max retries exceeded
756+
raise
757+
716758
elapsed = time.monotonic() - t0
717759
logger.info("ACP prompt returned in %.1fs", elapsed)
718760

tests/sdk/agent/test_acp_agent.py

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1387,3 +1387,153 @@ def test_serialization_roundtrip(self):
13871387
restored = AgentBase.model_validate_json(dumped)
13881388
assert isinstance(restored, ACPAgent)
13891389
assert restored.acp_session_mode == "full-access"
1390+
1391+
1392+
# ---------------------------------------------------------------------------
1393+
# Connection retry logic
1394+
# ---------------------------------------------------------------------------
1395+
1396+
1397+
class TestACPPromptRetry:
    """Tests for the retry loop around the ACP prompt call in ``agent.step``.

    Each test replaces the agent's executor with a fake ``run_async`` so no
    ACP subprocess is started. Tests that exercise the retry path patch
    ``time.sleep`` (to skip the backoff delays) and pin the retry count, so
    the outcome does not depend on an ``ACP_PROMPT_MAX_RETRIES`` value set in
    the surrounding environment.
    """

    # Patch targets inside the module under test.
    _SLEEP = "openhands.sdk.agent.acp_agent.time.sleep"
    _MAX_RETRIES = "openhands.sdk.agent.acp_agent._ACP_PROMPT_MAX_RETRIES"

    def _make_conversation_with_message(self, tmp_path, text="Hello"):
        """Create a mock conversation whose state holds one user message."""
        state = _make_state(tmp_path)
        state.events.append(
            SystemPromptEvent(
                source="agent",
                system_prompt=TextContent(text="ACP-managed agent"),
                tools=[],
            )
        )
        state.events.append(
            MessageEvent(
                source="user",
                llm_message=Message(role="user", content=[TextContent(text=text)]),
            )
        )

        conversation = MagicMock()
        conversation.state = state
        return conversation

    def _make_wired_agent(self, run_async, client=None):
        """Return an agent whose executor delegates to *run_async*.

        Args:
            run_async: Callable standing in for ``executor.run_async``; it
                receives the coroutine function plus keyword arguments.
            client: Optional pre-built bridge. Pass one when *run_async*
                needs to mutate the same client the agent reads from.
        """
        agent = _make_agent()
        agent._client = client if client is not None else _OpenHandsACPBridge()
        agent._conn = MagicMock()
        agent._session_id = "test-session"

        executor = MagicMock()
        executor.run_async = run_async
        agent._executor = executor
        return agent

    def test_retry_on_connection_error_then_success(self, tmp_path):
        """Retry succeeds after transient connection error."""
        conversation = self._make_conversation_with_message(tmp_path)
        events: list = []
        client = _OpenHandsACPBridge()
        call_count = 0

        def _fail_once_then_succeed(_coro, **_kwargs):
            nonlocal call_count
            call_count += 1
            if call_count == 1:
                raise ConnectionError("Connection reset by peer")
            # Populate the text during the successful attempt only: the agent
            # resets the client between attempts, so anything written earlier
            # would be discarded.
            client.accumulated_text.append("Success after retry")
            # A MagicMock is enough here since only ``usage`` is inspected.
            return MagicMock(usage=None)

        agent = self._make_wired_agent(_fail_once_then_succeed, client=client)

        # Patch sleep to avoid actual delays in tests.
        with patch(self._SLEEP) as sleep, patch(self._MAX_RETRIES, 3):
            agent.step(conversation, on_event=events.append)

        assert call_count == 2  # First failed, second succeeded
        sleep.assert_any_call(5.0)  # First entry of the backoff schedule
        assert (
            conversation.state.execution_status
            == ConversationExecutionStatus.FINISHED
        )
        assert len(events) == 3  # MessageEvent, ActionEvent, ObservationEvent
        assert "Success after retry" in events[0].llm_message.content[0].text

    def test_no_retry_on_non_connection_error(self, tmp_path):
        """Non-connection errors (e.g., RuntimeError) fail without retry."""
        conversation = self._make_conversation_with_message(tmp_path)
        events: list = []
        run_async = MagicMock(side_effect=RuntimeError("Some application error"))
        agent = self._make_wired_agent(run_async)

        with pytest.raises(RuntimeError, match="Some application error"):
            agent.step(conversation, on_event=events.append)

        assert run_async.call_count == 1  # No retry attempted
        assert conversation.state.execution_status == ConversationExecutionStatus.ERROR

    def test_no_retry_on_timeout(self, tmp_path):
        """Timeout errors are not retried (handled separately)."""
        conversation = self._make_conversation_with_message(tmp_path)
        run_async = MagicMock(side_effect=TimeoutError("ACP prompt timed out"))
        agent = self._make_wired_agent(run_async)

        # The timeout is handled inside step(), so no exception escapes here.
        agent.step(conversation, on_event=lambda _: None)

        assert run_async.call_count == 1  # No retry for timeout
        assert conversation.state.execution_status == ConversationExecutionStatus.ERROR

    def test_max_retries_exceeded(self, tmp_path):
        """Error raised after max retries exhausted."""
        conversation = self._make_conversation_with_message(tmp_path)
        events: list = []
        run_async = MagicMock(
            side_effect=ConnectionError("Persistent connection failure")
        )
        agent = self._make_wired_agent(run_async)

        with patch(self._SLEEP) as sleep, patch(self._MAX_RETRIES, 3):
            with pytest.raises(ConnectionError, match="Persistent connection failure"):
                agent.step(conversation, on_event=events.append)

        # Retry count pinned to 3 above: 4 total attempts (1 initial + 3 retries)
        assert run_async.call_count == 4
        # Each retry waits for the next entry of the backoff schedule.
        for expected_delay in (5.0, 15.0, 30.0):
            sleep.assert_any_call(expected_delay)
        assert conversation.state.execution_status == ConversationExecutionStatus.ERROR

0 commit comments

Comments
 (0)