Skip to content

Commit aead27b

Browse files
Add TCP network transport to ACPAgent
ACPAgent currently requires spawning the ACP server as a local subprocess, which needs Node.js installed in the container. When using APIRemoteWorkspace with K8s pods that lack Node.js, this forces custom container images. Add an alternative TCP transport mode where ACPAgent connects to an already-running ACP server over the network via asyncio.open_connection(). The ClientSideConnection works unchanged since it accepts any StreamReader/StreamWriter pair. Transport is configured via mutually exclusive fields validated by a model_validator: either acp_command (subprocess) or acp_host+acp_port (TCP). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 684bc6e commit aead27b

File tree

2 files changed

+223
-64
lines changed

2 files changed

+223
-64
lines changed

openhands-sdk/openhands/sdk/agent/acp_agent.py

Lines changed: 113 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
from collections.abc import Generator
1515
from typing import TYPE_CHECKING, Any
1616

17-
from pydantic import Field, PrivateAttr
17+
from pydantic import Field, PrivateAttr, model_validator
1818

1919
from openhands.sdk.agent.base import AgentBase
2020
from openhands.sdk.conversation.state import ConversationExecutionStatus
@@ -263,20 +263,21 @@ def on_connect(self, conn: Any) -> None: # noqa: ARG002
263263
class ACPAgent(AgentBase):
264264
"""Agent that delegates to an ACP (Agent Client Protocol) server.
265265
266-
Instead of calling an LLM directly, this agent spawns an ACP-compatible
267-
server (e.g. ``claude-code-acp``) as a subprocess and communicates with
268-
it via the ACP protocol. The server manages its own LLM, tools, and
269-
execution lifecycle.
266+
Instead of calling an LLM directly, this agent communicates with an
267+
ACP-compatible server (e.g. ``claude-code-acp``) via the ACP protocol.
268+
The server manages its own LLM, tools, and execution lifecycle.
270269
271-
Example::
270+
Two transport modes are supported (mutually exclusive):
272271
273-
from openhands.sdk.agent import ACPAgent
274-
from openhands.sdk.conversation import Conversation
272+
**Subprocess mode** — the agent spawns the ACP server as a child process
273+
and communicates via stdin/stdout JSON-RPC::
275274
276275
agent = ACPAgent(acp_command=["npx", "-y", "claude-code-acp"])
277-
conversation = Conversation(agent=agent, workspace="./workspace")
278-
conversation.send_message("Hello! What is 2+2?")
279-
conversation.run()
276+
277+
**TCP mode** — the agent connects to an already-running ACP server over
278+
the network::
279+
280+
agent = ACPAgent(acp_host="acp-server.internal", acp_port=4001)
280281
"""
281282

282283
# Override required fields with ACP-appropriate defaults
@@ -285,12 +286,39 @@ class ACPAgent(AgentBase):
285286
include_default_tools: list[str] = Field(default_factory=list)
286287

