|
| 1 | +""" |
| 2 | +Canonicalisation logic for JSON schemas. |
| 3 | +
|
| 4 | +The canonical format that we transform to is not intended for human consumption. |
| 5 | +Instead, it prioritises locality of reasoning - for example, we convert oneOf |
| 6 | +arrays into an anyOf of allOf (each sub-schema being the original plus not anyOf |
| 7 | +the rest). Resolving references and merging subschemas is also really helpful. |
| 8 | +
|
| 9 | +All this effort is justified by the huge performance improvements that we get |
| 10 | +when converting to Hypothesis strategies. To the extent possible there is only |
| 11 | +one way to generate any given value... but much more importantly, we can do |
| 12 | +most things by construction instead of by filtering. That's the difference |
| 13 | +between "I'd like it to be faster" and "doesn't finish at all". |
| 14 | +""" |
| 15 | +from copy import deepcopy |
| 16 | +from typing import NoReturn, Union |
| 17 | + |
| 18 | +import jsonschema |
| 19 | +from hypothesis.errors import InvalidArgument |
| 20 | + |
| 21 | +from ._canonicalise import ( |
| 22 | + SCHEMA_KEYS, |
| 23 | + SCHEMA_OBJECT_KEYS, |
| 24 | + HypothesisRefResolutionError, |
| 25 | + Schema, |
| 26 | + canonicalish, |
| 27 | + merged, |
| 28 | +) |
| 29 | + |
| 30 | + |
class LocalResolver(jsonschema.RefResolver):
    """A ``jsonschema.RefResolver`` whose ``resolve_remote`` always raises."""

    def resolve_remote(self, uri: str) -> NoReturn:
        """Raise ``HypothesisRefResolutionError`` naming the requested ``uri``."""
        message = (
            f"hypothesis-jsonschema does not fetch remote references (uri={uri!r})"
        )
        raise HypothesisRefResolutionError(message)
| 36 | + |
| 37 | + |
def resolve_all_refs(
    schema: Union[bool, Schema], *, resolver: Union[LocalResolver, None] = None
) -> Schema:
    """
    Resolve all references in the given schema.

    This handles nested definitions, but not recursive definitions.
    The latter require special handling to convert to strategies and are much
    less common, so we just ignore them (and error out) for now.

    A boolean schema is passed through ``canonicalish``.  For a dict schema, a
    new dict is returned and the argument itself is left unmodified.  If no
    ``resolver`` is given, a ``LocalResolver`` is built from a deep copy of
    ``schema``.

    Raises ``InvalidArgument`` if ``resolver`` is not a
    ``jsonschema.RefResolver``, and ``HypothesisRefResolutionError`` if the
    keywords alongside a ``$ref`` cannot be merged with the resolved target.
    """
    if isinstance(schema, bool):
        return canonicalish(schema)
    assert isinstance(schema, dict), schema
    if resolver is None:
        # Deep-copy so the resolver holds its own, independent document.
        resolver = LocalResolver.from_schema(deepcopy(schema))
    if not isinstance(resolver, jsonschema.RefResolver):
        raise InvalidArgument(
            f"resolver={resolver} (type {type(resolver).__name__}) is not a RefResolver"
        )

    if "$ref" in schema:
        # Split off the reference, then merge the sibling keywords with the
        # fully resolved target schema.
        s = dict(schema)
        ref = s.pop("$ref")
        with resolver.resolving(ref) as got:
            m = merged([s, resolve_all_refs(got, resolver=resolver)])
            if m is None:  # pragma: no cover
                msg = f"$ref:{ref!r} had incompatible base schema {s!r}"
                raise HypothesisRefResolutionError(msg)
            assert "$ref" not in m
            return m
    assert "$ref" not in schema

    # Work on a shallow copy: the loops below only rebind top-level keys to
    # freshly built containers, so neither the caller's schema nor a schema
    # yielded by the resolver is modified.
    schema = dict(schema)

    for key in SCHEMA_KEYS:
        val = schema.get(key, False)
        if isinstance(val, list):
            schema[key] = [
                resolve_all_refs(v, resolver=resolver) if isinstance(v, dict) else v
                for v in val
            ]
        elif isinstance(val, dict):
            schema[key] = resolve_all_refs(val, resolver=resolver)
        else:
            assert isinstance(val, bool)
    for key in SCHEMA_OBJECT_KEYS:  # values are keys-to-schema-dicts, not schemas
        if key in schema:
            subschema = schema[key]
            assert isinstance(subschema, dict)
            schema[key] = {
                k: resolve_all_refs(v, resolver=resolver) if isinstance(v, dict) else v
                for k, v in subschema.items()
            }
    assert isinstance(schema, dict)
    assert "$ref" not in schema
    return schema
0 commit comments