2
2
3
3
import logging
4
4
from datetime import datetime
5
- from typing import Any , Dict , List , Optional , Type , Union
5
+ from typing import Any , Dict , List , Literal , Optional , Type , Union
6
6
7
- from pydantic import BaseModel , Field , create_model
7
+ from pydantic import BaseModel , Field , conlist , create_model
8
8
9
9
logger = logging .getLogger (__name__ )
10
10
@@ -33,7 +33,9 @@ def create_model_from_schema(
33
33
raise ValueError (f"Schema must be a dictionary, got { type (schema )} " )
34
34
35
35
if schema .get ("type" ) != "object" :
36
- raise ValueError (f"Schema must be of type 'object', got { schema .get ('type' )} " )
36
+ raise ValueError (
37
+ f"Invalid schema for model '{ model_name } ': root type must be 'object', got { schema .get ('type' )} "
38
+ )
37
39
38
40
properties = schema .get ("properties" , {})
39
41
required_fields = set (schema .get ("required" , []))
@@ -48,7 +50,7 @@ def create_model_from_schema(
48
50
try :
49
51
is_required = field_name in required_fields
50
52
field_type , field_info = PydanticModelFactory ._process_field_schema (
51
- field_name , field_schema , is_required
53
+ field_name , field_schema , is_required , model_name
52
54
)
53
55
field_definitions [field_name ] = (field_type , field_info )
54
56
except Exception as e :
@@ -69,23 +71,20 @@ def create_model_from_schema(
69
71
70
72
@staticmethod
71
73
def _process_field_schema (
72
- field_name : str , field_schema : Dict [str , Any ], is_required : bool
74
+ field_name : str , field_schema : Dict [str , Any ], is_required : bool , parent_model_name : str = ""
73
75
) -> tuple [Type [Any ], Any ]:
74
76
"""Process a single field schema into Pydantic field type and info.
75
77
76
78
Args:
77
79
field_name: Name of the field
78
80
field_schema: JSON schema for the field
79
81
is_required: Whether the field is required
82
+ parent_model_name: Name of the parent model for nested object naming
80
83
81
84
Returns:
82
85
Tuple of (field_type, field_info)
83
86
"""
84
- field_type = PydanticModelFactory ._get_python_type (field_schema )
85
-
86
- # Handle optional fields
87
- if not is_required :
88
- field_type = Optional [field_type ] # type: ignore[assignment]
87
+ field_type = PydanticModelFactory ._get_python_type (field_schema , field_name , parent_model_name )
89
88
90
89
# Create Field with metadata
91
90
field_kwargs = {}
@@ -110,45 +109,77 @@ def _process_field_schema(
110
109
if "pattern" in field_schema :
111
110
field_kwargs ["pattern" ] = field_schema ["pattern" ]
112
111
112
+ # Handle array constraints
113
+ if field_schema .get ("type" ) == "array" :
114
+ min_items = field_schema .get ("minItems" )
115
+ max_items = field_schema .get ("maxItems" )
116
+ if min_items is not None or max_items is not None :
117
+ # Use conlist for array constraints
118
+ item_type = PydanticModelFactory ._get_array_item_type (field_schema , field_name , parent_model_name )
119
+ field_type = conlist (item_type , min_length = min_items , max_length = max_items )
120
+
113
121
# Handle format constraints
114
122
if "format" in field_schema :
115
123
format_type = field_schema ["format" ]
116
124
if format_type == "email" :
117
125
try :
118
126
from pydantic import EmailStr
119
127
120
- field_type = EmailStr if is_required else Optional [ EmailStr ] # type: ignore[assignment]
128
+ field_type = EmailStr
121
129
except ImportError :
122
130
logger .warning ("EmailStr not available, using str for email field '%s'" , field_name )
123
- field_type = str if is_required else Optional [ str ] # type: ignore[assignment]
131
+ field_type = str
124
132
elif format_type == "uri" :
125
133
try :
126
134
from pydantic import HttpUrl
127
135
128
- field_type = HttpUrl if is_required else Optional [ HttpUrl ] # type: ignore[assignment]
136
+ field_type = HttpUrl
129
137
except ImportError :
130
138
logger .warning ("HttpUrl not available, using str for uri field '%s'" , field_name )
131
- field_type = str if is_required else Optional [ str ] # type: ignore[assignment]
139
+ field_type = str
132
140
elif format_type == "date-time" :
133
- field_type = datetime if is_required else Optional [datetime ] # type: ignore[assignment]
141
+ field_type = datetime
142
+
143
+ # Handle optional fields after all type processing
144
+ if not is_required :
145
+ field_type = Optional [field_type ] # type: ignore[assignment]
134
146
135
147
field_info = Field (** field_kwargs ) if field_kwargs else Field ()
136
148
137
149
return field_type , field_info
138
150
139
151
@staticmethod
140
- def _get_python_type (schema : Dict [str , Any ]) -> Type [Any ]:
152
+ def _get_array_item_type (schema : Dict [str , Any ], field_name : str = "" , parent_model_name : str = "" ) -> Type [Any ]:
153
+ """Get the item type for an array schema."""
154
+ items_schema = schema .get ("items" , {})
155
+ if items_schema :
156
+ return PydanticModelFactory ._get_python_type (items_schema , field_name , parent_model_name )
157
+ else :
158
+ return Any
159
+
160
+ @staticmethod
161
+ def _get_python_type (schema : Dict [str , Any ], field_name : str = "" , parent_model_name : str = "" ) -> Type [Any ]:
141
162
"""Convert JSON schema type to Python type.
142
163
143
164
Args:
144
165
schema: JSON schema dictionary
166
+ field_name: Name of the field (for nested object naming)
167
+ parent_model_name: Name of the parent model (for nested object naming)
145
168
146
169
Returns:
147
170
Python type corresponding to the schema
148
171
"""
149
172
schema_type = schema .get ("type" )
150
173
151
174
if schema_type == "string" :
175
+ # Handle enum constraints
176
+ if "enum" in schema :
177
+ enum_values = schema ["enum" ]
178
+ # Use Literal for string enums to preserve string values
179
+ if len (enum_values ) == 1 :
180
+ return Literal [enum_values [0 ]] # type: ignore[return-value]
181
+ else :
182
+ return Literal [tuple (enum_values )] # type: ignore[return-value]
152
183
return str
153
184
elif schema_type == "integer" :
154
185
return int
@@ -159,19 +190,23 @@ def _get_python_type(schema: Dict[str, Any]) -> Type[Any]:
159
190
elif schema_type == "array" :
160
191
items_schema = schema .get ("items" , {})
161
192
if items_schema :
162
- item_type = PydanticModelFactory ._get_python_type (items_schema )
193
+ item_type = PydanticModelFactory ._get_python_type (items_schema , field_name , parent_model_name )
163
194
return List [item_type ] # type: ignore[valid-type]
164
195
else :
165
196
return List [Any ]
166
197
elif schema_type == "object" :
167
- # For nested objects, we could recursively create models
168
- # For now, return Dict[str, Any]
169
- return Dict [str , Any ]
198
+ # For nested objects, create a nested model
199
+ nested_model_name = (
200
+ f"{ parent_model_name } { field_name .title ()} "
201
+ if parent_model_name and field_name
202
+ else f"NestedObject{ field_name .title ()} "
203
+ )
204
+ return PydanticModelFactory .create_model_from_schema (nested_model_name , schema )
170
205
elif schema_type is None and "anyOf" in schema :
171
206
# Handle anyOf by creating Union types
172
207
types = []
173
208
for sub_schema in schema ["anyOf" ]:
174
- sub_type = PydanticModelFactory ._get_python_type (sub_schema )
209
+ sub_type = PydanticModelFactory ._get_python_type (sub_schema , field_name , parent_model_name )
175
210
types .append (sub_type )
176
211
if len (types ) == 1 :
177
212
return types [0 ]
@@ -186,7 +221,75 @@ def _get_python_type(schema: Dict[str, Any]) -> Type[Any]:
186
221
return Any
187
222
188
223
@staticmethod
189
- def get_schema_info (model_class : Type [BaseModel ]) -> Dict [str , Any ]:
224
+ def validate_schema (schema : Any ) -> bool :
225
+ """Validate if a schema is valid for model creation.
226
+
227
+ Args:
228
+ schema: Schema to validate
229
+
230
+ Returns:
231
+ True if schema is valid, False otherwise
232
+ """
233
+ try :
234
+ if not isinstance (schema , dict ):
235
+ return False
236
+
237
+ if schema .get ("type" ) != "object" :
238
+ return False
239
+
240
+ # Check properties have valid types
241
+ properties = schema .get ("properties" , {})
242
+ for _ , prop_schema in properties .items ():
243
+ if not isinstance (prop_schema , dict ):
244
+ return False
245
+ if "type" not in prop_schema :
246
+ return False
247
+
248
+ return True
249
+ except Exception :
250
+ return False
251
+
252
+ @staticmethod
253
+ def get_schema_info (schema : Dict [str , Any ]) -> Dict [str , Any ]:
254
+ """Get schema information from a JSON schema dictionary.
255
+
256
+ Args:
257
+ schema: JSON schema dictionary
258
+
259
+ Returns:
260
+ Dictionary containing schema information
261
+ """
262
+ try :
263
+ properties = schema .get ("properties" , {})
264
+ required_fields = schema .get ("required" , [])
265
+
266
+ # Analyze schema features
267
+ has_nested_objects = any (prop .get ("type" ) == "object" for prop in properties .values ())
268
+ has_arrays = any (prop .get ("type" ) == "array" for prop in properties .values ())
269
+ has_enums = any ("enum" in prop for prop in properties .values ())
270
+
271
+ return {
272
+ "type" : schema .get ("type" , "unknown" ),
273
+ "properties_count" : len (properties ),
274
+ "required_fields" : required_fields ,
275
+ "has_nested_objects" : has_nested_objects ,
276
+ "has_arrays" : has_arrays ,
277
+ "has_enums" : has_enums ,
278
+ }
279
+ except Exception as e :
280
+ logger .error ("Failed to get schema info: %s" , e )
281
+ return {
282
+ "type" : "unknown" ,
283
+ "properties_count" : 0 ,
284
+ "required_fields" : [],
285
+ "has_nested_objects" : False ,
286
+ "has_arrays" : False ,
287
+ "has_enums" : False ,
288
+ "error" : str (e ),
289
+ }
290
+
291
+ @staticmethod
292
+ def get_model_schema_info (model_class : Type [BaseModel ]) -> Dict [str , Any ]:
190
293
"""Get schema information from a Pydantic model.
191
294
192
295
Args:
0 commit comments