Skip to content

Commit cd821e0

Browse files
authored
Aura api add tool annotations add api client file (#156)
* split client, service and mcp classes into separate files * Update CHANGELOG.md * add tool annotations * update test imports
1 parent 4ec75ac commit cd821e0

File tree

9 files changed

+516
-426
lines changed

9 files changed

+516
-426
lines changed

servers/mcp-neo4j-cloud-aura-api/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,11 @@
33
### Fixed
44

55
### Changed
6+
* Split client, service and MCP classes into separate files
7+
* Create centralized logger config in `utils.py`
68

79
### Added
10+
* Add tool annotations to tools to better describe their effects
811

912
## v0.3.0
1013

Lines changed: 261 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,261 @@
1+
import requests
2+
import time
3+
import json
4+
from typing import Dict, Any, List, Optional, Union
5+
from .utils import _validate_region, get_logger
6+
7+
logger = get_logger(__name__)
8+
9+
class AuraAPIClient:
10+
"""Client for interacting with Neo4j Aura API."""
11+
12+
BASE_URL = "https://api.neo4j.io/v1"
13+
14+
def __init__(self, client_id: str, client_secret: str):
15+
self.client_id = client_id
16+
self.client_secret = client_secret
17+
self.token = None
18+
self.token_expiry = 0
19+
20+
def _get_auth_token(self) -> str:
21+
"""Get authentication token for Aura API."""
22+
auth_url = "https://api.neo4j.io/oauth/token"
23+
24+
# Create base64 encoded credentials
25+
import base64
26+
credentials = f"{self.client_id}:{self.client_secret}"
27+
encoded_credentials = base64.b64encode(credentials.encode()).decode()
28+
29+
headers = {
30+
"Authorization": f"Basic {encoded_credentials}",
31+
"Content-Type": "application/x-www-form-urlencoded"
32+
}
33+
34+
payload = {
35+
"grant_type": "client_credentials"
36+
}
37+
38+
try:
39+
response = requests.post(auth_url, headers=headers, data=payload)
40+
response.raise_for_status()
41+
token_data = response.json()
42+
if not isinstance(token_data, dict) or \
43+
not token_data.get("access_token") or \
44+
not token_data.get("expires_in") or \
45+
not token_data.get("token_type") or \
46+
token_data.get("token_type").lower() != "bearer":
47+
raise Exception("Invalid token response format")
48+
self.token = token_data["access_token"]
49+
return self.token
50+
except requests.RequestException as e:
51+
logger.error(f"Authentication error: {str(e)}")
52+
raise Exception(f"Failed to authenticate with Neo4j Aura API: {str(e)}")
53+
54+
def _get_headers(self) -> Dict[str, str]:
55+
"""Get headers for API requests including authentication."""
56+
current_time = time.time()
57+
if not self.token or current_time >= self.token_expiry:
58+
self.token = self._get_auth_token()
59+
60+
return {
61+
"Authorization": f"Bearer {self.token}",
62+
"Content-Type": "application/json",
63+
"Accept": "application/json"
64+
}
65+
66+
def _handle_response(self, response: requests.Response) -> Dict[str, Any]:
67+
"""Handle API response and errors."""
68+
try:
69+
response.raise_for_status()
70+
data = response.json()
71+
if "data" in data:
72+
return data["data"]
73+
else:
74+
return data
75+
except requests.HTTPError as e:
76+
error_msg = f"HTTP error: {e}"
77+
try:
78+
error_data = response.json()
79+
if "message" in error_data:
80+
error_msg = f"{error_msg} - {error_data['message']}"
81+
except:
82+
pass
83+
logger.error(error_msg)
84+
raise Exception(error_msg)
85+
except requests.RequestException as e:
86+
logger.error(f"Request error: {str(e)}")
87+
raise Exception(f"API request failed: {str(e)}")
88+
except json.JSONDecodeError:
89+
logger.error("Failed to parse API response")
90+
raise Exception("Failed to parse API response")
91+
92+
def list_instances(self) -> List[Dict[str, Any]]:
93+
"""List all database instances."""
94+
url = f"{self.BASE_URL}/instances"
95+
response = requests.get(url, headers=self._get_headers())
96+
return self._handle_response(response)
97+
98+
def get_instance_details(self, instance_ids: Union[str, List[str]]) -> Union[Dict[str, Any], List[Dict[str, Any]]]:
99+
"""Get details for one or more instances by ID.
100+
101+
Args:
102+
instance_ids: Either a single instance ID string or a list of instance ID strings
103+
104+
Returns:
105+
A single instance details dict or a list of instance details dicts
106+
"""
107+
if isinstance(instance_ids, str):
108+
# Handle single instance ID
109+
url = f"{self.BASE_URL}/instances/{instance_ids}"
110+
response = requests.get(url, headers=self._get_headers())
111+
return self._handle_response(response)
112+
else:
113+
# Handle list of instance IDs
114+
results = []
115+
for instance_id in instance_ids:
116+
url = f"{self.BASE_URL}/instances/{instance_id}"
117+
response = requests.get(url, headers=self._get_headers())
118+
try:
119+
data = self._handle_response(response)
120+
results.append(data)
121+
except Exception as e:
122+
results.append({"error": str(e), "instance_id": instance_id})
123+
return results
124+
125+
def get_instance_by_name(self, name: str) -> Optional[Dict[str, Any]]:
126+
"""Find an instance by name."""
127+
instances = self.list_instances()
128+
for instance in instances:
129+
if name.lower() in instance.get("name", "").lower():
130+
# Get full instance details using the instance ID
131+
return self.get_instance_details(instance.get("id"))
132+
return None
133+
134+
def create_instance(self, tenant_id: str, name: str, memory: int = 1, region: str = "europe-west1",
135+
version: str = "5", type: str = "free-db",
136+
vector_optimized: bool = False,
137+
cloud_provider: str = "gcp", graph_analytics_plugin: bool = False,
138+
source_instance_id: str = None) -> Dict[str, Any]:
139+
"""Create a new database instance."""
140+
if tenant_id is None:
141+
raise ValueError("tenant_id is required")
142+
143+
# Always set version to "5"
144+
version = "5"
145+
146+
# Validate based on instance type
147+
if type == "free-db":
148+
if memory != 1:
149+
raise ValueError("free-db instances can only have 1GB memory")
150+
151+
if not cloud_provider == "gcp":
152+
raise ValueError("free-db instances can only be created in GCP regions")
153+
154+
if vector_optimized:
155+
raise ValueError("free-db instances cannot be vector optimized")
156+
157+
# Validate for professional/enterprise/business-critical types
158+
elif type in ["professional-db", "enterprise-db", "business-critical"]:
159+
if cloud_provider and cloud_provider not in ["gcp", "aws", "azure"]:
160+
raise ValueError("cloud_provider must be one of: gcp, aws, azure")
161+
162+
if vector_optimized and memory < 4:
163+
raise ValueError("vector optimized instances must have at least 4GB memory")
164+
165+
# If cloning, source_instance_id is required
166+
if source_instance_id is not None:
167+
if not isinstance(source_instance_id, str):
168+
raise ValueError("source_instance for clone from instance must be defined")
169+
else:
170+
raise ValueError(f"Invalid type {type}")
171+
172+
_validate_region(cloud_provider, region)
173+
174+
if graph_analytics_plugin and type not in ["professional-db", "enterprise-db", "business-critical"]:
175+
raise ValueError("graph analytics plugin is only available for professional, enterprise, and business-critical instances")
176+
177+
if vector_optimized and type not in ["professional-db", "enterprise-db", "business-critical"]:
178+
raise ValueError("vector optimized instances are only available for professional, enterprise, and business-critical instances")
179+
180+
181+
payload = {
182+
"tenant_id": tenant_id,
183+
"name": name,
184+
"memory": f"{memory}GB",
185+
"region": region,
186+
"version": version,
187+
"type": type,
188+
"vector_optimized": vector_optimized,
189+
"cloud_provider": cloud_provider,
190+
"graph_analytics_plugin": graph_analytics_plugin
191+
}
192+
193+
# Add source_instance_id if provided (for cloning)
194+
if source_instance_id is not None:
195+
payload["source_instance_id"] = source_instance_id
196+
197+
url = f"{self.BASE_URL}/instances"
198+
response = requests.post(url, headers=self._get_headers(), json=payload)
199+
return self._handle_response(response)
200+
201+
def update_instance(self, instance_id: str, name: Optional[str] = None,
202+
memory: Optional[int] = None,
203+
vector_optimized: Optional[bool] = None,
204+
storage: Optional[int] = None) -> Dict[str, Any]:
205+
"""Update an existing instance."""
206+
payload = {}
207+
if name is not None:
208+
payload["name"] = name
209+
if memory is not None:
210+
payload["memory"] = f"{memory}GB"
211+
payload["storage"] = f"{2*memory}GB"
212+
if storage is not None:
213+
payload["storage"] = f"{storage}GB"
214+
if vector_optimized is not None:
215+
payload["vector_optimized"] = str(vector_optimized).lower()
216+
217+
# Validate vector optimization requirements only if both memory and vector_optimized are being updated
218+
if (memory is not None and vector_optimized is not None and
219+
vector_optimized and memory < 4):
220+
raise ValueError("vector optimized instances must have at least 4GB memory")
221+
222+
url = f"{self.BASE_URL}/instances/{instance_id}"
223+
response = requests.patch(url, headers=self._get_headers(), json=payload)
224+
return self._handle_response(response)
225+
226+
def pause_instance(self, instance_id: str) -> Dict[str, Any]:
227+
"""Pause a database instance."""
228+
url = f"{self.BASE_URL}/instances/{instance_id}/pause"
229+
response = requests.post(url, headers=self._get_headers())
230+
return self._handle_response(response)
231+
232+
def resume_instance(self, instance_id: str) -> Dict[str, Any]:
233+
"""Resume a paused database instance."""
234+
url = f"{self.BASE_URL}/instances/{instance_id}/resume"
235+
response = requests.post(url, headers=self._get_headers())
236+
return self._handle_response(response)
237+
238+
def list_tenants(self) -> List[Dict[str, Any]]:
239+
"""List all tenants/projects."""
240+
url = f"{self.BASE_URL}/tenants"
241+
response = requests.get(url, headers=self._get_headers())
242+
return self._handle_response(response)
243+
244+
def get_tenant_details(self, tenant_id: str) -> Dict[str, Any]:
245+
"""Get details for a specific tenant/project."""
246+
url = f"{self.BASE_URL}/tenants/{tenant_id}"
247+
response = requests.get(url, headers=self._get_headers())
248+
return self._handle_response(response)
249+
250+
def delete_instance(self, instance_id: str) -> Dict[str, Any]:
251+
"""Delete a database instance.
252+
253+
Args:
254+
instance_id: ID of the instance to delete
255+
256+
Returns:
257+
Response dict with status information
258+
"""
259+
url = f"{self.BASE_URL}/instances/{instance_id}"
260+
response = requests.delete(url, headers=self._get_headers())
261+
return self._handle_response(response)

0 commit comments

Comments
 (0)