-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathcopilot_cli_agent.py
More file actions
151 lines (121 loc) · 5.14 KB
/
copilot_cli_agent.py
File metadata and controls
151 lines (121 loc) · 5.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
"""
Copilot CLI agent implementation.
"""
import asyncio
import itertools
import json
import os
from dataclasses import dataclass
from pathlib import Path
from typing import ClassVar
from src.udbpy.fileutil import mkdtemp
from .agents import BaseAgent
from .assets import FRAMING_PROMPT, SYSTEM_PROMPT
from .output_utils import print_assistant_message
@dataclass
class CopilotCLIAgent(BaseAgent):
    """Copilot CLI agent implementation."""

    # Temporary directory handed to Copilot as its XDG config/state home; created lazily by the
    # first call to `ask` and reused by later calls.
    _tempdir: Path | None = None
    # Becomes True once `ask` has completed, so later invocations pass "--resume" and skip the
    # framing/system prompts.
    _resume: bool = False

    name: ClassVar[str] = "copilot"
    program_name: ClassVar[str] = "copilot"
    display_name: ClassVar[str] = "Copilot CLI"

    async def _handle_messages(self, stdout: asyncio.StreamReader) -> str:
        """
        Handle streamed messages from Copilot until a final result, which is returned.

        Copilot doesn't natively provide framing from its messages but we prompt to request a
        particular format, which this function handles: lines between `<thinking>` and
        `</thinking>` are printed as an assistant message, and lines between `<answer>` and
        `</answer>` are joined with newlines to form the returned result. Lines outside of either
        section are ignored.

        Returns the text of the last completed answer section, or an empty string if there was
        none. Raises AssertionError if the tags are nested, unbalanced or left unterminated.
        """
        result = ""
        msg: list[str] = []
        thinking = False
        answering = False

        async for line_bytes in stdout:
            line = line_bytes.decode("utf-8").rstrip()

            if self.log_level == "DEBUG":
                print("Line:", line)

            if "<thinking>" in line:
                assert not thinking and not answering
                thinking = True
            elif "</thinking>" in line:
                assert thinking and not answering
                thinking = False
                print_assistant_message("\n".join(msg))
                # Start a fresh buffer so thinking text does not leak into the answer.
                msg = []
            elif "<answer>" in line:
                assert not thinking and not answering
                answering = True
            elif "</answer>" in line:
                assert answering and not thinking
                answering = False
                result = "\n".join(msg)
            elif thinking or answering:
                msg.append(line)

        # The stream must not end in the middle of a section.
        assert not thinking and not answering
        return result

    async def ask(self, question: str, port: int, tools: list[str]) -> str:
        """
        Pose a question to an external `copilot` program, supplying access to a UDB MCP server.

        `question` is the text to ask; on the first call it is preceded by the framing and system
        prompts. `port` is the local port on which the MCP server is listening over SSE. `tools`
        is accepted for interface compatibility but is not consulted here: the MCP configuration
        exposes every tool of the server.

        Returns the answer text extracted from Copilot's output by `_handle_messages`.
        """
        if self.log_level == "DEBUG":
            print(f"Connecting Copilot CLI to MCP server on port {port}")

        copilot_config = {
            "mcpServers": {
                "UDB_Server": {"type": "sse", "url": f"http://localhost:{port}/sse", "tools": ["*"]}
            }
        }

        # We run Copilot CLI with a temporary "home directory" so that we can apply a temporary MCP
        # configuration and rely on "--resume" finding our previous session automatically.
        if not self._tempdir:
            self._tempdir = mkdtemp(prefix="udb_explain_copilot_home")

        # We always need to re-create the MCP config as the dynamically allocated port may change
        # between invocations of the tool.
        config_dir = self._tempdir / ".copilot"
        config_dir.mkdir(exist_ok=True)
        (config_dir / "mcp-config.json").write_text(json.dumps(copilot_config) + "\n")

        result = ""
        copilot = None

        # If Copilot hasn't answered any questions yet we prepend our prompt to the question.
        if not self._resume:
            prompt = "\n".join([FRAMING_PROMPT, SYSTEM_PROMPT, question])
        else:
            prompt = question

        allowed_tools = ["UDB_Server", "shell(grep)", "shell(find)", "shell(cat)", "shell(xargs)"]

        env_changes = {
            "XDG_CONFIG_HOME": str(self._tempdir),
            "XDG_STATE_HOME": str(self._tempdir),
        }
        if runtime_dir := os.environ.get("XDG_RUNTIME_DIR"):
            # Pass through runtime state so it has auth access.
            env_changes["XDG_RUNTIME_DIR"] = runtime_dir

        # Build the child environment as a new mapping. `dict.update()` returns None, so passing
        # `os.environ.copy().update(...)` as `env` would silently discard the overrides and let
        # the child inherit the unmodified parent environment.
        env = {**os.environ, **env_changes}

        try:
            copilot = await asyncio.create_subprocess_exec(
                str(self.agent_bin),
                # We can resume unambiguously without specifying a session ID because we're using a
                # temporary home directory for the state generated in this session.
                *(["--resume"] if self._resume else []),
                # Don't allow any tools that may write output (for now).
                "--deny-tool",
                "write",
                *itertools.chain(*[("--allow-tool", t) for t in allowed_tools]),
                "--model",
                "claude-sonnet-4.5",
                "-p",
                prompt,
                env=env,
                stdin=asyncio.subprocess.PIPE,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            assert copilot.stdin and copilot.stdout and copilot.stderr

            result = await self._handle_messages(copilot.stdout)
        finally:
            # Make sure the child does not outlive this call, even if parsing its output failed.
            if copilot and copilot.returncode is None:
                copilot.terminate()
                await copilot.wait()

            if copilot and copilot.stderr:
                stderr_bytes = await copilot.stderr.read()
                if copilot.returncode and stderr_bytes:
                    # Decode leniently: a decoding failure in this cleanup path would otherwise
                    # mask whatever exception is already propagating.
                    print("Errors:\n", stderr_bytes.decode("utf-8", errors="replace"))

        self._resume = True
        return result