Skip to content
Merged
2 changes: 2 additions & 0 deletions libs/deepagents/deepagents/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
from deepagents.middleware.async_subagents import AsyncSubAgent, AsyncSubAgentMiddleware
from deepagents.middleware.filesystem import FilesystemMiddleware
from deepagents.middleware.memory import MemoryMiddleware
from deepagents.middleware.permissions import FilesystemPermission
from deepagents.middleware.subagents import CompiledSubAgent, SubAgent, SubAgentMiddleware

__all__ = [
"AsyncSubAgent",
"AsyncSubAgentMiddleware",
"CompiledSubAgent",
"FilesystemMiddleware",
"FilesystemPermission",
"MemoryMiddleware",
"SubAgent",
"SubAgentMiddleware",
Expand Down
24 changes: 24 additions & 0 deletions libs/deepagents/deepagents/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from deepagents.middleware.filesystem import FilesystemMiddleware
from deepagents.middleware.memory import MemoryMiddleware
from deepagents.middleware.patch_tool_calls import PatchToolCallsMiddleware
from deepagents.middleware.permissions import FilesystemPermission, _PermissionMiddleware
from deepagents.middleware.skills import SkillsMiddleware
from deepagents.middleware.subagents import (
GENERAL_PURPOSE_SUBAGENT,
Expand Down Expand Up @@ -114,6 +115,7 @@ def create_deep_agent( # noqa: C901, PLR0912, PLR0915 # Complex graph assembly
subagents: Sequence[SubAgent | CompiledSubAgent | AsyncSubAgent] | None = None,
skills: list[str] | None = None,
memory: list[str] | None = None,
permissions: list[FilesystemPermission] | None = None,
response_format: ResponseFormat[ResponseT] | type[ResponseT] | dict[str, Any] | None = None,
context_schema: type[ContextT] | None = None,
checkpointer: Checkpointer | None = None,
Expand Down Expand Up @@ -187,6 +189,7 @@ def create_deep_agent( # noqa: C901, PLR0912, PLR0915 # Complex graph assembly
- `AnthropicPromptCachingMiddleware`
- `MemoryMiddleware` (if `memory` is provided)
- `HumanInTheLoopMiddleware` (if `interrupt_on` is provided)
- `_PermissionMiddleware` (if permission rules are present, always last)
subagents: Subagent specs available to the main agent.

This collection supports three forms:
Expand Down Expand Up @@ -248,6 +251,17 @@ def create_deep_agent( # noqa: C901, PLR0912, PLR0915 # Complex graph assembly

For execution support, use a backend that
implements `SandboxBackendProtocol`.
permissions: List of ``FilesystemPermission`` rules for the main agent
and its subagents.

Rules are evaluated in declaration order; the first match wins.
If no rule matches, the call is allowed.

Subagents inherit these rules unless they specify their own
`permissions` field, which replaces the parent's rules entirely.

`_PermissionMiddleware` is appended last in the stack so it sees
all tools (including those injected by other middleware).
interrupt_on: Mapping of tool names to interrupt configs.

Pass to pause agent execution at specified tool calls for human
Expand Down Expand Up @@ -302,6 +316,8 @@ def create_deep_agent( # noqa: C901, PLR0912, PLR0915 # Complex graph assembly
# "ignore" silently skips cache-control header injection for non-Anthropic
# models, so this middleware can be added unconditionally.
gp_middleware.append(AnthropicPromptCachingMiddleware(unsupported_model_behavior="ignore"))
if permissions:
gp_middleware.append(_PermissionMiddleware(rules=permissions, backend=backend))
general_purpose_spec: SubAgent = { # ty: ignore[missing-typed-dict-key]
**GENERAL_PURPOSE_SUBAGENT,
"model": model,
Expand All @@ -327,6 +343,9 @@ def create_deep_agent( # noqa: C901, PLR0912, PLR0915 # Complex graph assembly
subagent_model = spec.get("model", model)
subagent_model = resolve_model(subagent_model)

# Resolve permissions: subagent's own rules take priority, else inherit parent's
subagent_permissions = spec.get("permissions", permissions)

# Build middleware: base stack + skills (if specified) + user's middleware
subagent_middleware: list[AgentMiddleware[Any, Any, Any]] = [
TodoListMiddleware(),
Expand All @@ -340,6 +359,8 @@ def create_deep_agent( # noqa: C901, PLR0912, PLR0915 # Complex graph assembly
subagent_middleware.extend(spec.get("middleware", []))
# "ignore" skips caching for non-Anthropic models (see comment above).
subagent_middleware.append(AnthropicPromptCachingMiddleware(unsupported_model_behavior="ignore"))
if subagent_permissions:
subagent_middleware.append(_PermissionMiddleware(rules=subagent_permissions, backend=backend))

subagent_interrupt_on = spec.get("interrupt_on", interrupt_on)

Expand Down Expand Up @@ -393,6 +414,9 @@ def create_deep_agent( # noqa: C901, PLR0912, PLR0915 # Complex graph assembly
deepagent_middleware.append(MemoryMiddleware(backend=backend, sources=memory))
if interrupt_on is not None:
deepagent_middleware.append(HumanInTheLoopMiddleware(interrupt_on=interrupt_on))
# _PermissionMiddleware must be last so it sees all tools from prior middleware
if permissions:
deepagent_middleware.append(_PermissionMiddleware(rules=permissions, backend=backend))

# Combine system_prompt with BASE_AGENT_PROMPT
if system_prompt is None:
Expand Down
2 changes: 2 additions & 0 deletions libs/deepagents/deepagents/middleware/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
from deepagents.middleware.async_subagents import AsyncSubAgent, AsyncSubAgentMiddleware
from deepagents.middleware.filesystem import FilesystemMiddleware
from deepagents.middleware.memory import MemoryMiddleware
from deepagents.middleware.permissions import FilesystemPermission
from deepagents.middleware.skills import SkillsMiddleware
from deepagents.middleware.subagents import CompiledSubAgent, SubAgent, SubAgentMiddleware
from deepagents.middleware.summarization import (
Expand All @@ -63,6 +64,7 @@
"AsyncSubAgentMiddleware",
"CompiledSubAgent",
"FilesystemMiddleware",
"FilesystemPermission",
"MemoryMiddleware",
"SkillsMiddleware",
"SubAgent",
Expand Down
88 changes: 63 additions & 25 deletions libs/deepagents/deepagents/middleware/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ class ExecuteSchema(BaseModel):
- execute: run a shell command in the sandbox (returns output and exit code)"""


def _supports_execution(backend: BackendProtocol) -> bool:
def supports_execution(backend: BackendProtocol) -> bool:
"""Check if a backend supports command execution.

For CompositeBackend, checks if the default backend supports execution.
Expand Down Expand Up @@ -656,7 +656,7 @@ def _create_ls_tool(self) -> BaseTool:
def sync_ls(
runtime: ToolRuntime[None, FilesystemState],
path: Annotated[str, "Absolute path to the directory to list. Must be absolute, not relative."],
) -> str:
) -> ToolMessage | str:
"""Synchronous wrapper for ls tool."""
resolved_backend = self._get_backend(runtime)
try:
Expand All @@ -668,13 +668,17 @@ def sync_ls(
return f"Error: {ls_result.error}"
infos = ls_result.entries or []
paths = [fi.get("path", "") for fi in infos]
result = truncate_if_too_long(paths)
return str(result)
return ToolMessage(
content=str(truncate_if_too_long(paths)),
artifact=ls_result,
tool_call_id=runtime.tool_call_id,
name="ls",
)

async def async_ls(
runtime: ToolRuntime[None, FilesystemState],
path: Annotated[str, "Absolute path to the directory to list. Must be absolute, not relative."],
) -> str:
) -> ToolMessage | str:
"""Asynchronous wrapper for ls tool."""
resolved_backend = self._get_backend(runtime)
try:
Expand All @@ -686,8 +690,12 @@ async def async_ls(
return f"Error: {ls_result.error}"
infos = ls_result.entries or []
paths = [fi.get("path", "") for fi in infos]
result = truncate_if_too_long(paths)
return str(result)
return ToolMessage(
content=str(truncate_if_too_long(paths)),
artifact=ls_result,
tool_call_id=runtime.tool_call_id,
name="ls",
)

return StructuredTool.from_function(
name="ls",
Expand Down Expand Up @@ -773,7 +781,6 @@ def sync_read_file(
validated_path = validate_path(file_path)
except ValueError as e:
return f"Error: {e}"

read_result = resolved_backend.read(validated_path, offset=offset, limit=limit)
return _handle_read_result(read_result, validated_path, runtime.tool_call_id, offset, limit)

Expand All @@ -789,7 +796,6 @@ async def async_read_file(
validated_path = validate_path(file_path)
except ValueError as e:
return f"Error: {e}"

read_result = await resolved_backend.aread(validated_path, offset=offset, limit=limit)
return _handle_read_result(read_result, validated_path, runtime.tool_call_id, offset, limit)

Expand Down Expand Up @@ -817,6 +823,7 @@ def sync_write_file(
validated_path = validate_path(file_path)
except ValueError as e:
return f"Error: {e}"

res: WriteResult = resolved_backend.write(validated_path, content)
if res.error:
return res.error
Expand All @@ -833,6 +840,7 @@ async def async_write_file(
validated_path = validate_path(file_path)
except ValueError as e:
return f"Error: {e}"

res: WriteResult = await resolved_backend.awrite(validated_path, content)
if res.error:
return res.error
Expand Down Expand Up @@ -865,6 +873,7 @@ def sync_edit_file(
validated_path = validate_path(file_path)
except ValueError as e:
return f"Error: {e}"

res: EditResult = resolved_backend.edit(validated_path, old_string, new_string, replace_all=replace_all)
if res.error:
return res.error
Expand All @@ -884,6 +893,7 @@ async def async_edit_file(
validated_path = validate_path(file_path)
except ValueError as e:
return f"Error: {e}"

res: EditResult = await resolved_backend.aedit(validated_path, old_string, new_string, replace_all=replace_all)
if res.error:
return res.error
Expand All @@ -906,7 +916,7 @@ def sync_glob(
pattern: Annotated[str, "Glob pattern to match files (e.g., '**/*.py', '*.txt', '/subdir/**/*.md')."],
runtime: ToolRuntime[None, FilesystemState],
path: Annotated[str, "Base directory to search from. Defaults to root '/'."] = "/",
) -> str:
) -> ToolMessage | str:
"""Synchronous wrapper for glob tool."""
resolved_backend = self._get_backend(runtime)
try:
Expand All @@ -924,14 +934,18 @@ def sync_glob(
return f"Error: {glob_result.error}"
infos = glob_result.matches or []
paths = [fi.get("path", "") for fi in infos]
result = truncate_if_too_long(paths)
return str(result)
return ToolMessage(
content=str(truncate_if_too_long(paths)),
artifact=glob_result,
tool_call_id=runtime.tool_call_id,
name="glob",
)

async def async_glob(
pattern: Annotated[str, "Glob pattern to match files (e.g., '**/*.py', '*.txt', '/subdir/**/*.md')."],
runtime: ToolRuntime[None, FilesystemState],
path: Annotated[str, "Base directory to search from. Defaults to root '/'."] = "/",
) -> str:
) -> ToolMessage | str:
"""Asynchronous wrapper for glob tool."""
resolved_backend = self._get_backend(runtime)
try:
Expand All @@ -949,8 +963,12 @@ async def async_glob(
return f"Error: {glob_result.error}"
infos = glob_result.matches or []
paths = [fi.get("path", "") for fi in infos]
result = truncate_if_too_long(paths)
return str(result)
return ToolMessage(
content=str(truncate_if_too_long(paths)),
artifact=glob_result,
tool_call_id=runtime.tool_call_id,
name="glob",
)

return StructuredTool.from_function(
name="glob",
Expand All @@ -974,15 +992,25 @@ def sync_grep(
Literal["files_with_matches", "content", "count"],
"Output format: 'files_with_matches' (file paths only, default), 'content' (matching lines with context), 'count' (match counts per file).",
] = "files_with_matches",
) -> str:
) -> ToolMessage | str:
"""Synchronous wrapper for grep tool."""
if path is not None:
try:
path = validate_path(path)
except ValueError as e:
return f"Error: {e}"
resolved_backend = self._get_backend(runtime)
grep_result = resolved_backend.grep(pattern, path=path, glob=glob)
if grep_result.error:
return grep_result.error
matches = grep_result.matches or []
formatted = format_grep_matches(matches, output_mode)
return truncate_if_too_long(formatted)
return ToolMessage(
content=truncate_if_too_long(formatted),
artifact={"result": grep_result, "output_mode": output_mode},
tool_call_id=runtime.tool_call_id,
name="grep",
)

async def async_grep(
pattern: Annotated[str, "Text pattern to search for (literal string, not regex)."],
Expand All @@ -993,15 +1021,25 @@ async def async_grep(
Literal["files_with_matches", "content", "count"],
"Output format: 'files_with_matches' (file paths only, default), 'content' (matching lines with context), 'count' (match counts per file).",
] = "files_with_matches",
) -> str:
) -> ToolMessage | str:
"""Asynchronous wrapper for grep tool."""
if path is not None:
try:
path = validate_path(path)
except ValueError as e:
return f"Error: {e}"
resolved_backend = self._get_backend(runtime)
grep_result = await resolved_backend.agrep(pattern, path=path, glob=glob)
if grep_result.error:
return grep_result.error
matches = grep_result.matches or []
formatted = format_grep_matches(matches, output_mode)
return truncate_if_too_long(formatted)
return ToolMessage(
content=truncate_if_too_long(formatted),
artifact={"result": grep_result, "output_mode": output_mode},
tool_call_id=runtime.tool_call_id,
name="grep",
)

return StructuredTool.from_function(
name="grep",
Expand Down Expand Up @@ -1034,14 +1072,14 @@ def sync_execute( # noqa: PLR0911 - early returns for distinct error conditions
resolved_backend = self._get_backend(runtime)

# Runtime check - fail gracefully if not supported
if not _supports_execution(resolved_backend):
if not supports_execution(resolved_backend):
return (
"Error: Execution not available. This agent's backend "
"does not support command execution (SandboxBackendProtocol). "
"To use the execute tool, provide a backend that implements SandboxBackendProtocol."
)

# Safe cast: _supports_execution validates that execute()/aexecute() exist
# Safe cast: supports_execution validates that execute()/aexecute() exist
# (either SandboxBackendProtocol or CompositeBackend with sandbox default)
executable = cast("SandboxBackendProtocol", resolved_backend)
if timeout is not None and not execute_accepts_timeout(type(executable)):
Expand Down Expand Up @@ -1090,14 +1128,14 @@ async def async_execute( # noqa: PLR0911 - early returns for distinct error con
resolved_backend = self._get_backend(runtime)

# Runtime check - fail gracefully if not supported
if not _supports_execution(resolved_backend):
if not supports_execution(resolved_backend):
return (
"Error: Execution not available. This agent's backend "
"does not support command execution (SandboxBackendProtocol). "
"To use the execute tool, provide a backend that implements SandboxBackendProtocol."
)

# Safe cast: _supports_execution validates that execute()/aexecute() exist
# Safe cast: supports_execution validates that execute()/aexecute() exist
executable = cast("SandboxBackendProtocol", resolved_backend)
if timeout is not None and not execute_accepts_timeout(type(executable)):
return (
Expand Down Expand Up @@ -1166,7 +1204,7 @@ def wrap_model_call(
if has_execute_tool:
# Resolve backend to check execution support
backend = self._get_backend(request.runtime) # ty: ignore[invalid-argument-type]
backend_supports_execution = _supports_execution(backend)
backend_supports_execution = supports_execution(backend)

# If execute tool exists but backend doesn't support it, filter it out
if not backend_supports_execution:
Expand Down Expand Up @@ -1231,7 +1269,7 @@ async def awrap_model_call(
if has_execute_tool:
# Resolve backend to check execution support
backend = self._get_backend(request.runtime) # ty: ignore[invalid-argument-type]
backend_supports_execution = _supports_execution(backend)
backend_supports_execution = supports_execution(backend)

# If execute tool exists but backend doesn't support it, filter it out
if not backend_supports_execution:
Expand Down
Loading
Loading