Skip to content

Commit cbc9437

Browse files
feat(mcp): Implement decorator-based safe mode filtering for Cloud ops tools (#842)
Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
1 parent 9f34de3 commit cbc9437

File tree

6 files changed

+319
-267
lines changed

6 files changed

+319
-267
lines changed

airbyte/mcp/__init__.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,9 @@
8585
"airbyte-mcp"
8686
],
8787
"env": {
88-
"AIRBYTE_MCP_ENV_FILE": "/path/to/my/.mcp/airbyte_mcp.env"
88+
"AIRBYTE_MCP_ENV_FILE": "/path/to/my/.mcp/airbyte_mcp.env",
89+
"AIRBYTE_CLOUD_MCP_READONLY_MODE": "0",
90+
"AIRBYTE_CLOUD_MCP_SAFE_MODE": "0"
8991
}
9092
}
9193
}
@@ -95,6 +97,15 @@
9597
Note:
9698
- Replace `/path/to/my/.mcp/airbyte_mcp.env` with the absolute path to your dotenv file created in
9799
Step 1.
100+
- The `AIRBYTE_CLOUD_MCP_READONLY_MODE` and `AIRBYTE_CLOUD_MCP_SAFE_MODE` environment variables
101+
control safe mode filtering for Airbyte Cloud operations:
102+
- `AIRBYTE_CLOUD_MCP_READONLY_MODE=1`: Only read-only Cloud tools are available. Write and
103+
destructive operations are disabled. Note: This mode does allow running syncs on existing
104+
connectors.
105+
- `AIRBYTE_CLOUD_MCP_SAFE_MODE=1`: Write operations are allowed, but destructive operations
106+
(update, delete) are disabled.
107+
- Both default to `0` (disabled), which means no restrictions are applied.
108+
- These settings only affect Cloud operations; local operations are never restricted.
98109
99110
### Step 3: Testing the MCP Server Connection
100111

