-
Notifications
You must be signed in to change notification settings - Fork 43
Expand file tree
/
Copy pathopenapi_converter.py
More file actions
406 lines (344 loc) · 17.7 KB
/
openapi_converter.py
File metadata and controls
406 lines (344 loc) · 17.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
"""OpenAPI specification converter for UTCP tool generation.
This module provides functionality to convert OpenAPI specifications (both 2.0
and 3.0) into UTCP tool definitions. It handles schema resolution, authentication
mapping, and proper tool creation from REST API specifications.
Key Features:
- OpenAPI 2.0 and 3.0 specification support
- Automatic JSON reference ($ref) resolution
- Authentication scheme mapping (API key, Basic, OAuth2)
- Input/output schema extraction from OpenAPI schemas
- URL path parameter handling
- Request body and header field mapping
- Provider name generation from specification metadata
The converter creates UTCP tools that can be used to interact with REST APIs
defined by OpenAPI specifications, providing a bridge between OpenAPI and UTCP.
"""
import json
from typing import Any, Dict, List, Optional, Tuple
import sys
import uuid
from utcp.shared.tool import Tool, ToolInputOutputSchema
from utcp.shared.utcp_manual import UtcpManual
from urllib.parse import urlparse
from utcp.shared.provider import HttpProvider
from utcp.shared.auth import Auth, ApiKeyAuth, BasicAuth, OAuth2Auth
class OpenApiConverter:
"""Converts OpenAPI specifications into UTCP tool definitions.
Processes OpenAPI 2.0 and 3.0 specifications to generate equivalent UTCP
tools, handling schema resolution, authentication mapping, and proper
HTTP provider configuration. Each operation in the OpenAPI spec becomes
a UTCP tool with appropriate input/output schemas.
Features:
- Complete OpenAPI specification parsing
- Recursive JSON reference ($ref) resolution
- Authentication scheme conversion (API key, Basic, OAuth2)
- Input parameter and request body handling
- Response schema extraction
- URL template and path parameter support
- Provider name normalization
- Placeholder variable generation for configuration
Architecture:
The converter works by iterating through all paths and operations
in the OpenAPI spec, extracting relevant information for each
operation, and creating corresponding UTCP tools with HTTP providers.
Attributes:
spec: The parsed OpenAPI specification dictionary.
spec_url: Optional URL where the specification was retrieved from.
placeholder_counter: Counter for generating unique placeholder variables.
provider_name: Normalized name for the provider derived from the spec.
"""
def __init__(self, openapi_spec: Dict[str, Any], spec_url: Optional[str] = None, provider_name: Optional[str] = None):
"""Initialize the OpenAPI converter.
Args:
openapi_spec: Parsed OpenAPI specification as a dictionary.
spec_url: Optional URL where the specification was retrieved from.
Used for base URL determination if servers are not specified.
provider_name: Optional custom name for the provider. If not
provided, derives name from the specification title.
"""
self.spec = openapi_spec
self.spec_url = spec_url
# Single counter for all placeholder variables
self.placeholder_counter = 0
# If provider_name is None then get the first word in spec.info.title
if provider_name is None:
title = openapi_spec.get("info", {}).get("title", "openapi_provider_" + uuid.uuid4().hex)
# Replace characters that are invalid for identifiers
invalid_chars = " -.,!?'\"\\/()[]{}#@$%^&*+=~`|;:<>"
self.provider_name = ''.join('_' if c in invalid_chars else c for c in title)
else:
self.provider_name = provider_name
def _increment_placeholder_counter(self) -> int:
"""Increments the global counter and returns the new value.
Returns:
The new counter value after incrementing
"""
self.placeholder_counter += 1
return self.placeholder_counter
def _get_placeholder(self, placeholder_name: str) -> str:
"""Returns a placeholder string using the current counter value.
Args:
placeholder_name: The name of the placeholder variable
"""
return f"${{{placeholder_name}_{self.placeholder_counter}}}"
def convert(self) -> UtcpManual:
"""Parses the OpenAPI specification and returns a UtcpManual."""
self.placeholder_counter = 0
tools = []
servers = self.spec.get("servers")
if servers:
base_url = servers[0].get("url", "/")
elif self.spec_url:
parsed_url = urlparse(self.spec_url)
base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
else:
# Fallback if no server info and no spec URL is provided
base_url = "/"
print("No server info or spec URL provided. Using fallback base URL: ", base_url, file=sys.stderr)
for path, path_item in self.spec.get("paths", {}).items():
for method, operation in path_item.items():
if method.lower() in ['get', 'post', 'put', 'delete', 'patch']:
tool = self._create_tool(path, method, operation, base_url)
if tool:
tools.append(tool)
return UtcpManual(tools=tools)
def _resolve_ref(self, ref: str) -> Dict[str, Any]:
"""Resolves a local JSON reference."""
if not ref.startswith('#/'):
raise ValueError(f"External or non-local references are not supported: {ref}")
parts = ref[2:].split('/')
node = self.spec
for part in parts:
try:
node = node[part]
except (KeyError, TypeError):
raise ValueError(f"Reference not found: {ref}")
return node
def _resolve_schema(self, schema: Dict[str, Any]) -> Dict[str, Any]:
"""Recursively resolves all $refs in a schema object."""
return schema
if isinstance(schema, dict):
if "$ref" in schema:
resolved_ref = self._resolve_ref(schema["$ref"])
# The resolved reference could itself contain refs, so we recurse
return self._resolve_schema(resolved_ref)
# Resolve refs in nested properties
new_schema = {}
for key, value in schema.items():
new_schema[key] = self._resolve_schema(value)
return new_schema
if isinstance(schema, list):
return [self._resolve_schema(item) for item in schema]
return schema
def _extract_auth(self, operation: Dict[str, Any]) -> Optional[Auth]:
"""Extracts authentication information from OpenAPI operation and global security schemes."""
# First check for operation-level security requirements
security_requirements = operation.get("security", [])
# If no operation-level security, check global security requirements
if not security_requirements:
security_requirements = self.spec.get("security", [])
# If no security requirements, return None
if not security_requirements:
return None
# Get security schemes - support both OpenAPI 2.0 and 3.0
security_schemes = self._get_security_schemes()
# Process the first security requirement (most common case)
# Each security requirement is a dict with scheme name as key
for security_req in security_requirements:
for scheme_name, scopes in security_req.items():
if scheme_name in security_schemes:
scheme = security_schemes[scheme_name]
return self._create_auth_from_scheme(scheme, scheme_name)
return None
def _get_security_schemes(self) -> Dict[str, Any]:
"""Gets security schemes supporting both OpenAPI 2.0 and 3.0."""
# OpenAPI 3.0 format
if "components" in self.spec:
return self.spec.get("components", {}).get("securitySchemes", {})
# OpenAPI 2.0 format
return self.spec.get("securityDefinitions", {})
def _create_auth_from_scheme(self, scheme: Dict[str, Any], scheme_name: str) -> Optional[Auth]:
"""Creates an Auth object from an OpenAPI security scheme."""
scheme_type = scheme.get("type", "").lower()
if scheme_type == "apikey":
# For API key auth, use the parameter name from the OpenAPI spec
location = scheme.get("in", "header") # Default to header if not specified
param_name = scheme.get("name", "Authorization") # Default name
# Use the current counter value for the placeholder
api_key_placeholder = self._get_placeholder("API_KEY")
# Increment the counter after using it
self._increment_placeholder_counter()
return ApiKeyAuth(
api_key=api_key_placeholder,
var_name=param_name,
location=location
)
elif scheme_type == "basic":
# OpenAPI 2.0 format: type: basic
# Use the current counter value for both placeholders
username_placeholder = self._get_placeholder("USERNAME")
password_placeholder = self._get_placeholder("PASSWORD")
# Increment the counter after using it
self._increment_placeholder_counter()
return BasicAuth(
username=username_placeholder,
password=password_placeholder
)
elif scheme_type == "http":
# OpenAPI 3.0 format: type: http with scheme
http_scheme = scheme.get("scheme", "").lower()
if http_scheme == "basic":
# For basic auth, use conventional environment variable names
# Use the current counter value for both placeholders
username_placeholder = self._get_placeholder("USERNAME")
password_placeholder = self._get_placeholder("PASSWORD")
# Increment the counter after using it
self._increment_placeholder_counter()
return BasicAuth(
username=username_placeholder,
password=password_placeholder
)
elif http_scheme == "bearer":
# Treat bearer tokens as API keys
# Use the current counter value for the placeholder
api_key_placeholder = self._get_placeholder("API_KEY")
# Increment the counter after using it
self._increment_placeholder_counter()
return ApiKeyAuth(
api_key=f"Bearer {api_key_placeholder}",
var_name="Authorization",
location="header"
)
elif scheme_type == "oauth2":
# Handle both OpenAPI 2.0 and 3.0 OAuth2 formats
flows = scheme.get("flows", {})
# OpenAPI 3.0 format
if flows:
for flow_type, flow_config in flows.items():
# Support both old and new flow names
if flow_type in ["authorizationCode", "accessCode", "clientCredentials", "application"]:
token_url = flow_config.get("tokenUrl")
if token_url:
# Use the current counter value for both placeholders
client_id_placeholder = self._get_placeholder("CLIENT_ID")
client_secret_placeholder = self._get_placeholder("CLIENT_SECRET")
# Increment the counter after using it
self._increment_placeholder_counter()
return OAuth2Auth(
token_url=token_url,
client_id=client_id_placeholder,
client_secret=client_secret_placeholder,
scope=" ".join(flow_config.get("scopes", {}).keys()) or None
)
# OpenAPI 2.0 format (flows directly in scheme)
else:
flow_type = scheme.get("flow", "")
token_url = scheme.get("tokenUrl")
if token_url and flow_type in ["accessCode", "application", "clientCredentials"]:
# Use the current counter value for both placeholders
client_id_placeholder = self._get_placeholder("CLIENT_ID")
client_secret_placeholder = self._get_placeholder("CLIENT_SECRET")
# Increment the counter after using it
self._increment_placeholder_counter()
return OAuth2Auth(
token_url=token_url,
client_id=client_id_placeholder,
client_secret=client_secret_placeholder,
scope=" ".join(scheme.get("scopes", {}).keys()) or None
)
return None
def _create_tool(self, path: str, method: str, operation: Dict[str, Any], base_url: str) -> Optional[Tool]:
"""Creates a Tool object from an OpenAPI operation."""
operation_id = operation.get("operationId")
if not operation_id:
return None
description = operation.get("summary") or operation.get("description", "")
tags = operation.get("tags", [])
inputs, header_fields, body_field = self._extract_inputs(operation)
outputs = self._extract_outputs(operation)
auth = self._extract_auth(operation)
provider_name = self.spec.get("info", {}).get("title", "openapi_provider_" + uuid.uuid4().hex)
# Combine base URL and path, ensuring no double slashes
full_url = base_url.rstrip('/') + '/' + path.lstrip('/')
provider = HttpProvider(
name=provider_name,
provider_type="http",
http_method=method.upper(),
url=full_url,
body_field=body_field if body_field else None,
header_fields=header_fields if header_fields else None,
auth=auth
)
return Tool(
name=operation_id,
description=description,
inputs=inputs,
outputs=outputs,
tags=tags,
tool_provider=provider
)
def _extract_inputs(self, operation: Dict[str, Any]) -> Tuple[ToolInputOutputSchema, List[str], Optional[str]]:
"""Extracts input schema, header fields, and body field from an OpenAPI operation."""
properties = {}
required = []
header_fields = []
body_field = None
# Handle parameters (query, header, path, cookie)
for param in operation.get("parameters", []):
param = self._resolve_schema(param)
param_name = param.get("name")
if not param_name:
continue
if param.get("in") == "header":
header_fields.append(param_name)
schema = self._resolve_schema(param.get("schema", {}))
properties[param_name] = {
"type": schema.get("type", "string"),
"description": param.get("description", ""),
**schema
}
if param.get("required"):
required.append(param_name)
# Handle request body
request_body = operation.get("requestBody")
if request_body:
resolved_body = self._resolve_schema(request_body)
content = resolved_body.get("content", {})
json_schema = content.get("application/json", {}).get("schema")
if json_schema:
# Add a single 'body' field to represent the request body
body_field = "body"
properties[body_field] = {
"description": resolved_body.get("description", "Request body"),
**self._resolve_schema(json_schema)
}
if resolved_body.get("required"):
required.append(body_field)
schema = ToolInputOutputSchema(properties=properties, required=required if required else None)
return schema, header_fields, body_field
def _extract_outputs(self, operation: Dict[str, Any]) -> ToolInputOutputSchema:
"""Extracts the output schema from an OpenAPI operation, resolving refs."""
success_response = operation.get("responses", {}).get("200") or operation.get("responses", {}).get("201")
if not success_response:
return ToolInputOutputSchema()
resolved_response = self._resolve_schema(success_response)
content = resolved_response.get("content", {})
json_schema = content.get("application/json", {}).get("schema")
if not json_schema:
return ToolInputOutputSchema()
resolved_json_schema = self._resolve_schema(json_schema)
schema_args = {
"type": resolved_json_schema.get("type", "object"),
"properties": resolved_json_schema.get("properties", {}),
"required": resolved_json_schema.get("required"),
"description": resolved_json_schema.get("description"),
"title": resolved_json_schema.get("title"),
}
# Handle array item types
if schema_args["type"] == "array" and "items" in resolved_json_schema:
schema_args["items"] = resolved_json_schema.get("items")
# Handle additional schema attributes
for attr in ["enum", "minimum", "maximum", "format"]:
if attr in resolved_json_schema:
schema_args[attr] = resolved_json_schema.get(attr)
return ToolInputOutputSchema(**schema_args)