287288
# ACP-specific configuration
288-
acp_command: list[str] = Field(
289-
...,
289+
acp_command: list[str] | None = Field(
290+
default=None,
290291
description=(
291-
"Command to start the ACP server, e.g. ['npx', '-y', 'claude-code-acp']"
292+
"Command to start the ACP server, e.g. ['npx', '-y', 'claude-code-acp']. "
293+
"Mutually exclusive with acp_host."
292294
),
293295
)
296+
acp_host: str | None = Field(
297+
default=None,
298+
description=(
299+
"Hostname of a remote ACP server. Mutually exclusive with acp_command."
300+
),
301+
)
302+
acp_port: int | None = Field(
303+
default=None,
304+
description="Port of the remote ACP server. Required when acp_host is set.",
305+
)
306+
307+
@model_validator(mode="before")
308+
@classmethod
309+
def _validate_transport(cls, data):
310+
if not isinstance(data, dict):
311+
return data
312+
has_command = data.get("acp_command") is not None
313+
has_host = data.get("acp_host") is not None
314+
if has_command and has_host:
315+
raise ValueError("acp_command and acp_host are mutually exclusive")
316+
if not has_command and not has_host:
317+
raise ValueError("Either acp_command or acp_host must be provided")
318+
if has_host and data.get("acp_port") is None:
319+
raise ValueError("acp_port is required when acp_host is set")
320+
return data
321+
294322
acp_args: list[str] = Field(
295323
default_factory=list,
296324
description="Additional arguments for the ACP server command",
@@ -307,6 +335,7 @@ class ACPAgent(AgentBase):
307335
_process: Any = PrivateAttr(default=None) # asyncio subprocess
308336
_client: Any = PrivateAttr(default=None) # _OpenHandsACPClient
309337
_filtered_reader: Any = PrivateAttr(default=None) # StreamReader
338+
_tcp_writer: Any = PrivateAttr(default=None) # asyncio.StreamWriter (TCP mode)
310339
_closed: bool = PrivateAttr(default=False)
311340
_working_dir: str = PrivateAttr(default="")
312341

@@ -375,70 +404,86 @@ def init_state(
375404
self._initialized = True
376405

377406
def _start_acp_server(self, state: ConversationState) -> None:
378-
"""Start the ACP subprocess and initialize the session."""
407+
"""Start the ACP connection and initialize the session.
408+
409+
In subprocess mode (``acp_command``), spawns the server as a child
410+
process and communicates via stdin/stdout. In TCP mode
411+
(``acp_host``/``acp_port``), connects to an already-running server
412+
over the network.
413+
"""
379414
import asyncio
380415

381416
from acp.client.connection import (
382417
ClientSideConnection,
383418
)
384-
from acp.transports import (
385-
default_environment,
386-
)
387419

388420
client = _OpenHandsACPClient()
389421
client._llm_ref = self.llm
390422
self._client = client
391423

392-
# Build environment: inherit current env + ACP extras
393-
env = default_environment()
394-
env.update(os.environ)
395-
env.update(self.acp_env)
396-
397-
command = self.acp_command[0]
398-
args = list(self.acp_command[1:]) + list(self.acp_args)
399-
400424
working_dir = str(state.workspace.working_dir)
401425

402-
async def _init() -> tuple[Any, Any, Any, str]:
403-
# Spawn the subprocess directly so we can install a
404-
# filtering reader that skips non-JSON-RPC lines some
405-
# ACP servers (e.g. claude-code-acp v0.1.x) write to
406-
# stdout.
407-
process = await asyncio.create_subprocess_exec(
408-
command,
409-
*args,
410-
stdin=asyncio.subprocess.PIPE,
411-
stdout=asyncio.subprocess.PIPE,
412-
stderr=asyncio.subprocess.PIPE,
413-
env=env,
414-
)
415-
assert process.stdin is not None
416-
assert process.stdout is not None
417-
418-
# Wrap the subprocess stdout in a filtering reader that
419-
# only passes lines starting with '{' (JSON-RPC messages).
420-
filtered_reader = asyncio.StreamReader()
421-
asyncio.get_event_loop().create_task(
422-
_filter_jsonrpc_lines(process.stdout, filtered_reader)
423-
)
426+
if self.acp_host is not None:
427+
# --- TCP mode ---
428+
async def _init_tcp() -> tuple[Any, str, Any]:
429+
reader, writer = await asyncio.open_connection(
430+
self.acp_host, self.acp_port
431+
)
432+
conn = ClientSideConnection(
433+
client,
434+
writer, # input_stream (write to server)
435+
reader, # output_stream (read from server)
436+
)
437+
await conn.initialize(protocol_version=1)
438+
response = await conn.new_session(cwd=working_dir)
439+
return conn, response.session_id, writer
424440

425-
conn = ClientSideConnection(
426-
client,
427-
process.stdin, # write to subprocess
428-
filtered_reader, # read filtered output
441+
result = self._executor.run_async(_init_tcp)
442+
self._conn, self._session_id, self._tcp_writer = result
443+
else:
444+
# --- Subprocess mode ---
445+
from acp.transports import (
446+
default_environment,
429447
)
430448

431-
# Initialize the protocol
432-
await conn.initialize(protocol_version=1)
449+
env = default_environment()
450+
env.update(os.environ)
451+
env.update(self.acp_env)
452+
453+
assert self.acp_command is not None
454+
command = self.acp_command[0]
455+
args = list(self.acp_command[1:]) + list(self.acp_args)
456+
457+
async def _init_subprocess() -> tuple[Any, Any, Any, str]:
458+
process = await asyncio.create_subprocess_exec(
459+
command,
460+
*args,
461+
stdin=asyncio.subprocess.PIPE,
462+
stdout=asyncio.subprocess.PIPE,
463+
stderr=asyncio.subprocess.PIPE,
464+
env=env,
465+
)
466+
assert process.stdin is not None
467+
assert process.stdout is not None
433468

434-
# Create a new session
435-
response = await conn.new_session(cwd=working_dir)
436-
session_id = response.session_id
469+
filtered_reader = asyncio.StreamReader()
470+
asyncio.get_event_loop().create_task(
471+
_filter_jsonrpc_lines(process.stdout, filtered_reader)
472+
)
437473

438-
return conn, process, filtered_reader, session_id
474+
conn = ClientSideConnection(
475+
client,
476+
process.stdin,
477+
filtered_reader,
478+
)
479+
480+
await conn.initialize(protocol_version=1)
481+
response = await conn.new_session(cwd=working_dir)
482+
return conn, process, filtered_reader, response.session_id
483+
484+
result = self._executor.run_async(_init_subprocess)
485+
self._conn, self._process, self._filtered_reader, self._session_id = result
439486

440-
result = self._executor.run_async(_init)
441-
self._conn, self._process, self._filtered_reader, self._session_id = result
442487
self._working_dir = working_dir
443488

444489
def step(
@@ -640,6 +685,14 @@ def _cleanup(self) -> None:
640685
logger.debug("Error closing ACP connection: %s", e)
641686
self._conn = None
642687

688+
# Close TCP writer if in network mode
689+
if self._tcp_writer is not None:
690+
try:
691+
self._tcp_writer.close()
692+
except Exception as e:
693+
logger.debug("Error closing TCP writer: %s", e)
694+
self._tcp_writer = None
695+
643696
# Terminate the subprocess
644697
if self._process is not None:
645698
try:

tests/sdk/agent/test_acp_agent.py

Lines changed: 110 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,15 @@
2626

2727

2828
def _make_agent(**kwargs) -> ACPAgent:
29-
return ACPAgent(acp_command=["echo", "test"], **kwargs)
29+
if "acp_command" not in kwargs and "acp_host" not in kwargs:
30+
kwargs["acp_command"] = ["echo", "test"]
31+
return ACPAgent(**kwargs)
32+
33+
34+
def _make_tcp_agent(**kwargs) -> ACPAgent:
35+
kwargs.setdefault("acp_host", "localhost")
36+
kwargs.setdefault("acp_port", 4001)
37+
return ACPAgent(**kwargs)
3038

3139

3240
def _make_state(tmp_path) -> ConversationState:
@@ -57,9 +65,23 @@ def test_creates_with_empty_default_tools(self):
5765
agent = _make_agent()
5866
assert agent.include_default_tools == []
5967

60-
def test_requires_acp_command(self):
61-
with pytest.raises(Exception):
62-
ACPAgent() # type: ignore[call-arg]
68+
def test_creates_with_tcp_transport(self):
69+
agent = ACPAgent(acp_host="localhost", acp_port=4001)
70+
assert agent.acp_host == "localhost"
71+
assert agent.acp_port == 4001
72+
assert agent.acp_command is None
73+
74+
def test_rejects_both_command_and_host(self):
75+
with pytest.raises(Exception, match="mutually exclusive"):
76+
ACPAgent(acp_command=["echo"], acp_host="localhost", acp_port=4001)
77+
78+
def test_rejects_neither_command_nor_host(self):
79+
with pytest.raises(Exception, match="Either acp_command or acp_host"):
80+
ACPAgent()
81+
82+
def test_rejects_host_without_port(self):
83+
with pytest.raises(Exception, match="acp_port is required"):
84+
ACPAgent(acp_host="localhost")
6385

6486
def test_acp_command_stored(self):
6587
agent = ACPAgent(acp_command=["npx", "-y", "claude-code-acp"])
@@ -122,6 +144,26 @@ def test_deserialization_from_dict(self):
122144
assert isinstance(agent, ACPAgent)
123145
assert agent.acp_command == ["echo", "test"]
124146

147+
def test_roundtrip_serialization_tcp(self):
148+
agent = ACPAgent(acp_host="acp.internal", acp_port=5000)
149+
dumped = agent.model_dump_json()
150+
restored = AgentBase.model_validate_json(dumped)
151+
assert isinstance(restored, ACPAgent)
152+
assert restored.acp_host == "acp.internal"
153+
assert restored.acp_port == 5000
154+
assert restored.acp_command is None
155+
156+
def test_deserialization_from_dict_tcp(self):
157+
data = {
158+
"kind": "ACPAgent",
159+
"acp_host": "10.0.0.1",
160+
"acp_port": 4001,
161+
}
162+
agent = AgentBase.model_validate(data)
163+
assert isinstance(agent, ACPAgent)
164+
assert agent.acp_host == "10.0.0.1"
165+
assert agent.acp_port == 4001
166+
125167

126168
# ---------------------------------------------------------------------------
127169
# Feature validation (init_state guards)
@@ -175,6 +217,31 @@ def test_emits_system_prompt_event(self, tmp_path):
175217
assert events[0].system_prompt.text == "ACP-managed agent"
176218
assert events[0].tools == []
177219

220+
def test_tcp_transport_connects(self, tmp_path):
221+
"""TCP mode stores connection, session_id, and tcp_writer."""
222+
agent = _make_tcp_agent()
223+
state = _make_state(tmp_path)
224+
events: list = []
225+
226+
mock_conn = MagicMock()
227+
mock_writer = MagicMock()
228+
229+
# Mock AsyncExecutor so run_async returns the expected TCP tuple
230+
mock_executor = MagicMock()
231+
mock_executor.run_async.return_value = (mock_conn, "tcp-session-1", mock_writer)
232+
233+
with patch(
234+
"openhands.sdk.utils.async_executor.AsyncExecutor",
235+
return_value=mock_executor,
236+
):
237+
agent.init_state(state, on_event=events.append)
238+
239+
assert agent._conn is mock_conn
240+
assert agent._session_id == "tcp-session-1"
241+
assert agent._tcp_writer is mock_writer
242+
assert agent._process is None
243+
assert agent._filtered_reader is None
244+
178245

179246
# ---------------------------------------------------------------------------
180247
# _OpenHandsACPClient
@@ -490,6 +557,45 @@ def test_close_handles_errors_gracefully(self):
490557
# Should not raise
491558
agent.close()
492559

560+
def test_close_closes_tcp_writer(self):
561+
agent = _make_tcp_agent()
562+
mock_writer = MagicMock()
563+
agent._tcp_writer = mock_writer
564+
agent._executor = MagicMock()
565+
agent._conn = None
566+
agent._process = None
567+
568+
agent.close()
569+
570+
mock_writer.close.assert_called_once()
571+
assert agent._tcp_writer is None
572+
573+
def test_close_skips_process_in_tcp_mode(self):
574+
agent = _make_tcp_agent()
575+
mock_writer = MagicMock()
576+
agent._tcp_writer = mock_writer
577+
agent._executor = MagicMock()
578+
agent._conn = None
579+
agent._process = None # No process in TCP mode
580+
581+
agent.close()
582+
583+
# No process terminate/kill should be attempted
584+
mock_writer.close.assert_called_once()
585+
586+
def test_close_tcp_writer_handles_errors(self):
587+
agent = _make_tcp_agent()
588+
mock_writer = MagicMock()
589+
mock_writer.close.side_effect = OSError("connection reset")
590+
agent._tcp_writer = mock_writer
591+
agent._executor = MagicMock()
592+
agent._conn = None
593+
agent._process = None
594+
595+
# Should not raise
596+
agent.close()
597+
assert agent._tcp_writer is None
598+
493599

494600
# ---------------------------------------------------------------------------
495601
# _filter_jsonrpc_lines

0 commit comments

Comments
 (0)