-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathclient.py
More file actions
1467 lines (1261 loc) · 56.5 KB
/
Copy pathclient.py
File metadata and controls
1467 lines (1261 loc) · 56.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
Claude SDK Client Configuration
===============================
Functions for creating and configuring the Claude Agent SDK client.
All AI interactions should use `create_client()` to ensure consistent OAuth authentication
and proper tool/MCP configuration. For simple message calls without full agent sessions,
use `create_simple_client()` from `core.simple_client`.
The client factory now uses AGENT_CONFIGS from agents/tools_pkg/models.py as the
single source of truth for phase-aware tool and MCP server configuration.
Architecture Decision:
See ADR-001 (docs/architecture/adr/ADR-001-claude-agent-sdk.md) for rationale
on adopting the Claude Agent SDK for all AI interactions.
"""
from __future__ import annotations
import copy
import json
import logging
import os
import threading
import time
from pathlib import Path
from typing import TYPE_CHECKING, Any
from core.platform import (
is_windows,
validate_cli_path,
)
logger = logging.getLogger(__name__)
# =============================================================================
# Async Event Loop Optimization
# =============================================================================
# uvloop provides significantly faster event loop implementation for asyncio.
# On Linux/macOS it can improve async performance by 2-4x. Windows uses proactor
# event loop which is already optimized, so we skip uvloop installation there.
if not is_windows():
try:
import uvloop
uvloop.install()
logger.debug("uvloop installed for improved async performance")
except ImportError:
logger.debug("uvloop not available, using default asyncio event loop")
except Exception as e:
logger.warning(f"Failed to install uvloop: {e}")
# =============================================================================
# Windows System Prompt Limits
# =============================================================================
# Windows CreateProcessW has a 32,768 character limit for the entire command line.
# When CLAUDE.md is very large and passed as --system-prompt, the command can exceed
# this limit, causing ERROR_FILE_NOT_FOUND. We cap CLAUDE.md content to stay safe.
# 20,000 chars leaves ~12KB headroom for CLI overhead (model, tools, MCP config, etc.)
WINDOWS_MAX_SYSTEM_PROMPT_CHARS = 20000
WINDOWS_TRUNCATION_MESSAGE = (
"\n\n[... CLAUDE.md truncated due to Windows command-line length limit ...]"
)
CLAUDE_MD_HEADER = "\n\n# Project Instructions (from CLAUDE.md)\n\n"
# =============================================================================
# Project Index Cache
# =============================================================================
# Caches project index and capabilities to avoid reloading on every create_client() call.
# This significantly reduces the time to create new agent sessions.
_PROJECT_INDEX_CACHE: dict[str, tuple[dict[str, Any], dict[str, bool], float]] = {}
_CACHE_TTL_SECONDS = 300 # 5 minute TTL
_CACHE_LOCK = threading.Lock() # Protects _PROJECT_INDEX_CACHE access
def _get_cached_project_data(
project_dir: Path,
) -> tuple[dict[str, Any], dict[str, bool]]:
"""
Get project index and capabilities with caching.
Args:
project_dir: Path to the project directory
Returns:
Tuple of (project_index, project_capabilities)
"""
key = str(project_dir.resolve())
now = time.time()
debug = os.environ.get("DEBUG", "").lower() in ("true", "1")
# Check cache with lock
with _CACHE_LOCK:
if key in _PROJECT_INDEX_CACHE:
cached_index, cached_capabilities, cached_time = _PROJECT_INDEX_CACHE[key]
cache_age = now - cached_time
if cache_age < _CACHE_TTL_SECONDS:
if debug:
print(
f"[ClientCache] Cache HIT for project index (age: {cache_age:.1f}s / TTL: {_CACHE_TTL_SECONDS}s)"
)
logger.debug(f"Using cached project index for {project_dir}")
# Return deep copies to prevent callers from corrupting the cache
return copy.deepcopy(cached_index), copy.deepcopy(cached_capabilities)
elif debug:
print(
f"[ClientCache] Cache EXPIRED for project index (age: {cache_age:.1f}s > TTL: {_CACHE_TTL_SECONDS}s)"
)
# Cache miss or expired - load fresh data (outside lock to avoid blocking)
load_start = time.time()
logger.debug(f"Loading project index for {project_dir}")
project_index = load_project_index(project_dir)
project_capabilities = detect_project_capabilities(project_index)
if debug:
load_duration = (time.time() - load_start) * 1000
print(
f"[ClientCache] Cache MISS - loaded project index in {load_duration:.1f}ms"
)
# Store in cache with lock - use double-checked locking pattern
# Re-check if another thread populated the cache while we were loading
with _CACHE_LOCK:
if key in _PROJECT_INDEX_CACHE:
cached_index, cached_capabilities, cached_time = _PROJECT_INDEX_CACHE[key]
cache_age = time.time() - cached_time
if cache_age < _CACHE_TTL_SECONDS:
# Another thread already cached valid data while we were loading
if debug:
print(
"[ClientCache] Cache was populated by another thread, using cached data"
)
# Return deep copies to prevent callers from corrupting the cache
return copy.deepcopy(cached_index), copy.deepcopy(cached_capabilities)
# Either no cache entry or it's expired - store our fresh data
_PROJECT_INDEX_CACHE[key] = (project_index, project_capabilities, time.time())
# Return the freshly loaded data (no need to copy since it's not from cache)
return project_index, project_capabilities
def invalidate_project_cache(project_dir: Path | None = None) -> None:
"""
Invalidate the project index cache.
Args:
project_dir: Specific project to invalidate, or None to clear all
"""
with _CACHE_LOCK:
if project_dir is None:
_PROJECT_INDEX_CACHE.clear()
logger.debug("Cleared all project index cache entries")
else:
key = str(project_dir.resolve())
if key in _PROJECT_INDEX_CACHE:
del _PROJECT_INDEX_CACHE[key]
logger.debug(f"Invalidated project index cache for {project_dir}")
if TYPE_CHECKING:
from agents.templates.models import AgentTemplate
from agents.tools_pkg import (
CONTEXT7_TOOLS,
ELECTRON_TOOLS,
GRAPHITI_MCP_TOOLS,
LINEAR_TOOLS,
PUPPETEER_TOOLS,
create_auto_claude_mcp_server,
get_allowed_tools,
get_required_mcp_servers,
is_tools_available,
)
from claude_agent_sdk import ClaudeAgentOptions, ClaudeSDKClient
from claude_agent_sdk.types import HookMatcher
from core.auth import (
get_sdk_env_vars,
require_auth_token,
validate_token_not_encrypted,
)
from core.providers.config import get_provider_config
from enterprise.data_residency import get_data_residency_config
from linear_updater import is_linear_enabled
from prompts_pkg.project_context import detect_project_capabilities, load_project_index
from security import bash_security_hook
def _validate_custom_mcp_server(server: dict) -> bool:
"""
Validate a custom MCP server configuration for security.
Ensures only expected fields with valid types are present.
Rejects configurations that could lead to command injection.
Args:
server: Dict representing a custom MCP server configuration
Returns:
True if valid, False otherwise
"""
if not isinstance(server, dict):
return False
# Required fields
required_fields = {"id", "name", "type"}
if not all(field in server for field in required_fields):
logger.warning(
f"Custom MCP server missing required fields: {required_fields - server.keys()}"
)
return False
# Validate field types
if not isinstance(server.get("id"), str) or not server["id"]:
return False
if not isinstance(server.get("name"), str) or not server["name"]:
return False
# FIX: Changed from ('command', 'url') to ('command', 'http') to match actual usage
if server.get("type") not in ("command", "http"):
logger.warning(f"Invalid MCP server type: {server.get('type')}")
return False
# Allowlist of safe executable commands for MCP servers
# Only allow known package managers and interpreters - NO shell commands
SAFE_COMMANDS = {
"npx",
"npm",
"node",
"python",
"python3",
"uv",
"uvx",
}
# Blocklist of dangerous shell commands that should never be allowed
DANGEROUS_COMMANDS = {
"bash",
"sh",
"cmd",
"powershell",
"pwsh", # PowerShell Core
"/bin/bash",
"/bin/sh",
"/bin/zsh",
"/usr/bin/bash",
"/usr/bin/sh",
"zsh",
"fish",
}
# Dangerous interpreter flags that allow arbitrary code execution
# Covers Python (-e, -c, -m, -p), Node.js (--eval, --print, loaders), and general
DANGEROUS_FLAGS = {
"--eval",
"-e",
"-c",
"--exec",
"-m", # Python module execution
"-p", # Python eval+print
"--print", # Node.js print
"--input-type=module", # Node.js ES module mode
"--experimental-loader", # Node.js custom loaders
"--require", # Node.js require injection
"-r", # Node.js require shorthand
}
# Type-specific validation
if server["type"] == "command":
if not isinstance(server.get("command"), str) or not server["command"]:
logger.warning("Command-type MCP server missing 'command' field")
return False
# SECURITY FIX: Validate command is in safe list and not in dangerous list
command = server.get("command", "")
# Reject paths - commands must be bare names only (no / or \)
# This prevents path traversal like '/custom/malicious' or './evil'
if "/" in command or "\\" in command:
logger.warning(
f"Rejected command with path in MCP server: {command}. "
f"Commands must be bare names without path separators."
)
return False
if command in DANGEROUS_COMMANDS:
logger.warning(
f"Rejected dangerous command in MCP server: {command}. "
f"Shell commands are not allowed for security reasons."
)
return False
if command not in SAFE_COMMANDS:
logger.warning(
f"Rejected unknown command in MCP server: {command}. "
f"Only allowed commands: {', '.join(sorted(SAFE_COMMANDS))}"
)
return False
# Validate args is a list of strings if present
if "args" in server:
if not isinstance(server["args"], list):
return False
if not all(isinstance(arg, str) for arg in server["args"]):
return False
# Check for dangerous interpreter flags that allow code execution
for arg in server["args"]:
if arg in DANGEROUS_FLAGS:
logger.warning(
f"Rejected dangerous flag '{arg}' in MCP server args. "
f"Interpreter code execution flags are not allowed."
)
return False
elif server["type"] == "http":
if not isinstance(server.get("url"), str) or not server["url"]:
logger.warning("HTTP-type MCP server missing 'url' field")
return False
# Validate headers is a dict of strings if present
if "headers" in server:
if not isinstance(server["headers"], dict):
return False
if not all(
isinstance(k, str) and isinstance(v, str)
for k, v in server["headers"].items()
):
return False
# Optional description must be string if present
if "description" in server and not isinstance(server.get("description"), str):
return False
# Optional cached tools/list payload must contain safe, serializable metadata.
if "tools" in server:
tools = server["tools"]
if not isinstance(tools, list):
return False
for tool in tools:
if not isinstance(tool, dict):
return False
if not isinstance(tool.get("name"), str) or not tool["name"]:
return False
if "description" in tool and not isinstance(tool["description"], str):
return False
schema = (
tool.get("inputSchema")
or tool.get("input_schema")
or tool.get("parameters")
)
if schema is not None and not isinstance(schema, dict):
return False
# Reject any unexpected fields that could be exploited
allowed_fields = {
"id",
"name",
"type",
"command",
"args",
"url",
"headers",
"description",
"tools",
}
unexpected_fields = set(server.keys()) - allowed_fields
if unexpected_fields:
logger.warning(f"Custom MCP server has unexpected fields: {unexpected_fields}")
return False
return True
def load_project_mcp_config(project_dir: Path) -> dict:
"""
Load MCP configuration from project's .auto-claude/.env file.
Returns a dict of MCP-related env vars:
- CONTEXT7_ENABLED (default: true)
- LINEAR_MCP_ENABLED (default: true)
- ELECTRON_MCP_ENABLED (default: false)
- PUPPETEER_MCP_ENABLED (default: false)
- AGENT_MCP_<agent>_ADD (per-agent MCP additions)
- AGENT_MCP_<agent>_REMOVE (per-agent MCP removals)
- CUSTOM_MCP_SERVERS (JSON array or object map of custom server configs)
Args:
project_dir: Path to the project directory
Returns:
Dict of MCP configuration values (string values, except CUSTOM_MCP_SERVERS which is parsed JSON)
"""
env_path = project_dir / ".auto-claude" / ".env"
if not env_path.exists():
return {}
config = {}
mcp_keys = {
"CONTEXT7_ENABLED",
"LINEAR_MCP_ENABLED",
"ELECTRON_MCP_ENABLED",
"PUPPETEER_MCP_ENABLED",
}
try:
with open(env_path, encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line or line.startswith("#"):
continue
if "=" in line:
key, value = line.split("=", 1)
key = key.strip()
value = value.strip().strip("\"'")
# Include global MCP toggles
if key in mcp_keys:
config[key] = value
# Include per-agent MCP overrides (AGENT_MCP_<agent>_ADD/REMOVE)
elif key.startswith("AGENT_MCP_"):
config[key] = value
# Include custom MCP servers (parse JSON with schema validation)
elif key == "CUSTOM_MCP_SERVERS":
try:
parsed = json.loads(value)
if isinstance(parsed, dict):
parsed_servers = [
{
**server,
"id": server.get("id") or str(server_id),
}
for server_id, server in parsed.items()
if isinstance(server, dict)
]
elif isinstance(parsed, list):
parsed_servers = parsed
else:
logger.warning(
"CUSTOM_MCP_SERVERS must be a JSON array or object"
)
config["CUSTOM_MCP_SERVERS"] = []
continue
if len(parsed_servers) != len(parsed):
logger.warning(
"CUSTOM_MCP_SERVERS contains non-object entries"
)
if parsed_servers:
# Validate each server and filter out invalid ones
valid_servers = []
for i, server in enumerate(parsed_servers):
if _validate_custom_mcp_server(server):
valid_servers.append(server)
else:
logger.warning(
f"Skipping invalid custom MCP server at index {i}"
)
config["CUSTOM_MCP_SERVERS"] = valid_servers
else:
config["CUSTOM_MCP_SERVERS"] = []
except json.JSONDecodeError:
logger.warning(
f"Failed to parse CUSTOM_MCP_SERVERS JSON: {value}"
)
config["CUSTOM_MCP_SERVERS"] = []
except Exception as e:
logger.debug(f"Failed to load project MCP config from {env_path}: {e}")
return config
def is_graphiti_mcp_enabled() -> bool:
"""
Check if Graphiti MCP server integration is enabled.
Requires GRAPHITI_MCP_URL to be set (e.g., http://localhost:8000/mcp/)
This is separate from GRAPHITI_ENABLED which controls the Python library integration.
"""
return bool(os.environ.get("GRAPHITI_MCP_URL"))
def get_graphiti_mcp_url() -> str:
"""Get the Graphiti MCP server URL."""
return os.environ.get("GRAPHITI_MCP_URL", "http://localhost:8000/mcp/")
def is_electron_mcp_enabled() -> bool:
"""
Check if Electron MCP server integration is enabled.
Requires ELECTRON_MCP_ENABLED to be set to 'true'.
When enabled, QA agents can use MCP tools to connect to Electron apps.
"""
return os.environ.get("ELECTRON_MCP_ENABLED", "").lower() == "true"
_VALID_ELECTRON_MCP_MODES: tuple[str, ...] = ("cdp", "embedded")
_VALID_ELECTRON_MCP_LOG_LEVELS: tuple[str, ...] = ("debug", "info", "warn", "error")
def get_electron_mcp_mode() -> str:
"""
Get the Electron MCP server mode.
Returns:
"embedded" - MCP server runs inside Electron process (stdio transport)
"cdp" - External CDP-based server (electron-mcp-server package)
Default: "cdp" for backward compatibility
"""
mode = os.environ.get("ELECTRON_MCP_MODE", "cdp").lower()
if mode not in _VALID_ELECTRON_MCP_MODES:
logger.warning(
"Invalid ELECTRON_MCP_MODE '%s'. Valid values: %s. Using default: cdp",
mode,
", ".join(_VALID_ELECTRON_MCP_MODES),
)
return "cdp"
return mode
def get_electron_debug_port() -> int:
"""
Get the Electron remote debugging port (default: 9222).
Returns:
Port number for Chrome DevTools Protocol
Raises:
ValueError: If port is not a valid number or out of range
"""
port_str = os.environ.get("ELECTRON_DEBUG_PORT", "9222")
try:
port = int(port_str)
except ValueError:
raise ValueError(
f"Invalid ELECTRON_DEBUG_PORT: '{port_str}'. Must be a number."
)
if not (1024 <= port <= 65535):
raise ValueError(
f"Invalid ELECTRON_DEBUG_PORT: {port}. Must be between 1024 and 65535."
)
return port
def get_electron_mcp_log_level() -> str:
"""
Get the Electron MCP server log level.
Returns:
Log level: "debug", "info", "warn", or "error"
Default: "info"
"""
level = os.environ.get("ELECTRON_MCP_LOG_LEVEL", "info").lower()
if level not in _VALID_ELECTRON_MCP_LOG_LEVELS:
logger.warning(
"Invalid ELECTRON_MCP_LOG_LEVEL '%s'. Valid values: %s. Using default: info",
level,
", ".join(_VALID_ELECTRON_MCP_LOG_LEVELS),
)
return "info"
return level
def is_actor_critic_mcp_enabled() -> bool:
"""
Check if Actor-Critic MCP server integration is enabled and available.
Delegates to actor_critic_config.is_actor_critic_enabled() which checks
both the ACTOR_CRITIC_MCP_ENABLED env var AND npx availability.
"""
from core.actor_critic_config import is_actor_critic_enabled
return is_actor_critic_enabled()
def should_use_claude_md() -> bool:
"""Check if CLAUDE.md instructions should be included in system prompt."""
return os.environ.get("USE_CLAUDE_MD", "").lower() == "true"
def load_claude_md(project_dir: Path) -> str | None:
"""
Load CLAUDE.md content from project root if it exists.
Args:
project_dir: Root directory of the project
Returns:
Content of CLAUDE.md if found, None otherwise
"""
claude_md_path = project_dir / "CLAUDE.md"
if claude_md_path.exists():
try:
return claude_md_path.read_text(encoding="utf-8")
except Exception:
return None
return None
def load_plugin_mcp_integrations(
project_dir: Path,
spec_dir: Path,
) -> tuple[dict[str, Any], list[str]]:
"""
Load MCP servers and allowed tool names from enabled integration plugins.
Queries the PluginRegistry for enabled integration plugins and creates
MCP servers from their tools. This allows third-party plugins to extend
Auto Code with custom integrations.
Args:
project_dir: Root directory of the project
spec_dir: Directory containing the current spec
Returns:
Tuple of:
- Dictionary mapping integration server IDs to MCP server instances
- Allowed tool names for those server tools
Example: ({"my-plugin-integration": <server>}, ["mcp__..."])
"""
try:
from claude_agent_sdk import create_sdk_mcp_server
from plugins.base import PluginType
from plugins.registry import PluginRegistry
from plugins.sdk.integration import IntegrationContext, IntegrationPlugin
except ImportError:
logger.debug("Plugin system not available")
return {}, []
plugin_servers = {}
allowed_tools = []
try:
# Get singleton registry instance
registry = PluginRegistry.get_instance()
if not registry.list_plugins():
registry.load_all_plugins()
# Get all enabled integration plugins
integration_plugins = registry.list_plugins(
plugin_type=PluginType.INTEGRATION,
enabled_only=True,
)
logger.debug(f"Found {len(integration_plugins)} enabled integration plugin(s)")
# Create MCP server for each enabled plugin
for plugin in integration_plugins:
if not isinstance(plugin, IntegrationPlugin):
logger.warning(
f"Plugin {plugin.name} is not an IntegrationPlugin, skipping"
)
continue
# Check if plugin is available (has valid config, connectivity, etc.)
if not plugin.is_available():
logger.debug(
f"Integration plugin {plugin.name} is not available, skipping"
)
continue
# Create integration context
context = IntegrationContext(
project_dir=project_dir,
spec_dir=spec_dir,
)
# Create MCP server from plugin
try:
tools = plugin.create_mcp_tools(context)
if not tools:
logger.debug(f"Plugin {plugin.name} returned no MCP tools")
continue
server_name = f"{plugin.name}-integration"
mcp_server = create_sdk_mcp_server(
name=server_name,
version=plugin.version,
tools=tools,
)
if mcp_server:
plugin_servers[server_name] = mcp_server
allowed_tools.extend(
f"mcp__{server_name}__{tool.__name__}" for tool in tools
)
logger.info(f"Loaded MCP server from plugin: {plugin.name}")
else:
logger.debug(f"Plugin {plugin.name} returned no MCP server")
except Exception as e:
logger.error(
f"Failed to create MCP server for plugin {plugin.name}: {e}"
)
continue
except Exception as e:
logger.error(f"Error loading plugin MCP servers: {e}")
return plugin_servers, sorted(set(allowed_tools))
def load_plugin_mcp_servers(project_dir: Path, spec_dir: Path) -> dict[str, Any]:
"""
Load MCP servers from enabled integration plugins.
Kept as the server-only compatibility wrapper for callers that do not need
explicit allowed tool names.
"""
plugin_servers, _allowed_tools = load_plugin_mcp_integrations(
project_dir,
spec_dir,
)
return plugin_servers
def load_preferences(
base_prompt: str,
spec_dir: Path,
project_dir: Path,
) -> str:
"""
Load user preference profile and adapt the system prompt accordingly.
This function retrieves the user's preference profile from Graphiti memory
and applies adaptive behavior instructions to the prompt based on learned
patterns and explicit user settings.
Args:
base_prompt: Original system prompt
spec_dir: Directory containing the spec
project_dir: Project root directory
Returns:
Modified prompt with adaptive instructions, or original if no preferences found
"""
try:
# Import here to avoid circular dependencies
from agents.preferences import PreferenceProfile, modify_prompt_for_preferences
from integrations.graphiti.memory import (
get_graphiti_memory,
is_graphiti_enabled,
)
# Only load preferences if Graphiti is enabled
if not is_graphiti_enabled():
logger.debug("Graphiti not enabled, skipping preference loading")
print(" - User preferences: Graphiti not enabled")
return base_prompt
# Get preference profile from Graphiti memory
memory = get_graphiti_memory(spec_dir, project_dir)
# Run async operation in sync context.
# If a loop is already running (e.g. inside Ideation async process),
# delegate to a worker thread that owns its own event loop.
import asyncio
import concurrent.futures
try:
asyncio.get_running_loop()
# Already inside an async context — run in a separate thread
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(asyncio.run, memory.get_preference_profile())
profile_data = future.result(timeout=30)
except RuntimeError:
# No running loop — safe to create one
profile_data = asyncio.run(memory.get_preference_profile())
if not profile_data:
logger.debug("No preference profile found, using defaults")
print(" - User preferences: No profile found, using defaults")
return base_prompt
# Convert dict to PreferenceProfile object
profile = PreferenceProfile.from_dict(profile_data)
# Apply preferences to prompt
modified_prompt = modify_prompt_for_preferences(base_prompt, profile)
# Log preference application
verbosity = profile.get_effective_verbosity().value
risk = profile.get_effective_risk_tolerance().value
acceptance_rate = profile.get_feedback_acceptance_rate()
logger.info(
f"Applied user preferences: verbosity={verbosity}, risk={risk}, "
f"acceptance_rate={acceptance_rate:.1%}"
)
print(
f" - User preferences: Applied (verbosity={verbosity}, risk={risk}, "
f"feedback={len(profile.feedback_history)} records)"
)
return modified_prompt
except ImportError as e:
logger.debug(f"Preference modules not available: {e}")
print(" - User preferences: Modules not available")
return base_prompt
except Exception as e:
logger.warning(f"Failed to load preferences: {e}")
print(f" - User preferences: Failed to load ({type(e).__name__})")
return base_prompt
def create_client(
project_dir: Path,
spec_dir: Path,
model: str,
agent_type: str = "coder",
max_thinking_tokens: int | None = None,
output_format: dict | None = None,
agents: dict | None = None,
session_config: Any | None = None,
custom_template: AgentTemplate | None = None,
runtime_metadata: dict | None = None,
) -> ClaudeSDKClient:
"""
Create a Claude Agent SDK client with multi-layered security.
Uses AGENT_CONFIGS for phase-aware tool and MCP server configuration.
Only starts MCP servers that the agent actually needs, reducing context
window bloat and startup latency.
**NOTE:** This function creates Claude-specific clients only. For other
AI providers (OpenAI, Google Gemini, Ollama, etc.), use the provider factory:
`create_engine_provider()` from `core.providers.factory`.
Args:
project_dir: Root directory for the project (working directory)
spec_dir: Directory containing the spec (for settings file)
model: Claude model to use
agent_type: Agent type identifier from AGENT_CONFIGS
(e.g., 'coder', 'planner', 'qa_reviewer', 'spec_gatherer')
max_thinking_tokens: Token budget for extended thinking (None = disabled)
- ultrathink: 16000 (spec creation)
- high: 10000 (QA review)
- medium: 5000 (planning, validation)
- None: disabled (coding)
output_format: Optional structured output format for validated JSON responses.
Use {"type": "json_schema", "schema": Model.model_json_schema()}
See: https://platform.claude.com/docs/en/agent-sdk/structured-outputs
agents: Optional dict of subagent definitions for SDK parallel execution.
Format: {"agent-name": {"description": "...", "prompt": "...",
"tools": [...], "model": "inherit"}}
See: https://platform.claude.com/docs/en/agent-sdk/subagents
session_config: Optional SessionConfig with provider/model overrides.
If provided, checks for provider override before using defaults.
Used for runtime provider selection (e.g., --provider zhipuai).
custom_template: Optional custom agent template with user-defined prompts,
tools, and MCP server configuration. When provided, overrides
default agent_type configuration from AGENT_CONFIGS.
runtime_metadata: Optional task/file metadata made available to plugin runtime
prompt and tool hooks.
Returns:
Configured ClaudeSDKClient
Raises:
ValueError: If agent_type is not found in AGENT_CONFIGS or if custom_template
validation fails
Security layers (defense in depth):
1. Sandbox - OS-level bash command isolation prevents filesystem escape
2. Permissions - File operations restricted to project_dir only
3. Security hooks - Bash commands validated against an allowlist
(see security.py for ALLOWED_COMMANDS)
4. Tool filtering - Each agent type only sees relevant tools (prevents misuse)
"""
if runtime_metadata is not None and not isinstance(runtime_metadata, dict):
raise TypeError("runtime_metadata must be a dict when provided")
# Check configured AI provider and log it
provider_config = get_provider_config()
configured_provider = provider_config.provider
provider_summary = provider_config.get_provider_summary()
# Log provider information
logger.info(f"AI Engine Provider: {provider_summary}")
print(f"AI Engine Provider: {provider_summary}")
# Warn if non-Claude provider is configured
if configured_provider != "claude":
logger.warning(
f"Non-Claude provider configured ({configured_provider}), but create_client() "
f"only supports Claude Agent SDK. For {configured_provider}, use create_engine_provider() "
f"from core.providers.factory instead."
)
print(
f"⚠️ Note: create_client() is Claude-specific. "
f"Configured provider is '{configured_provider}'. "
f"Proceeding with Claude Agent SDK."
)
# Get OAuth token - Claude CLI handles token lifecycle internally
oauth_token = require_auth_token()
# Validate token is not encrypted before passing to SDK
# Encrypted tokens (enc:...) should have been decrypted by require_auth_token()
# If we still have an encrypted token here, it means decryption failed or was skipped
validate_token_not_encrypted(oauth_token)
# Ensure SDK can access it via its expected env var
os.environ["CLAUDE_CODE_OAUTH_TOKEN"] = oauth_token
# Check for provider override from SessionConfig
# This enables runtime provider selection (e.g., --provider zhipuai)
# When provider override is present, the caller should use the provider
# abstraction layer instead of this Claude SDK client
if session_config is not None:
if hasattr(session_config, "provider") and session_config.provider:
if session_config.provider != "claude":
raise ValueError(
f"SessionConfig provider override detected: {session_config.provider}. "
f"create_client() only creates Claude SDK clients. "
f"For alternative providers, use create_engine_provider() instead."
)
# Apply model override from SessionConfig if present
if (
session_config is not None
and hasattr(session_config, "model")
and session_config.model
):
if session_config.model != model:
logger.info(
f"SessionConfig model override: {session_config.model} "
f"(parameter model: {model})"
)
model = session_config.model
# Collect env vars to pass to SDK (ANTHROPIC_BASE_URL, etc.)
sdk_env = get_sdk_env_vars()
# Configure data residency (regional API endpoints)
data_residency_config = get_data_residency_config()
regional_endpoint = data_residency_config.get_endpoint()
# Override ANTHROPIC_BASE_URL if custom regional endpoint is configured
if data_residency_config.custom_endpoint:
sdk_env["ANTHROPIC_BASE_URL"] = regional_endpoint
logger.info(
f"Data residency: Using custom endpoint for region {data_residency_config.region}: {regional_endpoint}"
)
# Debug: Log git-bash path detection on Windows
if "CLAUDE_CODE_GIT_BASH_PATH" in sdk_env:
logger.info(f"Git Bash path found: {sdk_env['CLAUDE_CODE_GIT_BASH_PATH']}")
elif is_windows():
logger.warning("Git Bash path not detected on Windows!")
# Check if Linear integration is enabled
linear_enabled = is_linear_enabled()
linear_api_key = os.environ.get("LINEAR_API_KEY", "")
# Check if custom auto-claude tools are available
auto_claude_tools_enabled = is_tools_available()
# Load project capabilities for dynamic MCP tool selection
# This enables context-aware tool injection based on project type
# Uses caching to avoid reloading on every create_client() call
project_index, project_capabilities = _get_cached_project_data(project_dir)
# Load per-project MCP configuration from .auto-claude/.env
mcp_config = load_project_mcp_config(project_dir)
# Handle custom template configuration
# Custom templates override AGENT_CONFIGS for tools, MCP servers, and thinking level
if custom_template:
# Validate custom template before using it
from agents.templates.validator import validate_template
is_valid, errors = validate_template(custom_template)
if not is_valid:
raise ValueError(f"Custom template validation failed: {'; '.join(errors)}")
# Use template's tool configuration
allowed_tools_list = list(custom_template.tools or [])
# Use template's MCP server configuration
mcp_servers_raw = custom_template.mcp_servers or []
required_servers = list(mcp_servers_raw)
# Override max_thinking_tokens based on template's thinking level if not explicitly set
if max_thinking_tokens is None:
thinking_level_tokens = {
"none": None,
"low": 2000,
"medium": 5000,
"high": 10000,