airbyte/mcp/_tool_utils.py

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
2+
"""MCP tool utility functions.
3+
4+
This module provides a decorator to tag tool functions with MCP annotations
5+
for deferred registration with safe mode filtering.
6+
"""
7+
8+
from __future__ import annotations
9+
10+
import os
11+
from collections.abc import Callable
12+
from typing import Any, Literal, TypeVar
13+
14+
from airbyte.mcp._annotations import (
15+
DESTRUCTIVE_HINT,
16+
IDEMPOTENT_HINT,
17+
OPEN_WORLD_HINT,
18+
READ_ONLY_HINT,
19+
)
20+
21+
22+
F = TypeVar("F", bound=Callable[..., Any])
23+
24+
AIRBYTE_CLOUD_MCP_READONLY_MODE = (
25+
os.environ.get("AIRBYTE_CLOUD_MCP_READONLY_MODE", "").strip() == "1"
26+
)
27+
AIRBYTE_CLOUD_MCP_SAFE_MODE = os.environ.get("AIRBYTE_CLOUD_MCP_SAFE_MODE", "").strip() == "1"
28+
29+
_REGISTERED_TOOLS: list[tuple[Callable[..., Any], dict[str, Any]]] = []
30+
31+
32+
class SafeModeError(Exception):
33+
"""Raised when a tool is blocked by safe mode restrictions."""
34+
35+
pass
36+
37+
38+
def should_register_tool(annotations: dict[str, Any]) -> bool:
39+
"""Check if a tool should be registered based on safe mode settings.
40+
41+
Args:
42+
annotations: Tool annotations dict containing domain, readOnlyHint, and destructiveHint
43+
44+
Returns:
45+
True if the tool should be registered, False if it should be filtered out
46+
"""
47+
if annotations.get("domain") != "cloud":
48+
return True
49+
50+
if not AIRBYTE_CLOUD_MCP_READONLY_MODE and not AIRBYTE_CLOUD_MCP_SAFE_MODE:
51+
return True
52+
53+
if AIRBYTE_CLOUD_MCP_READONLY_MODE:
54+
is_readonly = annotations.get(READ_ONLY_HINT, False)
55+
if not is_readonly:
56+
return False
57+
58+
if AIRBYTE_CLOUD_MCP_SAFE_MODE:
59+
is_destructive = annotations.get(DESTRUCTIVE_HINT, True) # Default is True per FastMCP
60+
if is_destructive:
61+
return False
62+
63+
return True
64+
65+
66+
def get_registered_tools(
67+
domain: Literal["cloud", "local", "registry"] | None = None,
68+
) -> list[tuple[Callable[..., Any], dict[str, Any]]]:
69+
"""Get all registered tools, optionally filtered by domain.
70+
71+
Args:
72+
domain: The domain to filter by (e.g., "cloud", "local", "registry").
73+
If None, returns all tools.
74+
75+
Returns:
76+
List of tuples containing (function, annotations) for each registered tool
77+
"""
78+
if domain is None:
79+
return _REGISTERED_TOOLS.copy()
80+
return [(func, ann) for func, ann in _REGISTERED_TOOLS if ann.get("domain") == domain]
81+
82+
83+
def mcp_tool(
84+
domain: Literal["cloud", "local", "registry"],
85+
*,
86+
read_only: bool = False,
87+
destructive: bool = False,
88+
idempotent: bool = False,
89+
open_world: bool = False,
90+
extra_help_text: str | None = None,
91+
) -> Callable[[F], F]:
92+
"""Decorator to tag an MCP tool function with annotations for deferred registration.
93+
94+
This decorator stores the annotations on the function for later use during
95+
deferred registration. It does not register the tool immediately.
96+
97+
Args:
98+
domain: The domain this tool belongs to (e.g., "cloud", "local", "registry")
99+
read_only: If True, tool only reads without making changes (default: False)
100+
destructive: If True, tool modifies/deletes existing data (default: False)
101+
idempotent: If True, repeated calls have same effect (default: False)
102+
open_world: If True, tool interacts with external systems (default: False)
103+
extra_help_text: Optional text to append to the function's docstring
104+
with a newline delimiter
105+
106+
Returns:
107+
Decorator function that tags the tool with annotations
108+
109+
Example:
110+
@mcp_tool("cloud", read_only=True, idempotent=True)
111+
def list_sources():
112+
...
113+
"""
114+
annotations: dict[str, Any] = {
115+
"domain": domain,
116+
READ_ONLY_HINT: read_only,
117+
DESTRUCTIVE_HINT: destructive,
118+
IDEMPOTENT_HINT: idempotent,
119+
OPEN_WORLD_HINT: open_world,
120+
}
121+
122+
def decorator(func: F) -> F:
123+
func._mcp_annotations = annotations # type: ignore[attr-defined] # noqa: SLF001
124+
func._mcp_domain = domain # type: ignore[attr-defined] # noqa: SLF001
125+
func._mcp_extra_help_text = extra_help_text # type: ignore[attr-defined] # noqa: SLF001
126+
_REGISTERED_TOOLS.append((func, annotations))
127+
return func
128+
129+
return decorator
130+
131+
132+
def register_tools(app: Any, domain: Literal["cloud", "local", "registry"]) -> None: # noqa: ANN401
133+
"""Register tools with the FastMCP app, filtered by domain and safe mode settings.
134+
135+
Args:
136+
app: The FastMCP app instance
137+
domain: The domain to register tools for (e.g., "cloud", "local", "registry")
138+
"""
139+
for func, tool_annotations in get_registered_tools(domain):
140+
if should_register_tool(tool_annotations):
141+
extra_help_text = getattr(func, "_mcp_extra_help_text", None)
142+
if extra_help_text:
143+
description = (func.__doc__ or "").rstrip() + "\n" + extra_help_text
144+
app.tool(func, annotations=tool_annotations, description=description)
145+
else:
146+
app.tool(func, annotations=tool_annotations)

0 commit comments

Comments
 (0)