Skip to content
Merged
2 changes: 2 additions & 0 deletions libs/deepagents/deepagents/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@
from deepagents.middleware.filesystem import FilesystemMiddleware
from deepagents.middleware.memory import MemoryMiddleware
from deepagents.middleware.subagents import CompiledSubAgent, SubAgent, SubAgentMiddleware
from deepagents.permissions import FilesystemPermission

__all__ = [
"AsyncSubAgent",
"AsyncSubAgentMiddleware",
"CompiledSubAgent",
"FilesystemMiddleware",
"FilesystemPermission",
"MemoryMiddleware",
"SubAgent",
"SubAgentMiddleware",
Expand Down
25 changes: 25 additions & 0 deletions libs/deepagents/deepagents/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from deepagents.middleware.filesystem import FilesystemMiddleware
from deepagents.middleware.memory import MemoryMiddleware
from deepagents.middleware.patch_tool_calls import PatchToolCallsMiddleware
from deepagents.middleware.permissions import _PermissionMiddleware
from deepagents.middleware.skills import SkillsMiddleware
from deepagents.middleware.subagents import (
GENERAL_PURPOSE_SUBAGENT,
Expand All @@ -39,6 +40,7 @@
SubAgentMiddleware,
)
from deepagents.middleware.summarization import create_summarization_middleware
from deepagents.permissions import FilesystemPermission

BASE_AGENT_PROMPT = """You are a Deep Agent, an AI assistant that helps users accomplish tasks using tools. You respond with text and tool calls. The user can see your responses and tool outputs in real time.

Expand Down Expand Up @@ -114,6 +116,7 @@ def create_deep_agent( # noqa: C901, PLR0912, PLR0915 # Complex graph assembly
subagents: Sequence[SubAgent | CompiledSubAgent | AsyncSubAgent] | None = None,
skills: list[str] | None = None,
memory: list[str] | None = None,
permissions: list[FilesystemPermission] | None = None,
response_format: ResponseFormat[ResponseT] | type[ResponseT] | dict[str, Any] | None = None,
context_schema: type[ContextT] | None = None,
checkpointer: Checkpointer | None = None,
Expand Down Expand Up @@ -187,6 +190,7 @@ def create_deep_agent( # noqa: C901, PLR0912, PLR0915 # Complex graph assembly
- `AnthropicPromptCachingMiddleware`
- `MemoryMiddleware` (if `memory` is provided)
- `HumanInTheLoopMiddleware` (if `interrupt_on` is provided)
- `_PermissionMiddleware` (if permission rules are present, always last)
subagents: Subagent specs available to the main agent.

This collection supports three forms:
Expand Down Expand Up @@ -248,6 +252,17 @@ def create_deep_agent( # noqa: C901, PLR0912, PLR0915 # Complex graph assembly

For execution support, use a backend that
implements `SandboxBackendProtocol`.
permissions: List of ``FilesystemPermission`` rules for the main agent
and its subagents.

Rules are evaluated in declaration order; the first match wins.
If no rule matches, the call is allowed.

Subagents inherit these rules unless they specify their own
`permissions` field, which replaces the parent's rules entirely.

`_PermissionMiddleware` is appended last in the stack so it sees
all tools (including those injected by other middleware).
interrupt_on: Mapping of tool names to interrupt configs.

Pass to pause agent execution at specified tool calls for human
Expand Down Expand Up @@ -302,6 +317,8 @@ def create_deep_agent( # noqa: C901, PLR0912, PLR0915 # Complex graph assembly
# "ignore" silently skips cache-control header injection for non-Anthropic
# models, so this middleware can be added unconditionally.
gp_middleware.append(AnthropicPromptCachingMiddleware(unsupported_model_behavior="ignore"))
if permissions:
gp_middleware.append(_PermissionMiddleware(rules=permissions, backend=backend))
general_purpose_spec: SubAgent = { # ty: ignore[missing-typed-dict-key]
**GENERAL_PURPOSE_SUBAGENT,
"model": model,
Expand All @@ -327,6 +344,9 @@ def create_deep_agent( # noqa: C901, PLR0912, PLR0915 # Complex graph assembly
subagent_model = spec.get("model", model)
subagent_model = resolve_model(subagent_model)

# Resolve permissions: subagent's own rules take priority, else inherit parent's
subagent_permissions = spec.get("permissions", permissions)

# Build middleware: base stack + skills (if specified) + user's middleware
subagent_middleware: list[AgentMiddleware[Any, Any, Any]] = [
TodoListMiddleware(),
Expand All @@ -340,6 +360,8 @@ def create_deep_agent( # noqa: C901, PLR0912, PLR0915 # Complex graph assembly
subagent_middleware.extend(spec.get("middleware", []))
# "ignore" skips caching for non-Anthropic models (see comment above).
subagent_middleware.append(AnthropicPromptCachingMiddleware(unsupported_model_behavior="ignore"))
if subagent_permissions:
subagent_middleware.append(_PermissionMiddleware(rules=subagent_permissions, backend=backend))

subagent_interrupt_on = spec.get("interrupt_on", interrupt_on)

Expand Down Expand Up @@ -393,6 +415,9 @@ def create_deep_agent( # noqa: C901, PLR0912, PLR0915 # Complex graph assembly
deepagent_middleware.append(MemoryMiddleware(backend=backend, sources=memory))
if interrupt_on is not None:
deepagent_middleware.append(HumanInTheLoopMiddleware(interrupt_on=interrupt_on))
# _PermissionMiddleware must be last so it sees all tools from prior middleware
if permissions:
deepagent_middleware.append(_PermissionMiddleware(rules=permissions, backend=backend))

# Combine system_prompt with BASE_AGENT_PROMPT
if system_prompt is None:
Expand Down
88 changes: 63 additions & 25 deletions libs/deepagents/deepagents/middleware/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ class ExecuteSchema(BaseModel):
- execute: run a shell command in the sandbox (returns output and exit code)"""


def _supports_execution(backend: BackendProtocol) -> bool:
def supports_execution(backend: BackendProtocol) -> bool:
Comment thread
nick-hollon-lc marked this conversation as resolved.
"""Check if a backend supports command execution.

For CompositeBackend, checks if the default backend supports execution.
Expand Down Expand Up @@ -656,7 +656,7 @@ def _create_ls_tool(self) -> BaseTool:
def sync_ls(
runtime: ToolRuntime[None, FilesystemState],
path: Annotated[str, "Absolute path to the directory to list. Must be absolute, not relative."],
) -> str:
) -> ToolMessage | str:
"""Synchronous wrapper for ls tool."""
resolved_backend = self._get_backend(runtime)
try:
Expand All @@ -668,13 +668,17 @@ def sync_ls(
return f"Error: {ls_result.error}"
infos = ls_result.entries or []
paths = [fi.get("path", "") for fi in infos]
result = truncate_if_too_long(paths)
return str(result)
return ToolMessage(
content=str(truncate_if_too_long(paths)),
artifact=ls_result,
tool_call_id=runtime.tool_call_id,
name="ls",
)

async def async_ls(
runtime: ToolRuntime[None, FilesystemState],
path: Annotated[str, "Absolute path to the directory to list. Must be absolute, not relative."],
) -> str:
) -> ToolMessage | str:
"""Asynchronous wrapper for ls tool."""
resolved_backend = self._get_backend(runtime)
try:
Expand All @@ -686,8 +690,12 @@ async def async_ls(
return f"Error: {ls_result.error}"
infos = ls_result.entries or []
paths = [fi.get("path", "") for fi in infos]
result = truncate_if_too_long(paths)
return str(result)
return ToolMessage(
content=str(truncate_if_too_long(paths)),
artifact=ls_result,
tool_call_id=runtime.tool_call_id,
name="ls",
)

return StructuredTool.from_function(
name="ls",
Expand Down Expand Up @@ -773,7 +781,6 @@ def sync_read_file(
validated_path = validate_path(file_path)
except ValueError as e:
return f"Error: {e}"

read_result = resolved_backend.read(validated_path, offset=offset, limit=limit)
return _handle_read_result(read_result, validated_path, runtime.tool_call_id, offset, limit)

Expand All @@ -789,7 +796,6 @@ async def async_read_file(
validated_path = validate_path(file_path)
except ValueError as e:
return f"Error: {e}"

read_result = await resolved_backend.aread(validated_path, offset=offset, limit=limit)
return _handle_read_result(read_result, validated_path, runtime.tool_call_id, offset, limit)

Expand Down Expand Up @@ -817,6 +823,7 @@ def sync_write_file(
validated_path = validate_path(file_path)
except ValueError as e:
return f"Error: {e}"

res: WriteResult = resolved_backend.write(validated_path, content)
if res.error:
return res.error
Expand All @@ -833,6 +840,7 @@ async def async_write_file(
validated_path = validate_path(file_path)
except ValueError as e:
return f"Error: {e}"

res: WriteResult = await resolved_backend.awrite(validated_path, content)
if res.error:
return res.error
Expand Down Expand Up @@ -865,6 +873,7 @@ def sync_edit_file(
validated_path = validate_path(file_path)
except ValueError as e:
return f"Error: {e}"

res: EditResult = resolved_backend.edit(validated_path, old_string, new_string, replace_all=replace_all)
if res.error:
return res.error
Expand All @@ -884,6 +893,7 @@ async def async_edit_file(
validated_path = validate_path(file_path)
except ValueError as e:
return f"Error: {e}"

res: EditResult = await resolved_backend.aedit(validated_path, old_string, new_string, replace_all=replace_all)
if res.error:
return res.error
Expand All @@ -906,7 +916,7 @@ def sync_glob(
pattern: Annotated[str, "Glob pattern to match files (e.g., '**/*.py', '*.txt', '/subdir/**/*.md')."],
runtime: ToolRuntime[None, FilesystemState],
path: Annotated[str, "Base directory to search from. Defaults to root '/'."] = "/",
) -> str:
) -> ToolMessage | str:
"""Synchronous wrapper for glob tool."""
resolved_backend = self._get_backend(runtime)
try:
Expand All @@ -924,14 +934,18 @@ def sync_glob(
return f"Error: {glob_result.error}"
infos = glob_result.matches or []
paths = [fi.get("path", "") for fi in infos]
result = truncate_if_too_long(paths)
return str(result)
return ToolMessage(
content=str(truncate_if_too_long(paths)),
artifact=glob_result,
tool_call_id=runtime.tool_call_id,
name="glob",
)

async def async_glob(
pattern: Annotated[str, "Glob pattern to match files (e.g., '**/*.py', '*.txt', '/subdir/**/*.md')."],
runtime: ToolRuntime[None, FilesystemState],
path: Annotated[str, "Base directory to search from. Defaults to root '/'."] = "/",
) -> str:
) -> ToolMessage | str:
"""Asynchronous wrapper for glob tool."""
resolved_backend = self._get_backend(runtime)
try:
Expand All @@ -949,8 +963,12 @@ async def async_glob(
return f"Error: {glob_result.error}"
infos = glob_result.matches or []
paths = [fi.get("path", "") for fi in infos]
result = truncate_if_too_long(paths)
return str(result)
return ToolMessage(
content=str(truncate_if_too_long(paths)),
artifact=glob_result,
tool_call_id=runtime.tool_call_id,
name="glob",
)

return StructuredTool.from_function(
name="glob",
Expand All @@ -974,15 +992,25 @@ def sync_grep(
Literal["files_with_matches", "content", "count"],
"Output format: 'files_with_matches' (file paths only, default), 'content' (matching lines with context), 'count' (match counts per file).",
] = "files_with_matches",
) -> str:
) -> ToolMessage | str:
"""Synchronous wrapper for grep tool."""
if path is not None:
try:
path = validate_path(path)
except ValueError as e:
return f"Error: {e}"
resolved_backend = self._get_backend(runtime)
grep_result = resolved_backend.grep(pattern, path=path, glob=glob)
if grep_result.error:
return grep_result.error
matches = grep_result.matches or []
formatted = format_grep_matches(matches, output_mode)
return truncate_if_too_long(formatted)
return ToolMessage(
content=truncate_if_too_long(formatted),
artifact={"result": grep_result, "output_mode": output_mode},
tool_call_id=runtime.tool_call_id,
name="grep",
)

async def async_grep(
pattern: Annotated[str, "Text pattern to search for (literal string, not regex)."],
Expand All @@ -993,15 +1021,25 @@ async def async_grep(
Literal["files_with_matches", "content", "count"],
"Output format: 'files_with_matches' (file paths only, default), 'content' (matching lines with context), 'count' (match counts per file).",
] = "files_with_matches",
) -> str:
) -> ToolMessage | str:
"""Asynchronous wrapper for grep tool."""
if path is not None:
try:
path = validate_path(path)
except ValueError as e:
return f"Error: {e}"
resolved_backend = self._get_backend(runtime)
grep_result = await resolved_backend.agrep(pattern, path=path, glob=glob)
if grep_result.error:
return grep_result.error
matches = grep_result.matches or []
formatted = format_grep_matches(matches, output_mode)
return truncate_if_too_long(formatted)
return ToolMessage(
content=truncate_if_too_long(formatted),
artifact={"result": grep_result, "output_mode": output_mode},
tool_call_id=runtime.tool_call_id,
name="grep",
)

return StructuredTool.from_function(
name="grep",
Expand Down Expand Up @@ -1034,14 +1072,14 @@ def sync_execute( # noqa: PLR0911 - early returns for distinct error conditions
resolved_backend = self._get_backend(runtime)

# Runtime check - fail gracefully if not supported
if not _supports_execution(resolved_backend):
if not supports_execution(resolved_backend):
return (
"Error: Execution not available. This agent's backend "
"does not support command execution (SandboxBackendProtocol). "
"To use the execute tool, provide a backend that implements SandboxBackendProtocol."
)

# Safe cast: _supports_execution validates that execute()/aexecute() exist
# Safe cast: supports_execution validates that execute()/aexecute() exist
# (either SandboxBackendProtocol or CompositeBackend with sandbox default)
executable = cast("SandboxBackendProtocol", resolved_backend)
if timeout is not None and not execute_accepts_timeout(type(executable)):
Expand Down Expand Up @@ -1090,14 +1128,14 @@ async def async_execute( # noqa: PLR0911 - early returns for distinct error con
resolved_backend = self._get_backend(runtime)

# Runtime check - fail gracefully if not supported
if not _supports_execution(resolved_backend):
if not supports_execution(resolved_backend):
return (
"Error: Execution not available. This agent's backend "
"does not support command execution (SandboxBackendProtocol). "
"To use the execute tool, provide a backend that implements SandboxBackendProtocol."
)

# Safe cast: _supports_execution validates that execute()/aexecute() exist
# Safe cast: supports_execution validates that execute()/aexecute() exist
executable = cast("SandboxBackendProtocol", resolved_backend)
if timeout is not None and not execute_accepts_timeout(type(executable)):
return (
Expand Down Expand Up @@ -1166,7 +1204,7 @@ def wrap_model_call(
if has_execute_tool:
# Resolve backend to check execution support
backend = self._get_backend(request.runtime) # ty: ignore[invalid-argument-type]
backend_supports_execution = _supports_execution(backend)
backend_supports_execution = supports_execution(backend)

# If execute tool exists but backend doesn't support it, filter it out
if not backend_supports_execution:
Expand Down Expand Up @@ -1231,7 +1269,7 @@ async def awrap_model_call(
if has_execute_tool:
# Resolve backend to check execution support
backend = self._get_backend(request.runtime) # ty: ignore[invalid-argument-type]
backend_supports_execution = _supports_execution(backend)
backend_supports_execution = supports_execution(backend)

# If execute tool exists but backend doesn't support it, filter it out
if not backend_supports_execution:
Expand Down
Loading
Loading