4
4
import math
5
5
import operator
6
6
import re
7
+ from copy import deepcopy
7
8
from fractions import Fraction
8
9
from functools import partial
9
10
from typing import Any , Callable , Dict , List , NoReturn , Optional , Set , Union
18
19
TRUTHY ,
19
20
TYPE_STRINGS ,
20
21
HypothesisRefResolutionError ,
22
+ JSONType ,
23
+ LocalResolver ,
21
24
Schema ,
22
25
canonicalish ,
23
26
get_integer_bounds ,
42
45
43
46
44
47
def merged_as_strategies (
45
- schemas : List [Schema ], custom_formats : Optional [Dict [str , st .SearchStrategy [str ]]]
48
+ schemas : List [Schema ],
49
+ custom_formats : Optional [Dict [str , st .SearchStrategy [str ]]],
50
+ resolver : LocalResolver ,
46
51
) -> st .SearchStrategy [JSONType ]:
47
52
assert schemas , "internal error: must pass at least one schema to merge"
48
53
if len (schemas ) == 1 :
49
- return from_schema (schemas [0 ], custom_formats = custom_formats )
54
+ return from_schema (schemas [0 ], custom_formats = custom_formats , resolver = resolver )
50
55
# Try to merge combinations of strategies.
51
56
strats = []
52
57
combined : Set [str ] = set ()
@@ -60,7 +65,7 @@ def merged_as_strategies(
60
65
if s is not None and s != FALSEY :
61
66
validators = [make_validator (s ) for s in schemas ]
62
67
strats .append (
63
- from_schema (s , custom_formats = custom_formats ).filter (
68
+ from_schema (s , custom_formats = custom_formats , resolver = resolver ).filter (
64
69
lambda obj : all (v .is_valid (obj ) for v in validators )
65
70
)
66
71
)
@@ -72,14 +77,15 @@ def from_schema(
72
77
schema : Union [bool , Schema ],
73
78
* ,
74
79
custom_formats : Dict [str , st .SearchStrategy [str ]] = None ,
80
+ resolver : Optional [LocalResolver ] = None ,
75
81
) -> st .SearchStrategy [JSONType ]:
76
82
"""Take a JSON schema and return a strategy for allowed JSON objects.
77
83
78
84
Schema reuse with "definitions" and "$ref" is not yet supported, but
79
85
everything else in drafts 04, 05, and 07 is fully tested and working.
80
86
"""
81
87
try :
82
- return __from_schema (schema , custom_formats = custom_formats )
88
+ return __from_schema (schema , custom_formats = custom_formats , resolver = resolver )
83
89
except Exception as err :
84
90
error = err
85
91
@@ -112,9 +118,10 @@ def __from_schema(
112
118
schema : Union [bool , Schema ],
113
119
* ,
114
120
custom_formats : Dict [str , st .SearchStrategy [str ]] = None ,
121
+ resolver : Optional [LocalResolver ] = None ,
115
122
) -> st .SearchStrategy [JSONType ]:
116
123
try :
117
- schema = resolve_all_refs (schema )
124
+ schema = resolve_all_refs (schema , resolver = resolver )
118
125
except RecursionError :
119
126
raise HypothesisRefResolutionError (
120
127
f"Could not resolve recursive references in schema={ schema !r} "
@@ -141,6 +148,9 @@ def __from_schema(
141
148
}
142
149
custom_formats [_FORMATS_TOKEN ] = None # type: ignore
143
150
151
+ if resolver is None :
152
+ resolver = LocalResolver .from_schema (deepcopy (schema ))
153
+
144
154
schema = canonicalish (schema )
145
155
# Boolean objects are special schemata; False rejects all and True accepts all.
146
156
if schema == FALSEY :
@@ -155,32 +165,44 @@ def __from_schema(
155
165
156
166
assert isinstance (schema , dict )
157
167
# Now we handle as many validation keywords as we can...
168
+ if "$ref" in schema :
169
+ ref = schema ["$ref" ]
170
+
171
+ def _recurse () -> st .SearchStrategy [JSONType ]:
172
+ _ , resolved = resolver .resolve (ref ) # type: ignore
173
+ return from_schema (
174
+ resolved , custom_formats = custom_formats , resolver = resolver
175
+ )
176
+
177
+ return st .deferred (_recurse )
158
178
# Applying subschemata with boolean logic
159
179
if "not" in schema :
160
180
not_ = schema .pop ("not" )
161
181
assert isinstance (not_ , dict )
162
182
validator = make_validator (not_ ).is_valid
163
- return from_schema (schema , custom_formats = custom_formats ). filter (
164
- lambda v : not validator ( v )
165
- )
183
+ return from_schema (
184
+ schema , custom_formats = custom_formats , resolver = resolver
185
+ ). filter ( lambda v : not validator ( v ))
166
186
if "anyOf" in schema :
167
187
tmp = schema .copy ()
168
188
ao = tmp .pop ("anyOf" )
169
189
assert isinstance (ao , list )
170
- return st .one_of ([merged_as_strategies ([tmp , s ], custom_formats ) for s in ao ])
190
+ return st .one_of (
191
+ [merged_as_strategies ([tmp , s ], custom_formats , resolver ) for s in ao ]
192
+ )
171
193
if "allOf" in schema :
172
194
tmp = schema .copy ()
173
195
ao = tmp .pop ("allOf" )
174
196
assert isinstance (ao , list )
175
- return merged_as_strategies ([tmp ] + ao , custom_formats )
197
+ return merged_as_strategies ([tmp ] + ao , custom_formats , resolver )
176
198
if "oneOf" in schema :
177
199
tmp = schema .copy ()
178
200
oo = tmp .pop ("oneOf" )
179
201
assert isinstance (oo , list )
180
202
schemas = [merged ([tmp , s ]) for s in oo ]
181
203
return st .one_of (
182
204
[
183
- from_schema (s , custom_formats = custom_formats )
205
+ from_schema (s , custom_formats = custom_formats , resolver = resolver )
184
206
for s in schemas
185
207
if s is not None
186
208
]
@@ -198,8 +220,8 @@ def __from_schema(
198
220
"number" : number_schema ,
199
221
"integer" : integer_schema ,
200
222
"string" : partial (string_schema , custom_formats ),
201
- "array" : partial (array_schema , custom_formats ),
202
- "object" : partial (object_schema , custom_formats ),
223
+ "array" : partial (array_schema , custom_formats , resolver ),
224
+ "object" : partial (object_schema , custom_formats , resolver ),
203
225
}
204
226
assert set (map_ ) == set (TYPE_STRINGS )
205
227
return st .one_of ([map_ [t ](schema ) for t in get_type (schema )])
@@ -422,10 +444,14 @@ def string_schema(
422
444
423
445
424
446
def array_schema (
425
- custom_formats : Dict [str , st .SearchStrategy [str ]], schema : dict
447
+ custom_formats : Dict [str , st .SearchStrategy [str ]],
448
+ resolver : LocalResolver ,
449
+ schema : dict ,
426
450
) -> st .SearchStrategy [List [JSONType ]]:
427
451
"""Handle schemata for arrays."""
428
- _from_schema_ = partial (from_schema , custom_formats = custom_formats )
452
+ _from_schema_ = partial (
453
+ from_schema , custom_formats = custom_formats , resolver = resolver
454
+ )
429
455
items = schema .get ("items" , {})
430
456
additional_items = schema .get ("additionalItems" , {})
431
457
min_size = schema .get ("minItems" , 0 )
@@ -436,14 +462,16 @@ def array_schema(
436
462
if max_size is not None :
437
463
max_size -= len (items )
438
464
439
- items_strats = [_from_schema_ (s ) for s in items ]
465
+ items_strats = [_from_schema_ (s ) for s in deepcopy ( items ) ]
440
466
additional_items_strat = _from_schema_ (additional_items )
441
467
442
468
# If we have a contains schema to satisfy, we try generating from it when
443
469
# allowed to do so. We'll skip the None (unmergable / no contains) cases
444
470
# below, and let Hypothesis ignore the FALSEY cases for us.
445
471
if "contains" in schema :
446
- for i , mrgd in enumerate (merged ([schema ["contains" ], s ]) for s in items ):
472
+ for i , mrgd in enumerate (
473
+ merged ([schema ["contains" ], s ]) for s in deepcopy (items )
474
+ ):
447
475
if mrgd is not None :
448
476
items_strats [i ] |= _from_schema_ (mrgd )
449
477
contains_additional = merged ([schema ["contains" ], additional_items ])
@@ -480,10 +508,10 @@ def not_seen(elem: JSONType) -> bool:
480
508
st .lists (additional_items_strat , min_size = min_size , max_size = max_size ),
481
509
)
482
510
else :
483
- items_strat = _from_schema_ (items )
511
+ items_strat = _from_schema_ (deepcopy ( items ) )
484
512
if "contains" in schema :
485
513
contains_strat = _from_schema_ (schema ["contains" ])
486
- if merged ([items , schema ["contains" ]]) != schema ["contains" ]:
514
+ if merged ([deepcopy ( items ) , schema ["contains" ]]) != schema ["contains" ]:
487
515
# We only need this filter if we couldn't merge items in when
488
516
# canonicalising. Note that for list-items, above, we just skip
489
517
# the mixed generation in this case (because they tend to be
@@ -504,7 +532,9 @@ def not_seen(elem: JSONType) -> bool:
504
532
505
533
506
534
def object_schema (
507
- custom_formats : Dict [str , st .SearchStrategy [str ]], schema : dict
535
+ custom_formats : Dict [str , st .SearchStrategy [str ]],
536
+ resolver : LocalResolver ,
537
+ schema : dict ,
508
538
) -> st .SearchStrategy [Dict [str , JSONType ]]:
509
539
"""Handle a manageable subset of possible schemata for objects."""
510
540
required = schema .get ("required" , []) # required keys
@@ -518,7 +548,7 @@ def object_schema(
518
548
return st .builds (dict )
519
549
names ["type" ] = "string"
520
550
521
- properties = schema .get ("properties" , {}) # exact name: value schema
551
+ properties = deepcopy ( schema .get ("properties" , {}) ) # exact name: value schema
522
552
patterns = schema .get ("patternProperties" , {}) # regex for names: value schema
523
553
# schema for other values; handled specially if nothing matches
524
554
additional = schema .get ("additionalProperties" , {})
@@ -533,7 +563,7 @@ def object_schema(
533
563
st .sampled_from (sorted (dep_names ) + sorted (dep_schemas ) + sorted (properties ))
534
564
if (dep_names or dep_schemas or properties )
535
565
else st .nothing (),
536
- from_schema (names , custom_formats = custom_formats )
566
+ from_schema (names , custom_formats = custom_formats , resolver = resolver )
537
567
if additional_allowed
538
568
else st .nothing (),
539
569
st .one_of ([st .from_regex (p ) for p in sorted (patterns )]),
@@ -579,12 +609,20 @@ def from_object_schema(draw: Any) -> Any:
579
609
if re .search (rgx , string = key ) is not None
580
610
]
581
611
if key in properties :
582
- pattern_schemas .insert (0 , properties [key ])
612
+ pattern_schemas .insert (0 , deepcopy ( properties [key ]) )
583
613
584
614
if pattern_schemas :
585
- out [key ] = draw (merged_as_strategies (pattern_schemas , custom_formats ))
615
+ out [key ] = draw (
616
+ merged_as_strategies (pattern_schemas , custom_formats , resolver )
617
+ )
586
618
else :
587
- out [key ] = draw (from_schema (additional , custom_formats = custom_formats ))
619
+ out [key ] = draw (
620
+ from_schema (
621
+ deepcopy (additional ),
622
+ custom_formats = custom_formats ,
623
+ resolver = resolver ,
624
+ )
625
+ )
588
626
589
627
for k , v in dep_schemas .items ():
590
628
if k in out and not make_validator (v ).is_valid (out ):
0 commit comments