-
Notifications
You must be signed in to change notification settings - Fork 39
Expand file tree
/
Copy pathclient.py
More file actions
146 lines (119 loc) · 4.66 KB
/
client.py
File metadata and controls
146 lines (119 loc) · 4.66 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
"""REST API client with unsafe method blocking."""
import fnmatch
import logging
from typing import Any
from urllib.parse import urlencode, urljoin
import httpx
from ..utils.http_errors import build_http_error_response
logger = logging.getLogger(__name__)
# Unsafe HTTP methods (blocked by default)
_UNSAFE_METHODS = {"POST", "PUT", "DELETE", "PATCH"}
def _is_path_allowed(path: str, patterns: list[str]) -> bool:
"""Check if path matches any allowed pattern (fnmatch glob)."""
for pattern in patterns:
if fnmatch.fnmatch(path, pattern):
return True
return False
def _build_url(
path: str,
base_url: str,
path_params: dict[str, Any] | None = None,
query_params: dict[str, Any] | None = None,
) -> str:
"""Build full URL with path and query params.
Args:
path: API path (e.g., /users/{id})
base_url: Base URL for API
path_params: Values to substitute in path (e.g., {"id": "123"})
query_params: Query string parameters (e.g., {"limit": 10})
Returns:
Full URL string
"""
# Substitute path params
if path_params:
for key, value in path_params.items():
path = path.replace(f"{{{key}}}", str(value))
if not base_url:
raise ValueError("No base URL provided")
url = urljoin(base_url.rstrip("/") + "/", path.lstrip("/"))
# Add query params
if query_params:
# Filter out None values
filtered = {k: v for k, v in query_params.items() if v is not None}
if filtered:
url = f"{url}?{urlencode(filtered)}"
return url
async def execute_request(
method: str,
path: str,
path_params: dict[str, Any] | None = None,
query_params: dict[str, Any] | None = None,
body: dict[str, Any] | None = None,
base_url: str = "",
headers: dict[str, str] | None = None,
allow_unsafe: bool = False,
allow_unsafe_paths: list[str] | None = None,
) -> dict[str, Any]:
"""Execute REST API request. Unsafe methods blocked unless explicitly allowed.
Args:
method: HTTP method (GET, POST, PUT, DELETE, PATCH)
path: API path (e.g., /users/{id})
path_params: Values to substitute in path
query_params: Query string parameters
body: Request body (for POST/PUT/PATCH)
base_url: Base URL for API (required)
headers: Headers to send (e.g., Authorization)
allow_unsafe: Allow all POST/PUT/DELETE/PATCH methods
allow_unsafe_paths: Glob patterns for paths where unsafe methods are allowed
Returns:
Dict with success/data or error
"""
method = method.upper()
# Block unsafe methods by default
if method in _UNSAFE_METHODS and not allow_unsafe:
# Check if path matches allowlist
if not allow_unsafe_paths or not _is_path_allowed(path, allow_unsafe_paths):
return {
"success": False,
"error": f"{method} method not allowed (read-only mode). Use X-Allow-Unsafe-Paths header.",
}
try:
url = _build_url(path, base_url, path_params, query_params)
except ValueError as e:
return {"success": False, "error": str(e)}
request_headers = {"Accept": "application/json"}
if headers:
request_headers.update(headers)
# Log request details without leaking header values (e.g., auth tokens).
logger.info(
"REST request resolved: method=%s base_url=%s path=%s url=%s header_keys=%s",
method,
base_url,
path,
url,
sorted(request_headers.keys()),
)
async with httpx.AsyncClient(timeout=30.0) as client:
try:
if method == "GET":
resp = await client.get(url, headers=request_headers)
elif method in {"POST", "PUT", "PATCH"}:
request_headers["Content-Type"] = "application/json"
resp = await client.request(method, url, json=body, headers=request_headers)
elif method == "DELETE":
resp = await client.delete(url, headers=request_headers)
else:
return {"success": False, "error": f"Unsupported method: {method}"}
resp.raise_for_status()
# Handle different content types
content_type = resp.headers.get("content-type", "")
if "application/json" in content_type:
data = resp.json()
else:
data = resp.text
return {"success": True, "data": data}
except httpx.HTTPStatusError as e:
return build_http_error_response(e)
except Exception as e:
logger.exception("REST API error")
return {"success": False, "error": str(e)}