-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmcp_server.py
More file actions
176 lines (143 loc) · 5.99 KB
/
mcp_server.py
File metadata and controls
176 lines (143 loc) · 5.99 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
#!/usr/bin/env python3
import os
import httpx
from typing import Dict, Any, Optional, List
import asyncio
from mcp.server.fastmcp import FastMCP
# Initialize the MCP server; the @mcp.tool() decorators below register on it.
mcp = FastMCP("Rossum MCP Server")

# Rossum API configuration, read once from the environment at import time.
# The base URL falls back to the default below when ROSSUM_API_BASE is unset.
ROSSUM_API_BASE = os.environ.get("ROSSUM_API_BASE", "https://api.elis.rossum.ai/v1")
ROSSUM_API_KEY = os.environ.get("ROSSUM_API_KEY")

# Refuse to start without a key: an unset or empty value raises here.
if not ROSSUM_API_KEY:
    raise ValueError("ROSSUM_API_KEY environment variable must be set")

# Single HTTPX client shared by every request; closed by cleanup().
client = httpx.AsyncClient()
async def get_rossum_headers() -> Dict[str, str]:
    """Build the HTTP headers attached to a Rossum API request.

    Returns:
        A new dict with a token ``Authorization`` header derived from
        ``ROSSUM_API_KEY`` and a JSON ``Content-Type`` header.
    """
    headers: Dict[str, str] = {}
    headers["Authorization"] = f"Token {ROSSUM_API_KEY}"
    headers["Content-Type"] = "application/json"
    return headers
async def _rossum_request(method: str, path: str, **kwargs) -> Any:
    """Send one request to the Rossum API and return the decoded JSON body.

    Args:
        method: HTTP method name passed to the shared client (e.g. "GET").
        path: Path appended verbatim to ``ROSSUM_API_BASE`` to form the URL.
        **kwargs: Extra keyword arguments forwarded unchanged to
            ``client.request``.

    Returns:
        The parsed JSON payload, or ``None`` when the response carries no
        body (status 204, or any successful response with empty content).

    Raises:
        Exception: If the API answers with an error status (the message
            includes the status code and the JSON error body when one can be
            decoded) or if the request itself fails. The underlying httpx
            exception is attached as ``__cause__``.
    """
    # Keep the try body limited to the calls that can raise httpx errors.
    try:
        response = await client.request(
            method=method,
            url=f"{ROSSUM_API_BASE}{path}",
            headers=await get_rossum_headers(),
            **kwargs,
        )
        response.raise_for_status()
    except httpx.HTTPStatusError as e:
        # Prefer the structured error body; fall back to the exception text
        # when the body is not valid JSON.
        error_detail = str(e)
        try:
            error_detail = e.response.json()
        except ValueError:
            pass
        raise Exception(
            f"Rossum API error {e.response.status_code}: {error_detail}"
        ) from e
    except httpx.RequestError as e:
        raise Exception(f"Rossum API request failed: {str(e)}") from e

    # A success response may have no body (204, or an empty 200/202);
    # decoding an empty body as JSON would raise, so report "no data" instead.
    if response.status_code == 204 or not response.content:
        return None
    return response.json()
# --- Tool Implementation Functions ---
async def _rossum_unpaginated_request(method: str, path: str, summary_keys: Optional[List[str]] = None) -> List[Dict[str, Any]]:
    """Fetch every page of a paginated Rossum endpoint and merge the items.

    Starting at ``path``, each page is requested through ``_rossum_request``.
    Items found under the page's ``"results"`` list are collected, and the
    ``"pagination" -> "next"`` URL (with the ``ROSSUM_API_BASE`` prefix
    stripped) selects the following page.

    Args:
        method: HTTP method used for every page request.
        path: Path of the first page, relative to ``ROSSUM_API_BASE``.
        summary_keys: When non-empty, each item is reduced to just these keys
            (keys missing from an item are omitted). When ``None`` or empty,
            items are returned in full.

    Returns:
        A list with the (optionally summarized) items of all pages fetched.

    Raises:
        Exception: Whatever ``_rossum_request`` raises for a failing page;
            it propagates unchanged.
    """
    import logging  # local import: the module header does not import logging

    logger = logging.getLogger(__name__)
    all_items: List[Dict[str, Any]] = []
    visited_paths = set()
    current_path: Optional[str] = path

    while current_path:
        # A server that keeps handing back the same "next" link would
        # otherwise make this loop spin forever.
        if current_path in visited_paths:
            logger.warning(
                "Rossum pagination pointed back to an already fetched page (%s); stopping",
                current_path,
            )
            break
        visited_paths.add(current_path)

        page_data = await _rossum_request(method, current_path)
        if not isinstance(page_data, dict):
            # Empty body (None) or an unexpected payload shape: nothing to
            # collect and no pagination info to follow.
            break

        results = page_data.get("results")
        if isinstance(results, list):
            if summary_keys:
                all_items.extend(
                    {key: item[key] for key in summary_keys if key in item}
                    for item in results
                )
            else:
                all_items.extend(results)

        pagination_info = page_data.get("pagination")
        next_page_url = (
            pagination_info.get("next") if isinstance(pagination_info, dict) else None
        )
        if not next_page_url or not isinstance(next_page_url, str):
            break

        if not next_page_url.startswith(ROSSUM_API_BASE):
            # Only URLs under the configured base can be turned back into a
            # relative path; stop here, but say so instead of silently
            # returning a truncated list.
            logger.warning(
                "Rossum next page URL %s does not start with %s; stopping pagination",
                next_page_url,
                ROSSUM_API_BASE,
            )
            break
        current_path = next_page_url[len(ROSSUM_API_BASE):]

    return all_items
