4
4
import math
5
5
import operator
6
6
import re
7
+ from copy import deepcopy
7
8
from fractions import Fraction
8
9
from functools import partial
9
10
from typing import Any , Callable , Dict , List , NoReturn , Optional , Set , Union
19
20
TYPE_STRINGS ,
20
21
HypothesisRefResolutionError ,
21
22
JSONType ,
23
+ LocalResolver ,
22
24
Schema ,
23
25
canonicalish ,
24
26
encode_canonical_json ,
43
45
44
46
45
47
def merged_as_strategies (
46
- schemas : List [Schema ], custom_formats : Optional [Dict [str , st .SearchStrategy [str ]]]
48
+ schemas : List [Schema ],
49
+ custom_formats : Optional [Dict [str , st .SearchStrategy [str ]]],
50
+ resolver : LocalResolver ,
47
51
) -> st .SearchStrategy [JSONType ]:
48
52
assert schemas , "internal error: must pass at least one schema to merge"
49
53
if len (schemas ) == 1 :
50
- return from_schema (schemas [0 ], custom_formats = custom_formats )
54
+ return from_schema (schemas [0 ], custom_formats = custom_formats , resolver = resolver )
51
55
# Try to merge combinations of strategies.
52
56
strats = []
53
57
combined : Set [str ] = set ()
@@ -61,7 +65,7 @@ def merged_as_strategies(
61
65
if s is not None and s != FALSEY :
62
66
validators = [make_validator (s ) for s in schemas ]
63
67
strats .append (
64
- from_schema (s , custom_formats = custom_formats ).filter (
68
+ from_schema (s , custom_formats = custom_formats , resolver = resolver ).filter (
65
69
lambda obj : all (v .is_valid (obj ) for v in validators )
66
70
)
67
71
)
@@ -73,14 +77,15 @@ def from_schema(
73
77
schema : Union [bool , Schema ],
74
78
* ,
75
79
custom_formats : Dict [str , st .SearchStrategy [str ]] = None ,
80
+ resolver : Optional [LocalResolver ] = None ,
76
81
) -> st .SearchStrategy [JSONType ]:
77
82
"""Take a JSON schema and return a strategy for allowed JSON objects.
78
83
79
84
Schema reuse with "definitions" and "$ref" is not yet supported, but
80
85
everything else in drafts 04, 05, and 07 is fully tested and working.
81
86
"""
82
87
try :
83
- return __from_schema (schema , custom_formats = custom_formats )
88
+ return __from_schema (schema , custom_formats = custom_formats , resolver = resolver )
84
89
except Exception as err :
85
90
error = err
86
91
@@ -113,9 +118,10 @@ def __from_schema(
113
118
schema : Union [bool , Schema ],
114
119
* ,
115
120
custom_formats : Dict [str , st .SearchStrategy [str ]] = None ,
121
+ resolver : Optional [LocalResolver ] = None ,
116
122
) -> st .SearchStrategy [JSONType ]:
117
123
try :
118
- schema = resolve_all_refs (schema )
124
+ schema = resolve_all_refs (schema , resolver = resolver )
119
125
except RecursionError :
120
126
raise HypothesisRefResolutionError (
121
127
f"Could not resolve recursive references in schema={ schema !r} "
@@ -142,6 +148,9 @@ def __from_schema(
142
148
}
143
149
custom_formats [_FORMATS_TOKEN ] = None # type: ignore
144
150
151
+ if resolver is None :
152
+ resolver = LocalResolver .from_schema (deepcopy (schema ))
153
+
145
154
schema = canonicalish (schema )
146
155
# Boolean objects are special schemata; False rejects all and True accepts all.
147
156
if schema == FALSEY :
@@ -156,32 +165,44 @@ def __from_schema(
156
165
157
166
assert isinstance (schema , dict )
158
167
# Now we handle as many validation keywords as we can...
168
+ if "$ref" in schema :
169
+ ref = schema ["$ref" ]
170
+
171
+ def _recurse () -> st .SearchStrategy [JSONType ]:
172
+ _ , resolved = resolver .resolve (ref ) # type: ignore
173
+ return from_schema (
174
+ resolved , custom_formats = custom_formats , resolver = resolver
175
+ )
176
+
177
+ return st .deferred (_recurse )
159
178
# Applying subschemata with boolean logic
160
179
if "not" in schema :
161
180
not_ = schema .pop ("not" )
162
181
assert isinstance (not_ , dict )
163
182
validator = make_validator (not_ ).is_valid
164
- return from_schema (schema , custom_formats = custom_formats ). filter (
165
- lambda v : not validator ( v )
166
- )
183
+ return from_schema (
184
+ schema , custom_formats = custom_formats , resolver = resolver
185
+ ). filter ( lambda v : not validator ( v ))
167
186
if "anyOf" in schema :
168
187
tmp = schema .copy ()
169
188
ao = tmp .pop ("anyOf" )
170
189
assert isinstance (ao , list )
171
- return st .one_of ([merged_as_strategies ([tmp , s ], custom_formats ) for s in ao ])
190
+ return st .one_of (
191
+ [merged_as_strategies ([tmp , s ], custom_formats , resolver ) for s in ao ]
192
+ )
172
193
if "allOf" in schema :
173
194
tmp = schema .copy ()
174
195
ao = tmp .pop ("allOf" )
175
196
assert isinstance (ao , list )
176
- return merged_as_strategies ([tmp ] + ao , custom_formats )
197
+ return merged_as_strategies ([tmp ] + ao , custom_formats , resolver )
177
198
if "oneOf" in schema :
178
199
tmp = schema .copy ()
179
200
oo = tmp .pop ("oneOf" )
180
201
assert isinstance (oo , list )
181
202
schemas = [merged ([tmp , s ]) for s in oo ]
182
203
return st .one_of (
183
204
[
184
- from_schema (s , custom_formats = custom_formats )
205
+ from_schema (s , custom_formats = custom_formats , resolver = resolver )
185
206
for s in schemas
186
207
if s is not None
187
208
]
@@ -199,8 +220,8 @@ def __from_schema(
199
220
"number" : number_schema ,
200
221
"integer" : integer_schema ,
201
222
"string" : partial (string_schema , custom_formats ),
202
- "array" : partial (array_schema , custom_formats ),
203
- "object" : partial (object_schema , custom_formats ),
223
+ "array" : partial (array_schema , custom_formats , resolver ),
224
+ "object" : partial (object_schema , custom_formats , resolver ),
204
225
}
205
226
assert set (map_ ) == set (TYPE_STRINGS )
206
227
return st .one_of ([map_ [t ](schema ) for t in get_type (schema )])
@@ -423,10 +444,14 @@ def string_schema(
423
444
424
445
425
446
def array_schema (
426
- custom_formats : Dict [str , st .SearchStrategy [str ]], schema : dict
447
+ custom_formats : Dict [str , st .SearchStrategy [str ]],
448
+ resolver : LocalResolver ,
449
+ schema : dict ,
427
450
) -> st .SearchStrategy [List [JSONType ]]:
428
451
"""Handle schemata for arrays."""
429
- _from_schema_ = partial (from_schema , custom_formats = custom_formats )
452
+ _from_schema_ = partial (
453
+ from_schema , custom_formats = custom_formats , resolver = resolver
454
+ )
430
455
items = schema .get ("items" , {})
431
456
additional_items = schema .get ("additionalItems" , {})
432
457
min_size = schema .get ("minItems" , 0 )
@@ -437,14 +462,16 @@ def array_schema(
437
462
if max_size is not None :
438
463
max_size -= len (items )
439
464
440
- items_strats = [_from_schema_ (s ) for s in items ]
465
+ items_strats = [_from_schema_ (s ) for s in deepcopy ( items ) ]
441
466
additional_items_strat = _from_schema_ (additional_items )
442
467
443
468
# If we have a contains schema to satisfy, we try generating from it when
444
469
# allowed to do so. We'll skip the None (unmergable / no contains) cases
445
470
# below, and let Hypothesis ignore the FALSEY cases for us.
446
471
if "contains" in schema :
447
- for i , mrgd in enumerate (merged ([schema ["contains" ], s ]) for s in items ):
472
+ for i , mrgd in enumerate (
473
+ merged ([schema ["contains" ], s ]) for s in deepcopy (items )
474
+ ):
448
475
if mrgd is not None :
449
476
items_strats [i ] |= _from_schema_ (mrgd )
450
477
contains_additional = merged ([schema ["contains" ], additional_items ])
@@ -481,10 +508,10 @@ def not_seen(elem: JSONType) -> bool:
481
508
st .lists (additional_items_strat , min_size = min_size , max_size = max_size ),
482
509
)
483
510
else :
484
- items_strat = _from_schema_ (items )
511
+ items_strat = _from_schema_ (deepcopy ( items ) )
485
512
if "contains" in schema :
486
513
contains_strat = _from_schema_ (schema ["contains" ])
487
- if merged ([items , schema ["contains" ]]) != schema ["contains" ]:
514
+ if merged ([deepcopy ( items ) , schema ["contains" ]]) != schema ["contains" ]:
488
515
# We only need this filter if we couldn't merge items in when
489
516
# canonicalising. Note that for list-items, above, we just skip
490
517
# the mixed generation in this case (because they tend to be
@@ -505,7 +532,9 @@ def not_seen(elem: JSONType) -> bool:
505
532
506
533
507
534
def object_schema (
508
- custom_formats : Dict [str , st .SearchStrategy [str ]], schema : dict
535
+ custom_formats : Dict [str , st .SearchStrategy [str ]],
536
+ resolver : LocalResolver ,
537
+ schema : dict ,
509
538
) -> st .SearchStrategy [Dict [str , JSONType ]]:
510
539
"""Handle a manageable subset of possible schemata for objects."""
511
540
required = schema .get ("required" , []) # required keys
@@ -519,7 +548,7 @@ def object_schema(
519
548
return st .builds (dict )
520
549
names ["type" ] = "string"
521
550
522
- properties = schema .get ("properties" , {}) # exact name: value schema
551
+ properties = deepcopy ( schema .get ("properties" , {}) ) # exact name: value schema
523
552
patterns = schema .get ("patternProperties" , {}) # regex for names: value schema
524
553
# schema for other values; handled specially if nothing matches
525
554
additional = schema .get ("additionalProperties" , {})
@@ -534,7 +563,7 @@ def object_schema(
534
563
st .sampled_from (sorted (dep_names ) + sorted (dep_schemas ) + sorted (properties ))
535
564
if (dep_names or dep_schemas or properties )
536
565
else st .nothing (),
537
- from_schema (names , custom_formats = custom_formats )
566
+ from_schema (names , custom_formats = custom_formats , resolver = resolver )
538
567
if additional_allowed
539
568
else st .nothing (),
540
569
st .one_of ([st .from_regex (p ) for p in sorted (patterns )]),
@@ -580,12 +609,20 @@ def from_object_schema(draw: Any) -> Any:
580
609
if re .search (rgx , string = key ) is not None
581
610
]
582
611
if key in properties :
583
- pattern_schemas .insert (0 , properties [key ])
612
+ pattern_schemas .insert (0 , deepcopy ( properties [key ]) )
584
613
585
614
if pattern_schemas :
586
- out [key ] = draw (merged_as_strategies (pattern_schemas , custom_formats ))
615
+ out [key ] = draw (
616
+ merged_as_strategies (pattern_schemas , custom_formats , resolver )
617
+ )
587
618
else :
588
- out [key ] = draw (from_schema (additional , custom_formats = custom_formats ))
619
+ out [key ] = draw (
620
+ from_schema (
621
+ deepcopy (additional ),
622
+ custom_formats = custom_formats ,
623
+ resolver = resolver ,
624
+ )
625
+ )
589
626
590
627
for k , v in dep_schemas .items ():
591
628
if k in out and not make_validator (v ).is_valid (out ):
0 commit comments