-
Notifications
You must be signed in to change notification settings - Fork 233
Expand file tree
/
Copy pathacp_agent.py
More file actions
1272 lines (1119 loc) · 50.7 KB
/
acp_agent.py
File metadata and controls
1272 lines (1119 loc) · 50.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""ACPAgent — an AgentBase subclass that delegates to an ACP server.
The Agent Client Protocol (ACP) lets OpenHands power conversations using
ACP-compatible servers (Claude Code, Gemini CLI, etc.) instead of direct
LLM calls. The ACP server manages its own LLM, tools, and execution;
the ACPAgent simply relays user messages and collects the response.
Unlike the built-in Agent, one ACP ``step()`` maps to one complete remote
assistant turn. ACPAgent therefore emits a terminal ``FinishAction`` at the
end of each step to delimit that completed turn for downstream consumers.
See https://agentclientprotocol.com/protocol/overview
"""
from __future__ import annotations
import asyncio
import json
import os
import threading
import time
import uuid
from collections.abc import Generator
from typing import TYPE_CHECKING, Any
from acp.client.connection import ClientSideConnection
from acp.exceptions import RequestError as ACPRequestError
from acp.helpers import text_block
from acp.schema import (
AgentMessageChunk,
AgentThoughtChunk,
AllowedOutcome,
PromptResponse,
RequestPermissionResponse,
TextContentBlock,
ToolCallProgress,
ToolCallStart,
UsageUpdate,
)
from acp.transports import default_environment
from pydantic import Field, PrivateAttr
from openhands.sdk.agent.base import AgentBase
from openhands.sdk.conversation.state import ConversationExecutionStatus
from openhands.sdk.event import (
ACPToolCallEvent,
ActionEvent,
MessageEvent,
ObservationEvent,
SystemPromptEvent,
)
from openhands.sdk.event.conversation_error import ConversationErrorEvent
from openhands.sdk.llm import LLM, Message, MessageToolCall, TextContent
from openhands.sdk.logger import get_logger
from openhands.sdk.observability.laminar import maybe_init_laminar, observe
from openhands.sdk.tool import Tool # noqa: TC002
from openhands.sdk.tool.builtins.finish import FinishAction, FinishObservation
logger = get_logger(__name__)
maybe_init_laminar()
if TYPE_CHECKING:
from openhands.sdk.conversation import (
ConversationCallbackType,
ConversationState,
ConversationTokenCallbackType,
LocalConversation,
)
# Maximum seconds to wait for a UsageUpdate notification after prompt()
# returns. The ACP server writes UsageUpdate to the wire before the
# PromptResponse, so under normal conditions the notification handler
# completes almost immediately. This timeout is a safety net for slow
# or remote servers.
_USAGE_UPDATE_TIMEOUT: float = float(os.environ.get("ACP_USAGE_UPDATE_TIMEOUT", "2.0"))
# Retry configuration for transient ACP connection errors.
# These errors can occur when the connection drops mid-conversation but the
# session state is still valid on the server side.
_ACP_PROMPT_MAX_RETRIES: int = int(os.environ.get("ACP_PROMPT_MAX_RETRIES", "3"))
_ACP_PROMPT_RETRY_DELAYS: tuple[float, ...] = (5.0, 15.0, 30.0) # seconds
# Exception types that indicate transient connection issues worth retrying
_RETRIABLE_CONNECTION_ERRORS = (OSError, ConnectionError, BrokenPipeError, EOFError)
# JSON-RPC error codes from the ACP server that are transient and worth
# retrying. These map to server-side failures (HTTP 500 equivalents) where
# the session state is still valid but the request failed.
# -32603 = "Internal error" (JSON-RPC spec) — covers ACP server crashes,
# upstream model 500s, and transient infrastructure errors.
_RETRIABLE_SERVER_ERROR_CODES: frozenset[int] = frozenset({-32603})
# Limit for asyncio.StreamReader buffers used by the ACP subprocess pipes.
# The default (64 KiB) is too small for session_update notifications that
# carry large tool-call outputs (e.g. file contents, test results). When
# a single JSON-RPC line exceeds the limit, readline() raises
# LimitOverrunError, silently killing the filter/receive pipeline and
# leaving the prompt() future unresolved forever. 100 MiB is a pragmatic
# compatibility limit for current ACP servers, not an endorsement of huge
# JSON-RPC payloads; the long-term fix is protocol-level chunking/streaming
# for large tool output.
_STREAM_READER_LIMIT: int = 100 * 1024 * 1024 # 100 MiB
# Minimum interval between on_activity heartbeat signals (seconds).
# Throttled to avoid excessive calls while still keeping the idle timer
# well below the ~20 min runtime-api kill threshold.
_ACTIVITY_SIGNAL_INTERVAL: float = 30.0
def _make_dummy_llm() -> LLM:
"""Create a dummy LLM that should never be called directly."""
return LLM(model="acp-managed")
# ---------------------------------------------------------------------------
# ACP Client implementation
# ---------------------------------------------------------------------------
# Known ACP server name → bypass-permissions mode ID mappings.
_BYPASS_MODE_MAP: dict[str, str] = {
"claude-agent": "bypassPermissions",
"codex-acp": "full-access",
"gemini-cli": "yolo",
}
_DEFAULT_BYPASS_MODE = "full-access"
# ACP auth method ID → environment variable that supplies the credential.
# When the server reports auth_methods, we pick the first method whose
# required env var is set.
# Note: claude-login is intentionally NOT included because Claude Code ACP
# uses bypassPermissions mode instead of API key authentication.
_AUTH_METHOD_ENV_MAP: dict[str, str] = {
"codex-api-key": "CODEX_API_KEY",
"openai-api-key": "OPENAI_API_KEY",
"gemini-api-key": "GEMINI_API_KEY",
}
def _select_auth_method(
auth_methods: list[Any],
env: dict[str, str],
) -> str | None:
"""Pick an auth method whose required env var is present.
Returns the ``id`` of the first matching method, or ``None`` if no
env-var-based method is available (the server may not require auth).
"""
method_ids = {m.id for m in auth_methods}
for method_id, env_var in _AUTH_METHOD_ENV_MAP.items():
if method_id in method_ids and env_var in env:
return method_id
return None
def _resolve_bypass_mode(agent_name: str) -> str:
"""Return the session mode ID that bypasses all permission prompts.
Different ACP servers use different mode IDs for the same concept:
- claude-agent-acp → ``bypassPermissions``
- codex-acp → ``full-access``
- gemini-cli → ``yolo``
Falls back to ``full-access`` for unknown servers.
"""
for key, mode in _BYPASS_MODE_MAP.items():
if key in agent_name.lower():
return mode
return _DEFAULT_BYPASS_MODE
def _build_session_meta(agent_name: str, acp_model: str | None) -> dict[str, Any]:
"""Build ACP session metadata for server-specific model selection."""
if not acp_model:
return {}
# claude-agent-acp: model selection via session _meta (claudeCode.options.model)
if "claude" in agent_name.lower():
return {"claudeCode": {"options": {"model": acp_model}}}
# codex-acp, gemini-cli: use protocol-level set_session_model instead (see below)
return {}
async def _maybe_set_session_model(
conn: ClientSideConnection,
agent_name: str,
session_id: str,
acp_model: str | None,
) -> None:
"""Apply a protocol-level session model override when the server supports it."""
if not acp_model:
return
# codex-acp, gemini-cli: model selection via set_session_model protocol method
# claude-agent-acp: uses session _meta instead (see _build_session_meta)
if "codex-acp" in agent_name.lower() or "gemini-cli" in agent_name.lower():
await conn.set_session_model(model_id=acp_model, session_id=session_id)
def _extract_token_usage(
response: Any,
) -> tuple[int, int, int, int, int]:
"""Extract token usage from an ACP PromptResponse.
Returns (input_tokens, output_tokens, cache_read, cache_write, reasoning).
Checks two locations:
- claude-agent-acp, codex-acp: ``response.usage`` (standard ACP field)
- gemini-cli: ``response._meta.quota.token_count`` (non-standard)
"""
if response is not None and response.usage is not None:
u = response.usage
return (
u.input_tokens,
u.output_tokens,
u.cached_read_tokens or 0,
u.cached_write_tokens or 0,
u.thought_tokens or 0,
)
if response is not None and response.field_meta is not None:
quota = response.field_meta.get("quota", {})
tc = quota.get("token_count", {})
return (tc.get("input_tokens", 0), tc.get("output_tokens", 0), 0, 0, 0)
return (0, 0, 0, 0, 0)
def _estimate_cost_from_tokens(
model: str, input_tokens: int, output_tokens: int
) -> float:
"""Estimate cost from token counts using LiteLLM's pricing database.
Returns 0.0 if pricing is unavailable for the model.
"""
try:
import litellm
cost_map = litellm.model_cost
info = cost_map.get(model, {})
input_cost = info.get("input_cost_per_token", 0) or 0
output_cost = info.get("output_cost_per_token", 0) or 0
return input_tokens * input_cost + output_tokens * output_cost
except Exception:
return 0.0
def _serialize_tool_content(content: list[Any] | None) -> list[dict[str, Any]] | None:
"""Serialize ACP tool call content blocks to plain dicts for JSON storage."""
if not content:
return None
return [
c.model_dump(mode="json") if hasattr(c, "model_dump") else c for c in content
]
async def _filter_jsonrpc_lines(source: Any, dest: Any) -> None:
"""Read lines from *source* and forward only JSON-RPC lines to *dest*.
Some ACP servers (e.g. ``claude-code-acp`` v0.1.x) emit log messages
like ``[ACP] ...`` to stdout alongside JSON-RPC traffic. This coroutine
strips those non-protocol lines so the JSON-RPC connection is not confused.
"""
try:
while True:
line = await source.readline()
if not line:
dest.feed_eof()
break
# JSON-RPC messages are single-line JSON objects containing
# "jsonrpc". Filter out multi-line pretty-printed JSON from
# debug logs that also start with '{'.
stripped = line.lstrip()
if stripped.startswith(b"{") and b'"jsonrpc"' in line:
dest.feed_data(line)
else:
logger.debug(
"ACP stdout (non-JSON): %s",
line.decode(errors="replace").rstrip(),
)
except Exception:
logger.debug("_filter_jsonrpc_lines stopped", exc_info=True)
dest.feed_eof()
class _OpenHandsACPBridge:
"""Bridge between OpenHands and ACP that accumulates session updates.
Implements the ``Client`` protocol from ``agent_client_protocol``.
"""
def __init__(self) -> None:
self.accumulated_text: list[str] = []
self.accumulated_thoughts: list[str] = []
self.accumulated_tool_calls: list[dict[str, Any]] = []
self.on_token: Any = None # ConversationTokenCallbackType | None
# Live event sink — fired from session_update as ACP tool-call
# updates arrive, so the event stream reflects real subprocess
# progress instead of a single end-of-turn burst. Set by
# ACPAgent.step() for the duration of one prompt() round-trip.
self.on_event: Any = None # ConversationCallbackType | None
# Activity heartbeat — called (throttled) during session_update to
# signal that the ACP subprocess is still actively working. Set by
# ACPAgent.step() to keep the agent-server's idle timer alive.
self.on_activity: Any = None # Callable[[], None] | None
self._last_activity_signal: float = float("-inf")
# Telemetry state from UsageUpdate (persists across turns)
self._last_cost: float = 0.0 # last cumulative cost seen
self._last_cost_by_session: dict[str, float] = {}
self._context_window: int = 0 # last context window seen
self._context_window_by_session: dict[str, int] = {}
# Per-turn synchronization for UsageUpdate notifications.
self._turn_usage_updates: dict[str, Any] = {}
self._usage_received: dict[str, asyncio.Event] = {}
# Fork session state for ask_agent() — guarded by _fork_lock to
# prevent concurrent ask_agent() calls from colliding.
self._fork_lock = threading.Lock()
self._fork_session_id: str | None = None
self._fork_accumulated_text: list[str] = []
def reset(self) -> None:
self.accumulated_text.clear()
self.accumulated_thoughts.clear()
self.accumulated_tool_calls.clear()
self.on_token = None
self.on_event = None
self.on_activity = None
self._turn_usage_updates.clear()
self._usage_received.clear()
# Note: telemetry state (_last_cost, _context_window, _last_activity_signal,
# etc.) is intentionally NOT cleared — it accumulates across turns.
def prepare_usage_sync(self, session_id: str) -> asyncio.Event:
"""Prepare per-turn UsageUpdate synchronization for a session."""
event = asyncio.Event()
self._usage_received[session_id] = event
self._turn_usage_updates.pop(session_id, None)
return event
def get_turn_usage_update(self, session_id: str) -> Any:
"""Return the latest UsageUpdate observed for the current turn."""
return self._turn_usage_updates.get(session_id)
def pop_turn_usage_update(self, session_id: str) -> Any:
"""Consume per-turn UsageUpdate synchronization state for a session."""
self._usage_received.pop(session_id, None)
return self._turn_usage_updates.pop(session_id, None)
# -- Client protocol methods ------------------------------------------
async def session_update(
self,
session_id: str,
update: Any,
**kwargs: Any, # noqa: ARG002
) -> None:
logger.debug("ACP session_update: type=%s", type(update).__name__)
# Route fork session updates to the fork accumulator
if self._fork_session_id is not None and session_id == self._fork_session_id:
if isinstance(update, AgentMessageChunk):
if isinstance(update.content, TextContentBlock):
self._fork_accumulated_text.append(update.content.text)
return
if isinstance(update, AgentMessageChunk):
if isinstance(update.content, TextContentBlock):
text = update.content.text
self.accumulated_text.append(text)
if self.on_token is not None:
try:
self.on_token(text)
except Exception:
logger.debug("on_token callback failed", exc_info=True)
self._maybe_signal_activity()
elif isinstance(update, AgentThoughtChunk):
if isinstance(update.content, TextContentBlock):
self.accumulated_thoughts.append(update.content.text)
elif isinstance(update, UsageUpdate):
# Store the update for step()/ask_agent() to process in one place.
self._context_window = update.size
self._context_window_by_session[session_id] = update.size
self._turn_usage_updates[session_id] = update
event = self._usage_received.get(session_id)
if event is not None:
event.set()
elif isinstance(update, ToolCallStart):
entry = {
"tool_call_id": update.tool_call_id,
"title": update.title,
"tool_kind": update.kind,
"status": update.status,
"raw_input": update.raw_input,
"raw_output": update.raw_output,
"content": _serialize_tool_content(update.content),
}
self.accumulated_tool_calls.append(entry)
logger.debug("ACP tool call start: %s", update.tool_call_id)
self._emit_tool_call_event(entry)
self._maybe_signal_activity()
elif isinstance(update, ToolCallProgress):
# Find the existing tool call entry and merge updates
target: dict[str, Any] | None = None
for tc in self.accumulated_tool_calls:
if tc["tool_call_id"] == update.tool_call_id:
if update.title is not None:
tc["title"] = update.title
if update.kind is not None:
tc["tool_kind"] = update.kind
if update.status is not None:
tc["status"] = update.status
if update.raw_input is not None:
tc["raw_input"] = update.raw_input
if update.raw_output is not None:
tc["raw_output"] = update.raw_output
if update.content is not None:
tc["content"] = _serialize_tool_content(update.content)
target = tc
break
logger.debug("ACP tool call progress: %s", update.tool_call_id)
if target is not None:
self._emit_tool_call_event(target)
self._maybe_signal_activity()
else:
logger.debug("ACP session update: %s", type(update).__name__)
def _emit_tool_call_event(self, tc: dict[str, Any]) -> None:
"""Emit an ACPToolCallEvent reflecting the current state of ``tc``.
Called from ``session_update`` on each ``ToolCallStart`` /
``ToolCallProgress`` so downstream consumers see tool cards appear
and update as the subprocess runs. The same ``tool_call_id`` is
reused on every emission — consumers should dedupe by id and treat
the last-seen event as authoritative.
"""
if self.on_event is None:
return
try:
event = ACPToolCallEvent(
tool_call_id=tc["tool_call_id"],
title=tc["title"],
status=tc.get("status"),
tool_kind=tc.get("tool_kind"),
raw_input=tc.get("raw_input"),
raw_output=tc.get("raw_output"),
content=tc.get("content"),
is_error=tc.get("status") == "failed",
)
self.on_event(event)
except Exception:
logger.debug("on_event callback failed", exc_info=True)
def _maybe_signal_activity(self) -> None:
"""Signal activity to the agent-server's idle tracker (throttled).
During conn.prompt(), ACP tool calls run inside the subprocess and
never hit the agent-server's HTTP endpoints. Without this heartbeat
the server's idle_time grows unboundedly and the runtime-api kills
the pod (default idle threshold ~20 min).
Throttled to at most once per _ACTIVITY_SIGNAL_INTERVAL seconds to
avoid excessive overhead on chatty ACP servers.
"""
if self.on_activity is None:
return
now = time.monotonic()
if now - self._last_activity_signal >= _ACTIVITY_SIGNAL_INTERVAL:
self._last_activity_signal = now
try:
self.on_activity()
except Exception:
logger.debug("on_activity callback failed", exc_info=True)
async def request_permission(
self,
options: list[Any],
session_id: str, # noqa: ARG002
tool_call: Any,
**kwargs: Any, # noqa: ARG002
) -> Any:
"""Auto-approve all permission requests from the ACP server."""
# Pick the first option (usually "allow once")
option_id = options[0].option_id if options else "allow_once"
logger.info(
"ACP auto-approving permission: %s (option: %s)",
tool_call,
option_id,
)
return RequestPermissionResponse(
outcome=AllowedOutcome(outcome="selected", option_id=option_id),
)
# fs/terminal methods — raise NotImplementedError; ACP server handles its own
async def write_text_file(
self, content: str, path: str, session_id: str, **kwargs: Any
) -> None:
raise NotImplementedError("ACP server handles file operations")
async def read_text_file(
self,
path: str,
session_id: str,
limit: int | None = None,
line: int | None = None,
**kwargs: Any,
) -> Any:
raise NotImplementedError("ACP server handles file operations")
async def create_terminal(
self,
command: str,
session_id: str,
args: list[str] | None = None,
cwd: str | None = None,
env: Any = None,
output_byte_limit: int | None = None,
**kwargs: Any,
) -> Any:
raise NotImplementedError("ACP server handles terminal operations")
async def terminal_output(
self, session_id: str, terminal_id: str, **kwargs: Any
) -> Any:
raise NotImplementedError("ACP server handles terminal operations")
async def release_terminal(
self, session_id: str, terminal_id: str, **kwargs: Any
) -> None:
raise NotImplementedError("ACP server handles terminal operations")
async def wait_for_terminal_exit(
self, session_id: str, terminal_id: str, **kwargs: Any
) -> Any:
raise NotImplementedError("ACP server handles terminal operations")
async def kill_terminal(
self, session_id: str, terminal_id: str, **kwargs: Any
) -> None:
raise NotImplementedError("ACP server handles terminal operations")
async def ext_method(
self,
method: str, # noqa: ARG002
params: dict[str, Any], # noqa: ARG002
) -> dict[str, Any]:
return {}
async def ext_notification(
self,
method: str, # noqa: ARG002
params: dict[str, Any], # noqa: ARG002
) -> None:
pass
def on_connect(self, conn: Any) -> None: # noqa: ARG002
pass
# ---------------------------------------------------------------------------
# ACPAgent
# ---------------------------------------------------------------------------
class ACPAgent(AgentBase):
"""Agent that delegates to an ACP-compatible subprocess server."""
# Override required fields with ACP-appropriate defaults
llm: LLM = Field(default_factory=_make_dummy_llm)
tools: list[Tool] = Field(default_factory=list)
include_default_tools: list[str] = Field(default_factory=list)
# ACP-specific configuration
acp_command: list[str] = Field(
...,
description=(
"Command to start the ACP server, e.g."
" ['npx', '-y', '@agentclientprotocol/claude-agent-acp']"
),
)
acp_args: list[str] = Field(
default_factory=list,
description="Additional arguments for the ACP server command",
)
acp_env: dict[str, str] = Field(
default_factory=dict,
description="Additional environment variables for the ACP server process",
)
acp_session_mode: str | None = Field(
default=None,
description=(
"Session mode ID to set after creating a session. "
"If None (default), auto-detected from the ACP server type: "
"'bypassPermissions' for claude-agent-acp, 'full-access' for codex-acp."
),
)
acp_prompt_timeout: float = Field(
default=1800.0,
description=(
"Timeout in seconds for a single ACP prompt() call. "
"Prevents indefinite hangs when the ACP server fails to respond."
),
)
acp_model: str | None = Field(
default=None,
description=(
"Model for the ACP server to use (e.g. 'claude-opus-4-6' or "
"'gpt-5.4'). For Claude ACP, passed via session _meta. For Codex "
"ACP, applied via the protocol-level set_session_model call. "
"If None, the server picks its default."
),
)
def model_post_init(self, __context: object) -> None:
super().model_post_init(__context)
# Propagate the actual model name to metrics so that cost/token
# entries are attributed to the real model, not the sentinel
# "acp-managed" placeholder.
if self.acp_model:
self.llm.metrics.model_name = self.acp_model
if self.llm.metrics.accumulated_token_usage is not None:
self.llm.metrics.accumulated_token_usage.model = self.acp_model
# Private runtime state
_executor: Any = PrivateAttr(default=None)
_conn: Any = PrivateAttr(default=None) # ClientSideConnection
_session_id: str | None = PrivateAttr(default=None)
_process: Any = PrivateAttr(default=None) # asyncio subprocess
_client: Any = PrivateAttr(default=None) # _OpenHandsACPBridge
_filtered_reader: Any = PrivateAttr(default=None) # StreamReader
_closed: bool = PrivateAttr(default=False)
_working_dir: str = PrivateAttr(default="")
_agent_name: str = PrivateAttr(
default=""
) # ACP server name from InitializeResponse
_agent_version: str = PrivateAttr(
default=""
) # ACP server version from InitializeResponse
# Callback to signal that the ACP subprocess is actively working.
# Injected by the agent-server to call update_last_execution_time().
_on_activity: Any = PrivateAttr(default=None) # Callable[[], None] | None
# -- Helpers -----------------------------------------------------------
def _record_usage(
self,
response: PromptResponse | None,
session_id: str,
elapsed: float | None = None,
usage_update: UsageUpdate | None = None,
) -> None:
"""Record cost, token usage, latency, and notify stats callback once.
Args:
response: The ACP PromptResponse (may carry a ``usage`` field).
session_id: Session identifier used as the response_id for metrics.
elapsed: Wall-clock seconds for this prompt round-trip (optional).
usage_update: The synchronized ACP UsageUpdate for this turn, if any.
"""
# -- Cost recording ---------------------------------------------------
# claude-agent-acp, codex-acp: report cost via UsageUpdate notification
# gemini-cli: does not send UsageUpdate (cost derived from tokens below)
cost_recorded = False
if usage_update is not None and usage_update.cost is not None:
last_cost = self._client._last_cost_by_session.get(session_id, 0.0)
delta = usage_update.cost.amount - last_cost
if delta > 0:
self.llm.metrics.add_cost(delta)
cost_recorded = True
self._client._last_cost_by_session[session_id] = usage_update.cost.amount
self._client._last_cost = usage_update.cost.amount
# -- Token usage recording --------------------------------------------
input_tokens, output_tokens, cache_read, cache_write, reasoning = (
_extract_token_usage(response)
)
if input_tokens or output_tokens:
self.llm.metrics.add_token_usage(
prompt_tokens=input_tokens,
completion_tokens=output_tokens,
cache_read_tokens=cache_read,
cache_write_tokens=cache_write,
reasoning_tokens=reasoning,
context_window=self._client._context_window_by_session.get(
session_id, self._client._context_window
),
response_id=session_id,
)
# -- Cost derivation from tokens --------------------------------------
# gemini-cli: no UsageUpdate cost, so derive from token counts using
# LiteLLM's model pricing database (same source the proxy uses).
# claude-agent-acp, codex-acp: skipped since cost_recorded is True.
if not cost_recorded and (input_tokens or output_tokens) and self.acp_model:
cost = _estimate_cost_from_tokens(
self.acp_model, input_tokens, output_tokens
)
if cost > 0:
self.llm.metrics.add_cost(cost)
if not cost_recorded and not input_tokens and not output_tokens:
# gemini-cli currently returns response.usage=None and
# response.field_meta=None (ACP SDK strips _meta during
# serialization). Tracked in google-gemini/gemini-cli#24280.
logger.debug(
"No usage data from ACP server %s — token/cost tracking unavailable",
self._agent_name or "unknown",
)
if elapsed is not None:
self.llm.metrics.add_response_latency(elapsed, session_id)
if self.llm.telemetry._stats_update_callback is not None:
try:
self.llm.telemetry._stats_update_callback()
except Exception:
logger.debug("Stats update callback failed", exc_info=True)
# -- Override base properties to be no-ops for ACP ---------------------
@property
def agent_name(self) -> str:
"""Name of the ACP server (from InitializeResponse.agent_info)."""
return self._agent_name
@property
def agent_version(self) -> str:
"""Version of the ACP server (from InitializeResponse.agent_info)."""
return self._agent_version
def get_all_llms(self) -> Generator[LLM]:
yield self.llm
# -- Lifecycle ---------------------------------------------------------
def init_state(
self,
state: ConversationState,
on_event: ConversationCallbackType,
) -> None:
"""Spawn the ACP server and initialize a session."""
# Emit a placeholder system prompt so the visualizer shows a section
# even though the real system prompt is managed by the ACP server.
on_event(
SystemPromptEvent(
source="agent",
system_prompt=TextContent(
text=(
"This conversation is powered by an ACP server. "
"The system prompt and tools are managed by the "
"ACP server and are not available for display."
)
),
tools=[],
)
)
# Validate no unsupported features
if self.tools:
raise NotImplementedError(
"ACPAgent does not support custom tools; "
"the ACP server manages its own tools"
)
if self.mcp_config:
raise NotImplementedError(
"ACPAgent does not support mcp_config; "
"configure MCP on the ACP server instead"
)
if self.condenser is not None:
raise NotImplementedError(
"ACPAgent does not support condenser; "
"the ACP server manages its own context"
)
if self.agent_context is not None:
raise NotImplementedError(
"ACPAgent does not support agent_context; "
"configure the ACP server directly"
)
from openhands.sdk.utils.async_executor import AsyncExecutor
self._executor = AsyncExecutor()
try:
self._start_acp_server(state)
except Exception as e:
logger.error("Failed to start ACP server: %s", e)
self._cleanup()
raise
self._initialized = True
# Store agent info in agent_state so it's accessible from remote
# conversations (PrivateAttrs aren't serialized in state updates).
state.agent_state = {
**state.agent_state,
"acp_agent_name": self._agent_name,
"acp_agent_version": self._agent_version,
}
def _start_acp_server(self, state: ConversationState) -> None:
"""Start the ACP subprocess and initialize the session."""
client = _OpenHandsACPBridge()
self._client = client
# Build environment: inherit current env + ACP extras
env = default_environment()
env.update(os.environ)
env.update(self.acp_env)
# Strip CLAUDECODE so nested Claude Code instances don't refuse to start
env.pop("CLAUDECODE", None)
command = self.acp_command[0]
args = list(self.acp_command[1:]) + list(self.acp_args)
working_dir = str(state.workspace.working_dir)
async def _init() -> tuple[Any, Any, Any, str, str, str]:
# Spawn the subprocess directly so we can install a
# filtering reader that skips non-JSON-RPC lines some
# ACP servers (e.g. claude-code-acp v0.1.x) write to
# stdout.
process = await asyncio.create_subprocess_exec(
command,
*args,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=env,
limit=_STREAM_READER_LIMIT,
)
assert process.stdin is not None
assert process.stdout is not None
# Wrap the subprocess stdout in a filtering reader that
# only passes lines starting with '{' (JSON-RPC messages).
filtered_reader = asyncio.StreamReader(limit=_STREAM_READER_LIMIT)
asyncio.get_event_loop().create_task(
_filter_jsonrpc_lines(process.stdout, filtered_reader)
)
conn = ClientSideConnection(
client,
process.stdin, # write to subprocess
filtered_reader, # read filtered output
)
# Initialize the protocol and discover server identity
init_response = await conn.initialize(protocol_version=1)
agent_name = ""
agent_version = ""
if init_response.agent_info is not None:
agent_name = init_response.agent_info.name or ""
agent_version = init_response.agent_info.version or ""
logger.info(
"ACP server initialized: agent_name=%r, agent_version=%r",
agent_name,
agent_version,
)
# Authenticate if the server requires it. Some ACP servers
# (e.g. codex-acp) require an explicit authenticate call
# before session creation. We auto-detect the method from
# the env vars that are available to the process.
auth_methods = init_response.auth_methods or []
if auth_methods:
method_id = _select_auth_method(auth_methods, env)
if method_id is not None:
logger.info("Authenticating with ACP method: %s", method_id)
auth_kwargs: dict[str, Any] = {}
# gemini-cli: pass gateway baseUrl to route API calls
# through LiteLLM proxy. claude-agent-acp and codex-acp
# read their provider base URL from env vars directly.
if method_id == "gemini-api-key":
gemini_base_url = env.get("GEMINI_BASE_URL")
if gemini_base_url:
auth_kwargs["gateway"] = {"baseUrl": gemini_base_url}
await conn.authenticate(method_id=method_id, **auth_kwargs)
else:
logger.warning(
"ACP server offers auth methods %s but no matching "
"env var is set — session creation may fail",
[m.id for m in auth_methods],
)
# Build _meta content for session options (e.g. model selection).
# Extra kwargs to new_session() become the _meta dict in the
# JSON-RPC request — do NOT wrap in _meta= (that double-nests).
session_meta = _build_session_meta(agent_name, self.acp_model)
# Create a new session
response = await conn.new_session(cwd=working_dir, **session_meta)
session_id = response.session_id
await _maybe_set_session_model(
conn,
agent_name,
session_id,
self.acp_model,
)
# Resolve the permission mode to use. Different ACP servers
# use different mode IDs for the same concept (no-prompts):
# - claude-agent-acp → "bypassPermissions"
# - codex-acp → "full-access"
mode_id = self.acp_session_mode
if mode_id is None:
mode_id = _resolve_bypass_mode(agent_name)
logger.info("Setting ACP session mode: %s", mode_id)
await conn.set_session_mode(mode_id=mode_id, session_id=session_id)
return conn, process, filtered_reader, session_id, agent_name, agent_version
result = self._executor.run_async(_init)
(
self._conn,
self._process,
self._filtered_reader,
self._session_id,
self._agent_name,
self._agent_version,
) = result
self._working_dir = working_dir
@observe(name="acp_agent.step", ignore_inputs=["conversation", "on_event"])
def step(
self,
conversation: LocalConversation,
on_event: ConversationCallbackType,
on_token: ConversationTokenCallbackType | None = None,
) -> None:
"""Send the latest user message to the ACP server and emit the response."""
state = conversation.state
# Find the latest user message
user_message = None
for event in reversed(list(state.events)):
if isinstance(event, MessageEvent) and event.source == "user":
# Extract text from the message
for content in event.llm_message.content:
if isinstance(content, TextContent) and content.text.strip():
user_message = content.text
break
if user_message:
break
if user_message is None:
logger.warning("No user message found; finishing conversation")
state.execution_status = ConversationExecutionStatus.FINISHED
return
# Reset client accumulators and wire live callbacks. ``on_event``
# is fired from inside ``_OpenHandsACPBridge.session_update`` as
# tool-call notifications arrive, so consumers see ACPToolCallEvents
# streamed live instead of a single end-of-turn burst.
self._client.reset()
self._client.on_token = on_token
self._client.on_event = on_event
self._client.on_activity = self._on_activity
t0 = time.monotonic()
try:
async def _prompt() -> PromptResponse:
usage_sync = self._client.prepare_usage_sync(self._session_id or "")
response = await self._conn.prompt(
[text_block(user_message)],
self._session_id,
)
if self._client.get_turn_usage_update(self._session_id or "") is None:
try:
await asyncio.wait_for(
usage_sync.wait(), timeout=_USAGE_UPDATE_TIMEOUT
)
except TimeoutError:
logger.warning(
"UsageUpdate not received within %.1fs for session %s",
_USAGE_UPDATE_TIMEOUT,
self._session_id,
)
return response
# Send prompt to ACP server with retry logic for connection errors.
# Transient connection failures (network blips, server restarts) are
# retried to preserve session state and avoid losing progress.
logger.info(
"Sending ACP prompt (timeout=%.0fs, msg=%d chars)",
self.acp_prompt_timeout,
len(user_message),
)
response: PromptResponse | None = None
max_retries = _ACP_PROMPT_MAX_RETRIES
for attempt in range(max_retries + 1):
try:
response = self._executor.run_async(
_prompt, timeout=self.acp_prompt_timeout
)
break
except TimeoutError:
raise
except _RETRIABLE_CONNECTION_ERRORS as e: