6
6
7
7
import json
8
8
import logging
9
- from typing import Any , Dict , List , Optional
9
+ from typing import Any , Callable , Dict , List , Optional
10
10
11
11
import httpx
12
12
from fastapi import FastAPI
13
13
from fastapi .openapi .utils import get_openapi
14
14
from mcp .server .fastmcp import FastMCP
15
15
from pydantic import Field
16
16
17
- logger = logging .getLogger ("fastapi_mcp" )
18
-
19
-
20
- def resolve_schema_references (schema : Dict [str , Any ], openapi_schema : Dict [str , Any ]) -> Dict [str , Any ]:
21
- """
22
- Resolve schema references in OpenAPI schemas.
23
-
24
- Args:
25
- schema: The schema that may contain references
26
- openapi_schema: The full OpenAPI schema to resolve references from
17
+ from .openapi_utils import (
18
+ OPENAPI_PYTHON_TYPES_MAP ,
19
+ clean_schema_for_display ,
20
+ extract_model_examples_from_components ,
21
+ generate_example_from_schema ,
22
+ parse_parameters_for_args_schema ,
23
+ resolve_schema_references ,
24
+ PYTHON_TYPE_IMPORTS ,
25
+ )
27
26
28
- Returns:
29
- The schema with references resolved
30
- """
31
- # Make a copy to avoid modifying the input schema
32
- schema = schema .copy ()
33
-
34
- # Handle $ref directly in the schema
35
- if "$ref" in schema :
36
- ref_path = schema ["$ref" ]
37
- # Standard OpenAPI references are in the format "#/components/schemas/ModelName"
38
- if ref_path .startswith ("#/components/schemas/" ):
39
- model_name = ref_path .split ("/" )[- 1 ]
40
- if "components" in openapi_schema and "schemas" in openapi_schema ["components" ]:
41
- if model_name in openapi_schema ["components" ]["schemas" ]:
42
- # Replace with the resolved schema
43
- ref_schema = openapi_schema ["components" ]["schemas" ][model_name ].copy ()
44
- # Remove the $ref key and merge with the original schema
45
- schema .pop ("$ref" )
46
- schema .update (ref_schema )
47
-
48
- # Handle array items
49
- if "type" in schema and schema ["type" ] == "array" and "items" in schema :
50
- schema ["items" ] = resolve_schema_references (schema ["items" ], openapi_schema )
51
-
52
- # Handle object properties
53
- if "properties" in schema :
54
- for prop_name , prop_schema in schema ["properties" ].items ():
55
- schema ["properties" ][prop_name ] = resolve_schema_references (prop_schema , openapi_schema )
56
-
57
- return schema
58
-
59
-
60
- def clean_schema_for_display (schema : Dict [str , Any ]) -> Dict [str , Any ]:
61
- """
62
- Clean up a schema for display by removing internal fields.
63
-
64
- Args:
65
- schema: The schema to clean
66
-
67
- Returns:
68
- The cleaned schema
69
- """
70
- # Make a copy to avoid modifying the input schema
71
- schema = schema .copy ()
72
-
73
- # Remove common internal fields that are not helpful for LLMs
74
- fields_to_remove = [
75
- "allOf" ,
76
- "anyOf" ,
77
- "oneOf" ,
78
- "nullable" ,
79
- "discriminator" ,
80
- "readOnly" ,
81
- "writeOnly" ,
82
- "xml" ,
83
- "externalDocs" ,
84
- ]
85
- for field in fields_to_remove :
86
- if field in schema :
87
- schema .pop (field )
88
-
89
- # Process nested properties
90
- if "properties" in schema :
91
- for prop_name , prop_schema in schema ["properties" ].items ():
92
- if isinstance (prop_schema , dict ):
93
- schema ["properties" ][prop_name ] = clean_schema_for_display (prop_schema )
94
-
95
- # Process array items
96
- if "type" in schema and schema ["type" ] == "array" and "items" in schema :
97
- if isinstance (schema ["items" ], dict ):
98
- schema ["items" ] = clean_schema_for_display (schema ["items" ])
99
-
100
- return schema
27
+ logger = logging .getLogger ("fastapi_mcp" )
101
28
102
29
103
30
def create_mcp_tools_from_openapi (
@@ -172,6 +99,29 @@ def create_mcp_tools_from_openapi(
172
99
describe_full_response_schema = describe_full_response_schema ,
173
100
)
174
101
102
+ def _create_http_tool_function (function_template : Callable , args_schema : Dict [str , Any ], additional_variables : Dict [str , Any ]) -> Callable :
103
+ # Build parameter string with type hints
104
+ param_list = []
105
+ for name , type_info in args_schema .items ():
106
+ type_hint = OPENAPI_PYTHON_TYPES_MAP .get (type_info , 'Any' )
107
+ param_list .append (f"{ name } : { type_hint } " )
108
+ parameters_str = ", " .join (param_list )
109
+
110
+ dynamic_function_body = f"""async def dynamic_http_tool_function({ parameters_str } ):
111
+ kwargs = {{{ ', ' .join ([f"'{ k } ': { k } " for k in args_schema .keys ()])} }}
112
+ return await http_tool_function_template(**kwargs)
113
+ """
114
+
115
+ # Create function namespace with required imports
116
+ namespace = {
117
+ "http_tool_function_template" : function_template ,
118
+ ** PYTHON_TYPE_IMPORTS ,
119
+ ** additional_variables
120
+ }
121
+
122
+ # Execute the dynamic function definition
123
+ exec (dynamic_function_body , namespace )
124
+ return namespace ["dynamic_http_tool_function" ]
175
125
176
126
def create_http_tool (
177
127
mcp_server : FastMCP ,
@@ -435,8 +385,14 @@ def create_http_tool(
435
385
if param_required :
436
386
required_props .append (param_name )
437
387
438
- # Function to dynamically call the API endpoint
439
- async def http_tool_function (kwargs : Dict [str , Any ] = Field (default_factory = dict )):
388
+ # Create a proper input schema for the tool
389
+ input_schema = {"type" : "object" , "properties" : properties , "title" : f"{ operation_id } Arguments" }
390
+
391
+ if required_props :
392
+ input_schema ["required" ] = required_props
393
+
394
+ # Dynamically create a function to call the API endpoint
395
+ async def http_tool_function_template (** kwargs ):
440
396
# Prepare URL with path parameters
441
397
url = f"{ base_url } { path } "
442
398
for param_name , _ in path_params :
@@ -480,13 +436,10 @@ async def http_tool_function(kwargs: Dict[str, Any] = Field(default_factory=dict
480
436
except ValueError :
481
437
return response .text
482
438
483
- # Create a proper input schema for the tool
484
- input_schema = {"type" : "object" , "properties" : properties , "title" : f"{ operation_id } Arguments" }
485
-
486
- if required_props :
487
- input_schema ["required" ] = required_props
488
-
489
- # Set the function name and docstring
439
+ # Create the http_tool_function (with name and docstring)
440
+ args_schema = parse_parameters_for_args_schema (parameters )
441
+ additional_variables = {"path_params" : path_params , "query_params" : query_params , "header_params" : header_params }
442
+ http_tool_function = _create_http_tool_function (http_tool_function_template , args_schema , additional_variables )
490
443
http_tool_function .__name__ = operation_id
491
444
http_tool_function .__doc__ = tool_description
492
445
@@ -499,116 +452,3 @@ async def http_tool_function(kwargs: Dict[str, Any] = Field(default_factory=dict
499
452
500
453
# Update the tool's parameters to use our custom schema instead of the auto-generated one
501
454
tool .parameters = input_schema
502
-
503
-
504
- def extract_model_examples_from_components (
505
- model_name : str , openapi_schema : Dict [str , Any ]
506
- ) -> Optional [List [Dict [str , Any ]]]:
507
- """
508
- Extract examples from a model definition in the OpenAPI components.
509
-
510
- Args:
511
- model_name: The name of the model to extract examples from
512
- openapi_schema: The full OpenAPI schema
513
-
514
- Returns:
515
- List of example dictionaries if found, None otherwise
516
- """
517
- if "components" not in openapi_schema or "schemas" not in openapi_schema ["components" ]:
518
- return None
519
-
520
- if model_name not in openapi_schema ["components" ]["schemas" ]:
521
- return None
522
-
523
- schema = openapi_schema ["components" ]["schemas" ][model_name ]
524
-
525
- # Look for examples in the schema
526
- examples = None
527
-
528
- # Check for examples field directly (OpenAPI 3.1.0+)
529
- if "examples" in schema :
530
- examples = schema ["examples" ]
531
- # Check for example field (older OpenAPI versions)
532
- elif "example" in schema :
533
- examples = [schema ["example" ]]
534
-
535
- return examples
536
-
537
-
538
- def generate_example_from_schema (schema : Dict [str , Any ], model_name : Optional [str ] = None ) -> Any :
539
- """
540
- Generate a simple example response from a JSON schema.
541
-
542
- Args:
543
- schema: The JSON schema to generate an example from
544
- model_name: Optional model name for special handling
545
-
546
- Returns:
547
- An example object based on the schema
548
- """
549
- if not schema or not isinstance (schema , dict ):
550
- return None
551
-
552
- # Special handling for known model types
553
- if model_name == "Item" :
554
- # Create a realistic Item example since this is commonly used
555
- return {
556
- "id" : 1 ,
557
- "name" : "Hammer" ,
558
- "description" : "A tool for hammering nails" ,
559
- "price" : 9.99 ,
560
- "tags" : ["tool" , "hardware" ],
561
- }
562
- elif model_name == "HTTPValidationError" :
563
- # Create a realistic validation error example
564
- return {"detail" : [{"loc" : ["body" , "name" ], "msg" : "field required" , "type" : "value_error.missing" }]}
565
-
566
- # Handle different types
567
- schema_type = schema .get ("type" )
568
-
569
- if schema_type == "object" :
570
- result = {}
571
- if "properties" in schema :
572
- for prop_name , prop_schema in schema ["properties" ].items ():
573
- # Generate an example for each property
574
- prop_example = generate_example_from_schema (prop_schema )
575
- if prop_example is not None :
576
- result [prop_name ] = prop_example
577
- return result
578
-
579
- elif schema_type == "array" :
580
- if "items" in schema :
581
- # Generate a single example item
582
- item_example = generate_example_from_schema (schema ["items" ])
583
- if item_example is not None :
584
- return [item_example ]
585
- return []
586
-
587
- elif schema_type == "string" :
588
- # Check if there's a format
589
- format_type = schema .get ("format" )
590
- if format_type == "date-time" :
591
- return "2023-01-01T00:00:00Z"
592
- elif format_type == "date" :
593
- return "2023-01-01"
594
- elif format_type == "email" :
595
-
596
- elif format_type == "uri" :
597
- return "https://example.com"
598
- # Use title or property name if available
599
- return schema .get ("title" , "string" )
600
-
601
- elif schema_type == "integer" :
602
- return 1
603
-
604
- elif schema_type == "number" :
605
- return 1.0
606
-
607
- elif schema_type == "boolean" :
608
- return True
609
-
610
- elif schema_type == "null" :
611
- return None
612
-
613
- # Default case
614
- return None
0 commit comments