async def _get_queues_impl() -> List[Dict[str, Any]]:
    """Return a trimmed summary of every queue, across all pages.

    Returns:
        One dict per queue, limited to the fields listed in ``queue_fields``.
    """
    # Only these fields are kept from each queue record.
    queue_fields = ["id", "name", "url", "schema", "workspace", "status"]
    queues = await _rossum_unpaginated_request(
        "GET", "/queues", summary_keys=queue_fields
    )
    return queues
async def _get_schema_impl(schema_id: str):
    """Fetch a single schema.

    Args:
        schema_id: Identifier interpolated into the ``/schemas/<id>`` path.

    Returns:
        Whatever ``_rossum_request`` returns for that path.
    """
    schema_path = f"/schemas/{schema_id}"
    return await _rossum_request("GET", schema_path)
async def _get_hooks_impl() -> List[Dict[str, Any]]:
    """Return a trimmed summary of every hook, across all pages.

    Returns:
        One dict per hook, limited to the fields listed in ``hook_fields``.
    """
    # Only these fields are kept from each hook record.
    hook_fields = [
        "id",
        "name",
        "type",
        "url",
        "active",
        "events",
        "queues",
    ]
    hooks = await _rossum_unpaginated_request(
        "GET", "/hooks", summary_keys=hook_fields
    )
    return hooks
async def _get_hook_impl(hook_id: str):
    """Fetch a single hook in full (no field trimming is applied here).

    Args:
        hook_id: Identifier interpolated into the ``/hooks/<id>`` path.

    Returns:
        Whatever ``_rossum_request`` returns for that path.
    """
    hook_path = f"/hooks/{hook_id}"
    return await _rossum_request("GET", hook_path)
async def _get_workspaces_impl() -> List[Dict[str, Any]]:
    """Return every workspace, across all pages, with all fields intact.

    Returns:
        One dict per workspace exactly as delivered by the paginated helper.
    """
    # summary_keys is deliberately omitted so each workspace keeps every field.
    workspaces = await _rossum_unpaginated_request("GET", "/workspaces")
    return workspaces
async def _get_workspace_impl(workspace_id: str):
    """Fetch a single workspace.

    Args:
        workspace_id: Identifier interpolated into the ``/workspaces/<id>`` path.

    Returns:
        Whatever ``_rossum_request`` returns for that path.
    """
    workspace_path = f"/workspaces/{workspace_id}"
    return await _rossum_request("GET", workspace_path)
# --- MCP Tool Definitions ---
@mcp.tool()
async def rossum_get_queues() -> List[Dict[str, Any]]:
    """Get all queues from the Rossum organization."""
    # NOTE(review): the docstring is presumably surfaced to MCP clients as the
    # tool description, so its text is kept verbatim - confirm before editing.
    queues = await _get_queues_impl()
    return queues
@mcp.tool()
async def rossum_get_schema(schema_id: str) -> Dict[str, Any]:
    """Get a specific schema by its ID.

    Args:
        schema_id: The ID of the schema to retrieve
    """
    # NOTE(review): the docstring is presumably surfaced to MCP clients as the
    # tool description, so its text is kept verbatim - confirm before editing.
    schema = await _get_schema_impl(schema_id=schema_id)
    return schema
@mcp.tool()
async def rossum_get_hooks() -> List[Dict[str, Any]]:
    """Get all serverless function hooks from the Rossum organization."""
    # NOTE(review): the docstring is presumably surfaced to MCP clients as the
    # tool description, so its text is kept verbatim - confirm before editing.
    hooks = await _get_hooks_impl()
    return hooks
@mcp.tool()
async def rossum_get_hook(hook_id: str) -> Dict[str, Any]:
    """Get a specific serverless function hook by its ID, including its source code.

    Args:
        hook_id: The ID of the hook to retrieve
    """
    # NOTE(review): the docstring is presumably surfaced to MCP clients as the
    # tool description, so its text is kept verbatim - confirm before editing.
    hook = await _get_hook_impl(hook_id=hook_id)
    return hook
@mcp.tool()
async def rossum_get_workspaces() -> List[Dict[str, Any]]:
    """Get all workspaces from the Rossum organization."""
    # NOTE(review): the docstring is presumably surfaced to MCP clients as the
    # tool description, so its text is kept verbatim - confirm before editing.
    workspaces = await _get_workspaces_impl()
    return workspaces
@mcp.tool()
async def rossum_get_workspace(workspace_id: str) -> Dict[str, Any]:
    """Get a specific workspace by its ID.

    Args:
        workspace_id: The ID of the workspace to retrieve
    """
    # NOTE(review): the docstring is presumably surfaced to MCP clients as the
    # tool description, so its text is kept verbatim - confirm before editing.
    workspace = await _get_workspace_impl(workspace_id=workspace_id)
    return workspace
async def cleanup():
    """Release the shared HTTPX client.

    Awaits ``aclose()`` on the module-level ``client``; returns nothing.
    """
    await client.aclose()
if __name__ == "__main__":
    import atexit

    def _close_client_at_exit():
        # atexit handlers are plain synchronous callables, so the cleanup
        # coroutine is driven to completion with asyncio.run().
        asyncio.run(cleanup())

    # Register the shutdown hook first, then hand control to the MCP server.
    atexit.register(_close_client_at_exit)
    mcp.run()