Skip to content

Commit 6ebb2d8

Browse files
committed
code only schema generation (without serverless configs)
1 parent 1bbf413 commit 6ebb2d8

File tree

10 files changed

+3037
-38
lines changed

10 files changed

+3037
-38
lines changed
Lines changed: 360 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,360 @@
1+
import importlib.util
2+
import sys
3+
from dataclasses import dataclass
4+
from pathlib import Path
5+
from typing import Any, Dict, List, Optional
6+
7+
from rich import print as rprint
8+
9+
from .python_discovery import EndpointInfo, RouteInfo
10+
11+
12+
@dataclass
13+
class DocsModule:
14+
"""Information about a discovered docs.py module."""
15+
file_path: Path
16+
module_name: str
17+
endpoint_name: str
18+
19+
20+
@dataclass
21+
class EndpointDocs:
22+
"""Documentation extracted from docs.py files."""
23+
summary: Optional[str]
24+
description: Optional[str]
25+
response_model: Optional[str]
26+
status_code: Optional[int]
27+
responses: Dict[int, Dict[str, Any]]
28+
tags: List[str]
29+
name: Optional[str]
30+
response_description: Optional[str]
31+
request_model: Optional[str]
32+
33+
34+
@dataclass
35+
class CompleteEndpointInfo:
36+
"""Complete endpoint information combining function metadata and docs."""
37+
function_name: str
38+
file_path: Path
39+
module_path: str
40+
endpoint_name: str
41+
http_method: str
42+
path: str
43+
44+
# Documentation
45+
summary: Optional[str]
46+
description: Optional[str]
47+
48+
# Request/Response
49+
request_model: Optional[str]
50+
response_model: Optional[str]
51+
status_code: int
52+
responses: Dict[int, Dict[str, Any]]
53+
54+
# OpenAPI metadata
55+
tags: List[str]
56+
parameters: List[Dict[str, Any]]
57+
58+
# Function metadata
59+
docstring: Optional[str]
60+
function_signature: Optional[str]
61+
is_async: bool
62+
decorators: List[str]
63+
64+
65+
class DocsExtractor:
66+
"""Extracts documentation from docs.py files and merges with function metadata."""
67+
68+
def __init__(self):
69+
self.docs_cache = {}
70+
71+
def discover_docs_files(self, source_dir: Path,
72+
docs_pattern: str = "**/docs.py") -> List[DocsModule]:
73+
"""
74+
Discover docs.py files in the source directory.
75+
76+
Args:
77+
source_dir: Root directory to search
78+
docs_pattern: Glob pattern for docs files
79+
80+
Returns:
81+
List of discovered docs modules
82+
"""
83+
docs_modules = []
84+
85+
for docs_file in source_dir.glob(docs_pattern):
86+
try:
87+
# Extract endpoint name from directory structure
88+
endpoint_name = self._extract_endpoint_name(docs_file)
89+
module_name = self._get_module_name(docs_file, source_dir)
90+
91+
docs_modules.append(DocsModule(
92+
file_path=docs_file,
93+
module_name=module_name,
94+
endpoint_name=endpoint_name
95+
))
96+
97+
except Exception as e:
98+
rprint(f"[yellow]Warning: Could not process docs file {docs_file}: {e}[/yellow]")
99+
100+
return docs_modules
101+
102+
def _extract_endpoint_name(self, docs_file: Path) -> str:
103+
"""Extract endpoint name from docs.py file path."""
104+
# For path like .../http_endpoints/register/docs.py -> "register"
105+
parts = docs_file.parts
106+
107+
if len(parts) >= 2 and docs_file.name == 'docs.py':
108+
return parts[-2] # Parent directory name
109+
110+
return docs_file.stem
111+
112+
def _get_module_name(self, docs_file: Path, source_dir: Path) -> str:
113+
"""Convert docs file path to Python module path."""
114+
try:
115+
relative_path = docs_file.relative_to(source_dir)
116+
module_parts = list(relative_path.with_suffix('').parts)
117+
return '.'.join(module_parts)
118+
except Exception:
119+
return str(docs_file.stem)
120+
121+
def extract_docs_metadata(self, docs_module: DocsModule, source_dir: Optional[Path] = None) -> Optional[EndpointDocs]:
122+
"""
123+
Extract documentation metadata from a docs.py module.
124+
125+
Args:
126+
docs_module: Information about the docs module
127+
source_dir: Source directory to add to Python path for imports
128+
129+
Returns:
130+
Extracted documentation metadata or None if extraction fails
131+
"""
132+
try:
133+
# Add source directory to Python path temporarily for imports
134+
old_path = sys.path.copy()
135+
if source_dir:
136+
# Add parent directory of source_dir to handle project imports
137+
parent_dir = str(source_dir.parent)
138+
if parent_dir not in sys.path:
139+
sys.path.insert(0, parent_dir)
140+
141+
try:
142+
# Load the module dynamically
143+
spec = importlib.util.spec_from_file_location(
144+
docs_module.module_name,
145+
docs_module.file_path
146+
)
147+
if not spec or not spec.loader:
148+
return None
149+
150+
module = importlib.util.module_from_spec(spec)
151+
152+
# Add to sys.modules temporarily to handle relative imports
153+
old_module = sys.modules.get(docs_module.module_name)
154+
sys.modules[docs_module.module_name] = module
155+
156+
try:
157+
spec.loader.exec_module(module)
158+
159+
# Extract docs_input dictionary
160+
docs_input = getattr(module, 'docs_input', {})
161+
162+
if not docs_input:
163+
rprint(f"[yellow]Warning: No docs_input found in {docs_module.file_path}[/yellow]")
164+
return None
165+
166+
return self._parse_docs_input(docs_input)
167+
168+
finally:
169+
# Restore original module
170+
if old_module is not None:
171+
sys.modules[docs_module.module_name] = old_module
172+
else:
173+
sys.modules.pop(docs_module.module_name, None)
174+
175+
finally:
176+
# Restore original Python path
177+
sys.path[:] = old_path
178+
179+
except Exception as e:
180+
rprint(f"[yellow]Warning: Could not extract docs from {docs_module.file_path}: {e}[/yellow]")
181+
return None
182+
183+
def _parse_docs_input(self, docs_input: Dict[str, Any]) -> EndpointDocs:
184+
"""Parse the docs_input dictionary into structured documentation."""
185+
186+
# Extract response model information
187+
response_model = None
188+
if 'response_model' in docs_input:
189+
response_model_obj = docs_input['response_model']
190+
if hasattr(response_model_obj, '__name__'):
191+
response_model = response_model_obj.__name__
192+
else:
193+
response_model = str(response_model_obj)
194+
195+
# Extract request model information
196+
request_model = None
197+
if 'request_model' in docs_input:
198+
request_model_obj = docs_input['request_model']
199+
if hasattr(request_model_obj, '__name__'):
200+
request_model = request_model_obj.__name__
201+
else:
202+
request_model = str(request_model_obj)
203+
204+
# Extract error responses - check both 'responses' and 'error_responses'
205+
responses = {}
206+
207+
# Handle 'responses' field
208+
if 'responses' in docs_input:
209+
for status_code, response_info in docs_input['responses'].items():
210+
if isinstance(response_info, dict) and 'model' in response_info:
211+
model_obj = response_info['model']
212+
model_name = model_obj.__name__ if hasattr(model_obj, '__name__') else str(model_obj)
213+
responses[int(status_code)] = {
214+
'description': response_info.get('description', f'Error response {status_code}'),
215+
'model': model_name
216+
}
217+
elif isinstance(response_info, dict):
218+
responses[int(status_code)] = {
219+
'description': response_info.get('description', f'Response {status_code}')
220+
}
221+
else:
222+
responses[int(status_code)] = {
223+
'description': str(response_info)
224+
}
225+
226+
# Handle 'error_responses' field
227+
if 'error_responses' in docs_input:
228+
for status_code, response_info in docs_input['error_responses'].items():
229+
if isinstance(response_info, dict) and 'model' in response_info:
230+
model_obj = response_info['model']
231+
model_name = model_obj.__name__ if hasattr(model_obj, '__name__') else str(model_obj)
232+
responses[int(status_code)] = {
233+
'description': response_info.get('description', f'Error response {status_code}'),
234+
'model': model_name
235+
}
236+
elif isinstance(response_info, dict):
237+
responses[int(status_code)] = {
238+
'description': response_info.get('description', f'Error response {status_code}')
239+
}
240+
else:
241+
responses[int(status_code)] = {
242+
'description': str(response_info)
243+
}
244+
245+
# Add success response if we have a response model
246+
status_code = docs_input.get('status_code', 200)
247+
if response_model and status_code not in responses:
248+
responses[status_code] = {
249+
'description': docs_input.get('response_description', 'Successful response'),
250+
'model': response_model
251+
}
252+
253+
return EndpointDocs(
254+
summary=docs_input.get('summary'),
255+
description=docs_input.get('description'),
256+
response_model=response_model,
257+
status_code=status_code,
258+
responses=responses,
259+
tags=docs_input.get('tags', []),
260+
name=docs_input.get('name'),
261+
response_description=docs_input.get('response_description'),
262+
request_model=request_model
263+
)
264+
265+
def merge_with_function_metadata(self,
266+
func_info: 'EndpointInfo', # Forward reference
267+
route_info: 'RouteInfo', # Forward reference
268+
docs_meta: Optional[EndpointDocs] = None) -> CompleteEndpointInfo:
269+
"""
270+
Merge function metadata with documentation metadata.
271+
272+
Args:
273+
func_info: Function information from PythonEndpointDiscovery
274+
route_info: Route information
275+
docs_meta: Documentation metadata from docs.py (optional)
276+
277+
Returns:
278+
Complete endpoint information
279+
"""
280+
from .python_discovery import DocstringMetadata
281+
282+
# Parse docstring for additional info
283+
docstring_meta = DocstringMetadata(
284+
summary=None, description=None, args={},
285+
returns=None, raises={}, openapi_info={}
286+
)
287+
288+
if func_info.docstring:
289+
# Simple parsing - could be enhanced
290+
lines = func_info.docstring.strip().split('\n')
291+
if lines:
292+
docstring_meta.summary = lines[0].strip()
293+
if len(lines) > 2:
294+
docstring_meta.description = ' '.join(line.strip() for line in lines[2:] if line.strip())
295+
296+
# Determine final values, preferring docs.py over docstring
297+
summary = (docs_meta.summary if docs_meta else None) or docstring_meta.summary or func_info.function_name
298+
description = (docs_meta.description if docs_meta else None) or docstring_meta.description or ""
299+
300+
# Determine request model - prefer docs.py over function signature
301+
request_model = None
302+
if docs_meta and docs_meta.request_model:
303+
request_model = docs_meta.request_model
304+
elif func_info.request_model:
305+
request_model = func_info.request_model
306+
else:
307+
# Fallback: extract from function parameters
308+
for param in func_info.parameters:
309+
if param['annotation'] and param['name'] not in ['context', 'event']:
310+
annotation = param['annotation']
311+
if 'Request' in annotation or 'DTO' in annotation or param.get('is_request_model', False):
312+
request_model = annotation
313+
break
314+
315+
# Determine response model - prefer docs.py over function signature
316+
response_model = None
317+
if docs_meta and docs_meta.response_model:
318+
response_model = docs_meta.response_model
319+
elif func_info.response_model:
320+
response_model = func_info.response_model
321+
322+
# Merge responses
323+
responses = docs_meta.responses if docs_meta else {}
324+
if not responses:
325+
# Default response
326+
responses[200] = {'description': 'Successful response'}
327+
328+
# Determine tags
329+
tags = []
330+
if docs_meta and docs_meta.tags:
331+
tags = docs_meta.tags
332+
else:
333+
# Infer from path
334+
if route_info.endpoint_name:
335+
tags = [route_info.endpoint_name]
336+
337+
return CompleteEndpointInfo(
338+
function_name=func_info.function_name,
339+
file_path=func_info.file_path,
340+
module_path=func_info.module_path,
341+
endpoint_name=route_info.endpoint_name or func_info.function_name,
342+
http_method=route_info.method or 'POST',
343+
path=route_info.path or f"/{func_info.function_name}",
344+
345+
summary=summary,
346+
description=description,
347+
348+
request_model=request_model,
349+
response_model=response_model,
350+
status_code=docs_meta.status_code if docs_meta else 200,
351+
responses=responses,
352+
353+
tags=tags,
354+
parameters=[], # Will be populated later
355+
356+
docstring=func_info.docstring,
357+
function_signature=func_info.function_signature,
358+
is_async=func_info.is_async,
359+
decorators=func_info.decorators
360+
)

0 commit comments

Comments
 (0)