5
5
import operator
6
6
import re
7
7
from fractions import Fraction
8
+ from functools import partial
8
9
from typing import Any , Callable , Dict , List , NoReturn , Optional , Set , Union
9
10
10
11
import jsonschema
38
39
lambda strategy : st .lists (strategy , max_size = 3 )
39
40
| st .dictionaries (st .text (), strategy , max_size = 3 ),
40
41
)
42
+ _FORMATS_TOKEN = object ()
41
43
42
44
43
- def merged_as_strategies (schemas : List [Schema ]) -> st .SearchStrategy [JSONType ]:
45
+ def merged_as_strategies (
46
+ schemas : List [Schema ], custom_formats : Optional [Dict [str , st .SearchStrategy [str ]]]
47
+ ) -> st .SearchStrategy [JSONType ]:
44
48
assert schemas , "internal error: must pass at least one schema to merge"
45
49
if len (schemas ) == 1 :
46
- return from_schema (schemas [0 ])
50
+ return from_schema (schemas [0 ], custom_formats = custom_formats )
47
51
# Try to merge combinations of strategies.
48
52
strats = []
49
53
combined : Set [str ] = set ()
@@ -57,22 +61,26 @@ def merged_as_strategies(schemas: List[Schema]) -> st.SearchStrategy[JSONType]:
57
61
if s is not None and s != FALSEY :
58
62
validators = [make_validator (s ) for s in schemas ]
59
63
strats .append (
60
- from_schema (s ).filter (
64
+ from_schema (s , custom_formats = custom_formats ).filter (
61
65
lambda obj : all (v .is_valid (obj ) for v in validators )
62
66
)
63
67
)
64
68
combined .update (group )
65
69
return st .one_of (strats )
66
70
67
71
68
- def from_schema (schema : Union [bool , Schema ]) -> st .SearchStrategy [JSONType ]:
72
+ def from_schema (
73
+ schema : Union [bool , Schema ],
74
+ * ,
75
+ custom_formats : Dict [str , st .SearchStrategy [str ]] = None ,
76
+ ) -> st .SearchStrategy [JSONType ]:
69
77
"""Take a JSON schema and return a strategy for allowed JSON objects.
70
78
71
79
Schema reuse with "definitions" and "$ref" is not yet supported, but
72
80
everything else in drafts 04, 05, and 07 is fully tested and working.
73
81
"""
74
82
try :
75
- return __from_schema (schema )
83
+ return __from_schema (schema , custom_formats = custom_formats )
76
84
except Exception as err :
77
85
error = err
78
86
@@ -82,13 +90,58 @@ def error_raiser() -> NoReturn:
82
90
return st .builds (error_raiser )
83
91
84
92
85
- def __from_schema (schema : Union [bool , Schema ]) -> st .SearchStrategy [JSONType ]:
93
+ def _get_format_filter (
94
+ format_name : str ,
95
+ checker : jsonschema .FormatChecker ,
96
+ strategy : st .SearchStrategy [str ],
97
+ ) -> st .SearchStrategy [str ]:
98
+ def check_valid (string : str ) -> str :
99
+ try :
100
+ assert isinstance (string , str )
101
+ checker .check (string , format = format_name )
102
+ except (AssertionError , jsonschema .FormatError ) as err :
103
+ raise InvalidArgument (
104
+ f"Got string={ string !r} from strategy { strategy !r} , but this "
105
+ f"is not a valid value for the { format_name !r} checker."
106
+ ) from err
107
+ return string
108
+
109
+ return strategy .map (check_valid )
110
+
111
+
112
+ def __from_schema (
113
+ schema : Union [bool , Schema ],
114
+ * ,
115
+ custom_formats : Dict [str , st .SearchStrategy [str ]] = None ,
116
+ ) -> st .SearchStrategy [JSONType ]:
86
117
try :
87
118
schema = resolve_all_refs (schema )
88
119
except RecursionError :
89
120
raise HypothesisRefResolutionError (
90
121
f"Could not resolve recursive references in schema={ schema !r} "
91
122
) from None
123
+ # We check for _FORMATS_TOKEN to avoid re-validating known good data.
124
+ if custom_formats is not None and _FORMATS_TOKEN not in custom_formats :
125
+ assert isinstance (custom_formats , dict )
126
+ for name , strat in custom_formats .items ():
127
+ if not isinstance (name , str ):
128
+ raise InvalidArgument (f"format name { name !r} must be a string" )
129
+ if name in STRING_FORMATS :
130
+ raise InvalidArgument (f"Cannot redefine standard format { name !r} " )
131
+ if not isinstance (strat , st .SearchStrategy ):
132
+ raise InvalidArgument (
133
+ f"custom_formats[{ name !r} ]={ strat !r} must be a Hypothesis "
134
+ "strategy which generates strings matching this format."
135
+ )
136
+ format_checker = jsonschema .FormatChecker ()
137
+ custom_formats = {
138
+ name : _get_format_filter (name , format_checker , strategy )
139
+ if name in format_checker .checkers
140
+ else strategy
141
+ for name , strategy in custom_formats .items ()
142
+ }
143
+ custom_formats [_FORMATS_TOKEN ] = None # type: ignore
144
+
92
145
schema = canonicalish (schema )
93
146
# Boolean objects are special schemata; False rejects all and True accepts all.
94
147
if schema == FALSEY :
@@ -101,31 +154,38 @@ def __from_schema(schema: Union[bool, Schema]) -> st.SearchStrategy[JSONType]:
101
154
if schema ["$schema" ] == "http://json-schema.org/draft-03/schema#" :
102
155
raise InvalidArgument ("Draft-03 schemas are not supported" )
103
156
157
+ assert isinstance (schema , dict )
104
158
# Now we handle as many validation keywords as we can...
105
159
# Applying subschemata with boolean logic
106
160
if "not" in schema :
107
161
not_ = schema .pop ("not" )
108
162
assert isinstance (not_ , dict )
109
163
validator = make_validator (not_ ).is_valid
110
- return from_schema (schema ).filter (lambda v : not validator (v ))
164
+ return from_schema (schema , custom_formats = custom_formats ).filter (
165
+ lambda v : not validator (v )
166
+ )
111
167
if "anyOf" in schema :
112
168
tmp = schema .copy ()
113
169
ao = tmp .pop ("anyOf" )
114
170
assert isinstance (ao , list )
115
- return st .one_of ([merged_as_strategies ([tmp , s ]) for s in ao ])
171
+ return st .one_of ([merged_as_strategies ([tmp , s ], custom_formats ) for s in ao ])
116
172
if "allOf" in schema :
117
173
tmp = schema .copy ()
118
174
ao = tmp .pop ("allOf" )
119
175
assert isinstance (ao , list )
120
- return merged_as_strategies ([tmp ] + ao )
176
+ return merged_as_strategies ([tmp ] + ao , custom_formats )
121
177
if "oneOf" in schema :
122
178
tmp = schema .copy ()
123
179
oo = tmp .pop ("oneOf" )
124
180
assert isinstance (oo , list )
125
181
schemas = [merged ([tmp , s ]) for s in oo ]
126
- return st .one_of ([from_schema (s ) for s in schemas if s is not None ]).filter (
127
- make_validator (schema ).is_valid
128
- )
182
+ return st .one_of (
183
+ [
184
+ from_schema (s , custom_formats = custom_formats )
185
+ for s in schemas
186
+ if s is not None
187
+ ]
188
+ ).filter (make_validator (schema ).is_valid )
129
189
# Simple special cases
130
190
if "enum" in schema :
131
191
assert schema ["enum" ], "Canonicalises to non-empty list or FALSEY"
@@ -138,9 +198,9 @@ def __from_schema(schema: Union[bool, Schema]) -> st.SearchStrategy[JSONType]:
138
198
"boolean" : lambda _ : st .booleans (),
139
199
"number" : number_schema ,
140
200
"integer" : integer_schema ,
141
- "string" : string_schema ,
142
- "array" : array_schema ,
143
- "object" : object_schema ,
201
+ "string" : partial ( string_schema , custom_formats ) ,
202
+ "array" : partial ( array_schema , custom_formats ) ,
203
+ "object" : partial ( object_schema , custom_formats ) ,
144
204
}
145
205
assert set (map_ ) == set (TYPE_STRINGS )
146
206
return st .one_of ([map_ [t ](schema ) for t in get_type (schema )])
@@ -329,21 +389,22 @@ def relative_json_pointers() -> st.SearchStrategy[str]:
329
389
}
330
390
331
391
332
- def string_schema (schema : dict ) -> st .SearchStrategy [str ]:
392
+ def string_schema (
393
+ custom_formats : Dict [str , st .SearchStrategy [str ]], schema : dict
394
+ ) -> st .SearchStrategy [str ]:
333
395
"""Handle schemata for strings."""
334
396
# also https://json-schema.org/latest/json-schema-validation.html#rfc.section.7
335
397
min_size = schema .get ("minLength" , 0 )
336
398
max_size = schema .get ("maxLength" )
337
399
strategy = st .text (min_size = min_size , max_size = max_size )
338
- if schema .get ("format" ) in STRING_FORMATS :
400
+ known_formats = {** (custom_formats or {}), ** STRING_FORMATS }
401
+ if schema .get ("format" ) in known_formats :
339
402
# Unknown "format" specifiers should be ignored for validation.
340
403
# See https://json-schema.org/latest/json-schema-validation.html#format
341
- strategy = STRING_FORMATS [schema ["format" ]]
404
+ strategy = known_formats [schema ["format" ]]
342
405
if "pattern" in schema :
343
406
# This isn't really supported, but we'll do our best.
344
- strategy = strategy .filter (
345
- lambda s : re .search (schema ["pattern" ], string = s ) is not None
346
- )
407
+ strategy = strategy .filter (re .compile (schema ["pattern" ]).search )
347
408
elif "pattern" in schema :
348
409
try :
349
410
re .compile (schema ["pattern" ])
@@ -361,8 +422,11 @@ def string_schema(schema: dict) -> st.SearchStrategy[str]:
361
422
return strategy
362
423
363
424
364
- def array_schema (schema : dict ) -> st .SearchStrategy [List [JSONType ]]:
425
+ def array_schema (
426
+ custom_formats : Dict [str , st .SearchStrategy [str ]], schema : dict
427
+ ) -> st .SearchStrategy [List [JSONType ]]:
365
428
"""Handle schemata for arrays."""
429
+ _from_schema_ = partial (from_schema , custom_formats = custom_formats )
366
430
items = schema .get ("items" , {})
367
431
additional_items = schema .get ("additionalItems" , {})
368
432
min_size = schema .get ("minItems" , 0 )
@@ -373,19 +437,19 @@ def array_schema(schema: dict) -> st.SearchStrategy[List[JSONType]]:
373
437
if max_size is not None :
374
438
max_size -= len (items )
375
439
376
- items_strats = [from_schema (s ) for s in items ]
377
- additional_items_strat = from_schema (additional_items )
440
+ items_strats = [_from_schema_ (s ) for s in items ]
441
+ additional_items_strat = _from_schema_ (additional_items )
378
442
379
443
# If we have a contains schema to satisfy, we try generating from it when
380
444
# allowed to do so. We'll skip the None (unmergable / no contains) cases
381
445
# below, and let Hypothesis ignore the FALSEY cases for us.
382
446
if "contains" in schema :
383
447
for i , mrgd in enumerate (merged ([schema ["contains" ], s ]) for s in items ):
384
448
if mrgd is not None :
385
- items_strats [i ] |= from_schema (mrgd )
449
+ items_strats [i ] |= _from_schema_ (mrgd )
386
450
contains_additional = merged ([schema ["contains" ], additional_items ])
387
451
if contains_additional is not None :
388
- additional_items_strat |= from_schema (contains_additional )
452
+ additional_items_strat |= _from_schema_ (contains_additional )
389
453
390
454
if unique :
391
455
@@ -417,9 +481,9 @@ def not_seen(elem: JSONType) -> bool:
417
481
st .lists (additional_items_strat , min_size = min_size , max_size = max_size ),
418
482
)
419
483
else :
420
- items_strat = from_schema (items )
484
+ items_strat = _from_schema_ (items )
421
485
if "contains" in schema :
422
- contains_strat = from_schema (schema ["contains" ])
486
+ contains_strat = _from_schema_ (schema ["contains" ])
423
487
if merged ([items , schema ["contains" ]]) != schema ["contains" ]:
424
488
# We only need this filter if we couldn't merge items in when
425
489
# canonicalising. Note that for list-items, above, we just skip
@@ -440,7 +504,9 @@ def not_seen(elem: JSONType) -> bool:
440
504
return strat .filter (lambda val : any (contains (x ) for x in val ))
441
505
442
506
443
- def object_schema (schema : dict ) -> st .SearchStrategy [Dict [str , JSONType ]]:
507
+ def object_schema (
508
+ custom_formats : Dict [str , st .SearchStrategy [str ]], schema : dict
509
+ ) -> st .SearchStrategy [Dict [str , JSONType ]]:
444
510
"""Handle a manageable subset of possible schemata for objects."""
445
511
required = schema .get ("required" , []) # required keys
446
512
min_size = max (len (required ), schema .get ("minProperties" , 0 ))
@@ -468,7 +534,9 @@ def object_schema(schema: dict) -> st.SearchStrategy[Dict[str, JSONType]]:
468
534
st .sampled_from (sorted (dep_names ) + sorted (dep_schemas ) + sorted (properties ))
469
535
if (dep_names or dep_schemas or properties )
470
536
else st .nothing (),
471
- from_schema (names ) if additional_allowed else st .nothing (),
537
+ from_schema (names , custom_formats = custom_formats )
538
+ if additional_allowed
539
+ else st .nothing (),
472
540
st .one_of ([st .from_regex (p ) for p in sorted (patterns )]),
473
541
)
474
542
all_names_strategy = st .one_of ([s for s in name_strats if not s .is_empty ]).filter (
@@ -515,9 +583,9 @@ def from_object_schema(draw: Any) -> Any:
515
583
pattern_schemas .insert (0 , properties [key ])
516
584
517
585
if pattern_schemas :
518
- out [key ] = draw (merged_as_strategies (pattern_schemas ))
586
+ out [key ] = draw (merged_as_strategies (pattern_schemas , custom_formats ))
519
587
else :
520
- out [key ] = draw (from_schema (additional ))
588
+ out [key ] = draw (from_schema (additional , custom_formats = custom_formats ))
521
589
522
590
for k , v in dep_schemas .items ():
523
591
if k in out and not make_validator (v ).is_valid (out ):
0 commit comments