diff --git a/dandischema/metadata.py b/dandischema/metadata.py index 190e473b..5be9294b 100644 --- a/dandischema/metadata.py +++ b/dandischema/metadata.py @@ -1,12 +1,12 @@ from copy import deepcopy from enum import Enum -from functools import lru_cache +from functools import cache from inspect import isclass import json from pathlib import Path from typing import Any, Dict, Iterable, Optional, TypeVar, Union, cast, get_args -import jsonschema +from jsonschema.protocols import Validator as JsonschemaValidator import pydantic import requests @@ -21,12 +21,16 @@ from .utils import ( TransitionalGenerateJsonSchema, _ensure_newline, + dandi_jsonschema_validator, + json_object_adapter, sanitize_value, strip_top_level_optional, + validate_json, version2tuple, ) -schema_map = { +# A mapping of the schema keys of DANDI models to the names of their JSON schema files +SCHEMA_MAP = { "Dandiset": "dandiset.json", "PublishedDandiset": "published-dandiset.json", "Asset": "asset.json", @@ -130,7 +134,7 @@ def publish_model_schemata(releasedir: Union[str, Path]) -> Path: version = models.get_schema_version() vdir = Path(releasedir, version) vdir.mkdir(exist_ok=True, parents=True) - for class_, filename in schema_map.items(): + for class_, filename in SCHEMA_MAP.items(): (vdir / filename).write_text( _ensure_newline( json.dumps( @@ -147,49 +151,122 @@ def publish_model_schemata(releasedir: Union[str, Path]) -> Path: return vdir -def _validate_obj_json(data: dict, schema: dict, missing_ok: bool = False) -> None: - validator: Union[jsonschema.Draft202012Validator, jsonschema.Draft7Validator] - - if version2tuple(data["schemaVersion"]) >= version2tuple("0.6.5"): - # schema version 0.7.0 and above is produced with Pydantic V2 - # which is compliant with JSON Schema Draft 2020-12 - validator = jsonschema.Draft202012Validator( - schema, format_checker=jsonschema.Draft202012Validator.FORMAT_CHECKER - ) - else: - validator = jsonschema.Draft7Validator( - schema, 
format_checker=jsonschema.Draft7Validator.FORMAT_CHECKER - ) - - error_list = [] - for error in sorted(validator.iter_errors(data), key=str): - if missing_ok and "is a required property" in error.message: - continue - error_list.append(error) - if error_list: - raise JsonschemaValidationError(error_list) +def _validate_obj_json( + instance: Any, validator: JsonschemaValidator, *, missing_ok: bool = False +) -> None: + """ + Validate a data instance using a jsonschema validator with an option to filter out + errors related to missing required properties + + :param instance: The data instance to validate + :param validator: The JSON schema validator to use + :param missing_ok: Indicates whether to filter out errors related to missing + required properties + :raises JsonschemaValidationError: If the metadata instance is invalid, and there + are errors detected in the validation, optionally discounting errors + related to missing required properties. An instance of this exception containing + a list of `jsonschema.exceptions.ValidationError` instances representing all the + (remaining) errors detected in the validation + """ + try: + validate_json(instance, validator) + except JsonschemaValidationError as e: + if missing_ok: + remaining_errs = [ + err for err in e.errors if "is a required property" not in err.message + ] + # Raise an exception only if there are errors left after filtering + if remaining_errs: + raise JsonschemaValidationError(remaining_errs) from e + else: + raise e def _validate_dandiset_json(data: dict, schema_dir: Union[str, Path]) -> None: with Path(schema_dir, "dandiset.json").open() as fp: schema = json.load(fp) - _validate_obj_json(data, schema) + _validate_obj_json(data, dandi_jsonschema_validator(schema)) def _validate_asset_json(data: dict, schema_dir: Union[str, Path]) -> None: with Path(schema_dir, "asset.json").open() as fp: schema = json.load(fp) - _validate_obj_json(data, schema) + _validate_obj_json(data, 
dandi_jsonschema_validator(schema)) + +@cache +def _get_jsonschema_validator( + schema_version: str, schema_key: str +) -> JsonschemaValidator: + """ + Get jsonschema validator for validating instances against a specific DANDI schema + + :param schema_version: The version of the specific DANDI schema + :param schema_key: The schema key that identifies the specific DANDI schema + :return: The jsonschema validator appropriate for validating instances against the + specific DANDI schema + :raises ValueError: If the provided schema version is not among the allowed versions, + `ALLOWED_VALIDATION_SCHEMAS` + :raises ValueError: If the provided schema key is not among the keys in `SCHEMA_MAP` + :raises requests.HTTPError: If the schema cannot be fetched from the `dandi/schema` + repository + :raises RuntimeError: If the fetched schema is not a valid JSON object + """ + if schema_version not in ALLOWED_VALIDATION_SCHEMAS: + raise ValueError( + f"DANDI schema version {schema_version} is not allowed. " + f"Allowed are: {', '.join(ALLOWED_VALIDATION_SCHEMAS)}." 
+ ) + if schema_key not in SCHEMA_MAP: + raise ValueError( + f"Schema key must be one of {', '.join(map(repr, SCHEMA_MAP.keys()))}" + ) -@lru_cache -def _get_schema(schema_version: str, schema_name: str) -> Any: - r = requests.get( - "https://raw.githubusercontent.com/dandi/schema/" - f"master/releases/{schema_version}/{schema_name}" + # Fetch the schema from the `dandi/schema` repository + schema_url = ( + f"https://raw.githubusercontent.com/dandi/schema/" + f"master/releases/{schema_version}/{SCHEMA_MAP[schema_key]}" ) + r = requests.get(schema_url) r.raise_for_status() - return r.json() + schema = r.json() + + # Validate that the retrieved schema is a valid JSON object, i.e., a dictionary + # This step is needed because the `jsonschema` package requires the schema to be a + # `Mapping[str, Any]` object + try: + json_object_adapter.validate_python(schema) + except pydantic.ValidationError as e: + msg = ( + f"The JSON schema at {schema_url} is not a valid JSON object. " + f"Received: {schema}" + ) + raise RuntimeError(msg) from e + + # Create a jsonschema validator for the schema + return dandi_jsonschema_validator(schema) + + +@cache +def _get_jsonschema_validator_local(schema_key: str) -> JsonschemaValidator: + """ + Get jsonschema validator for validating instances against a specific DANDI schema + generated from the corresponding locally defined Pydantic model + + :param schema_key: The schema key that identifies the specific DANDI schema + :raises ValueError: If the provided schema key is not among the keys in `SCHEMA_MAP` + """ + if schema_key not in SCHEMA_MAP: + raise ValueError( + f"Schema key must be one of {', '.join(map(repr, SCHEMA_MAP.keys()))}" + ) + + # The pydantic model with the specified schema key + m: type[pydantic.BaseModel] = getattr(models, schema_key) + + return dandi_jsonschema_validator( + m.model_json_schema(schema_generator=TransitionalGenerateJsonSchema) + ) def validate( @@ -232,25 +309,22 @@ def validate( if schema_key is None: 
raise ValueError("Provided object has no known schemaKey") schema_version = schema_version or obj.get("schemaVersion") - if schema_version not in ALLOWED_VALIDATION_SCHEMAS and schema_key in schema_map: + if schema_version not in ALLOWED_VALIDATION_SCHEMAS and schema_key in SCHEMA_MAP: raise ValueError( f"Metadata version {schema_version} is not allowed. " f"Allowed are: {', '.join(ALLOWED_VALIDATION_SCHEMAS)}." ) if json_validation: if schema_version == DANDI_SCHEMA_VERSION: - klass = getattr(models, schema_key) - schema = klass.model_json_schema( - schema_generator=TransitionalGenerateJsonSchema - ) + jvalidator = _get_jsonschema_validator_local(schema_key) else: - if schema_key not in schema_map: + if schema_key not in SCHEMA_MAP: raise ValueError( "Only dandisets and assets can be validated " "using json schema for older versions" ) - schema = _get_schema(schema_version, schema_map[schema_key]) - _validate_obj_json(obj, schema, missing_ok) + jvalidator = _get_jsonschema_validator(schema_version, schema_key) + _validate_obj_json(obj, jvalidator, missing_ok=missing_ok) klass = getattr(models, schema_key) try: klass(**obj) @@ -358,8 +432,7 @@ def migrate( # Optionally validate the instance against the DANDI schema it specifies # before migration if not skip_validation: - schema = _get_schema(obj_ver, "dandiset.json") - _validate_obj_json(obj, schema) + _validate_obj_json(obj, _get_jsonschema_validator(obj_ver, "Dandiset")) obj_migrated = deepcopy(obj) diff --git a/dandischema/tests/test_metadata.py b/dandischema/tests/test_metadata.py index 49262166..b85b3ef1 100644 --- a/dandischema/tests/test_metadata.py +++ b/dandischema/tests/test_metadata.py @@ -1,16 +1,26 @@ +from contextlib import nullcontext from hashlib import md5, sha256 import json from pathlib import Path from typing import Any, Dict, Optional, Sequence, Set +from unittest.mock import MagicMock, patch +from jsonschema.protocols import Validator as JsonschemaValidator +from pydantic import BaseModel 
import pytest +from dandischema.models import Asset, Dandiset, PublishedAsset, PublishedDandiset +from dandischema.utils import TransitionalGenerateJsonSchema, jsonschema_validator + from .utils import skipif_no_network from ..consts import DANDI_SCHEMA_VERSION from ..exceptions import JsonschemaValidationError, PydanticValidationError from ..metadata import ( + _get_jsonschema_validator, + _get_jsonschema_validator_local, _validate_asset_json, _validate_dandiset_json, + _validate_obj_json, aggregate_assets_summary, migrate, publish_model_schemata, @@ -666,3 +676,283 @@ def test_aggregation_bids() -> None: sum(_.get("name", "").startswith("OME/NGFF") for _ in summary["dataStandard"]) == 1 ) # only a single entry so we do not duplicate them + + +class TestValidateObjJson: + """ + Tests for `_validate_obj_json()` + """ + + @pytest.fixture + def dummy_jvalidator(self) -> JsonschemaValidator: + """Returns a dummy jsonschema validator initialized with a dummy schema.""" + return jsonschema_validator( + { + "type": "object", + "properties": {"name": {"type": "string"}}, + "required": ["name"], + }, + check_format=True, + ) + + @pytest.fixture + def dummy_instance(self) -> dict: + """Returns a dummy instance""" + return {"name": "Example"} + + def test_valid_obj_no_errors( + self, + monkeypatch: pytest.MonkeyPatch, + dummy_jvalidator: JsonschemaValidator, + dummy_instance: dict, + ) -> None: + """ + Test that `_validate_obj_json` does not raise when `validate_json` has no errors + """ + + def mock_validate_json(_instance: dict, _schema: dict) -> None: + """Simulate successful validation with no exceptions.""" + return # No error raised + + # Patch the validate_json function used inside `_validate_obj_json` + from dandischema import metadata + + monkeypatch.setattr(metadata, "validate_json", mock_validate_json) + + # `_validate_obj_json` should succeed without raising an exception + _validate_obj_json(dummy_instance, dummy_jvalidator) + + def 
test_raises_error_without_missing_ok( + self, + monkeypatch: pytest.MonkeyPatch, + dummy_jvalidator: JsonschemaValidator, + dummy_instance: dict, + ) -> None: + """ + Test that `_validate_obj_json` forwards JsonschemaValidationError + when `missing_ok=False`. + """ + + def mock_validate_json(_instance: dict, _schema: dict) -> None: + """Simulate validation error.""" + # Create a mock error that says a field is invalid + raise JsonschemaValidationError( + errors=[MagicMock(message="`name` is a required property")] + ) + + from dandischema import metadata + + monkeypatch.setattr(metadata, "validate_json", mock_validate_json) + + # Since `missing_ok=False`, any error should be re-raised. + with pytest.raises(JsonschemaValidationError) as excinfo: + _validate_obj_json(dummy_instance, dummy_jvalidator, missing_ok=False) + assert "`name` is a required property" == excinfo.value.errors[0].message + + @pytest.mark.parametrize( + ("validation_errs", "expect_raises", "expected_remaining_errs_count"), + [ + pytest.param( + [ + MagicMock(message="`name` is a required property"), + MagicMock(message="`title` is a required property ..."), + ], + False, + None, + id="no_remaining_errors", + ), + pytest.param( + [ + MagicMock(message="`name` is a required property"), + MagicMock(message="Some other validation error"), + ], + True, + 1, + id="one_remaining_error", + ), + ], + ) + def test_raises_only_nonmissing_errors_with_missing_ok( + self, + monkeypatch: pytest.MonkeyPatch, + dummy_jvalidator: JsonschemaValidator, + dummy_instance: dict, + validation_errs: list[MagicMock], + expect_raises: bool, + expected_remaining_errs_count: Optional[int], + ) -> None: + """ + Test that `_validate_obj_json` filters out 'is a required property' errors + when `missing_ok=True`. + """ + + def mock_validate_json(_instance: dict, _schema: dict) -> None: + """ + Simulate multiple validation errors, including missing required property. 
+ """ + raise JsonschemaValidationError( + errors=validation_errs # type: ignore[arg-type] + ) + + from dandischema import metadata + + monkeypatch.setattr(metadata, "validate_json", mock_validate_json) + + # If expect_raises is True, we use pytest.raises(ValidationError) + # Otherwise, we enter a no-op context + ctx = ( + pytest.raises(JsonschemaValidationError) if expect_raises else nullcontext() + ) + + with ctx as excinfo: + _validate_obj_json(dummy_instance, dummy_jvalidator, missing_ok=True) + + if excinfo is not None: + filtered_errors = excinfo.value.errors + + # We expect the "required property" error to be filtered out, + # so we should only see the "Some other validation error". + assert len(filtered_errors) == expected_remaining_errs_count + + +class TestGetJsonschemaValidator: + @pytest.mark.parametrize( + "schema_version, schema_key, expected_error_msg", + [ + pytest.param( + "0.5.8", + "Dandiset", + "DANDI schema version 0.5.8 is not allowed", + id="invalid-schema-version", + ), + pytest.param( + "0.6.0", + "Nonexistent", + "Schema key must be one of", + id="invalid-schema-key", + ), + ], + ) + def test_invalid_parameters( + self, schema_version: str, schema_key: str, expected_error_msg: str + ) -> None: + """ + Test that providing an invalid schema version or key raises ValueError. 
+ """ + # Clear the cache to avoid interference from previous calls + _get_jsonschema_validator.cache_clear() + with pytest.raises(ValueError, match=expected_error_msg): + _get_jsonschema_validator(schema_version, schema_key) + + def test_valid_schema(self) -> None: + """ + Test the valid case: + - requests.get() is patched directly using patch("requests.get") + - The returned JSON is a valid dict + - The resulting validator is produced via dandi_jsonschema_validator + """ + valid_version = "0.6.0" + valid_key = "Dandiset" + expected_url = ( + f"https://raw.githubusercontent.com/dandi/schema/master/releases/" + f"{valid_version}/dandiset.json" + ) + dummy_validator = MagicMock(spec=JsonschemaValidator) + valid_schema = {"type": "object"} + + with patch("requests.get") as mock_get, patch( + "dandischema.metadata.dandi_jsonschema_validator", + return_value=dummy_validator, + ) as mock_validator: + mock_response = MagicMock() + mock_response.raise_for_status.return_value = None + mock_response.json.return_value = valid_schema + mock_get.return_value = mock_response + + # Clear the cache to avoid interference from previous calls + _get_jsonschema_validator.cache_clear() + result = _get_jsonschema_validator(valid_version, valid_key) + + mock_get.assert_called_once_with(expected_url) + mock_response.raise_for_status.assert_called_once() + mock_response.json.assert_called_once() + mock_validator.assert_called_once_with(valid_schema) + assert result is dummy_validator + + def test_invalid_json_schema_raises_runtime_error(self) -> None: + """ + Test that if the fetched schema is not a valid JSON object, + then _get_jsonschema_validator() raises a RuntimeError. 
+ """ + valid_version = "0.6.0" + valid_key = "Dandiset" + expected_url = ( + f"https://raw.githubusercontent.com/dandi/schema/master/releases/" + f"{valid_version}/dandiset.json" + ) + # Return a list (instead of a dict) to trigger a ValidationError in json_object_adapter + invalid_schema = {4: 2} + + with patch("requests.get") as mock_get: + mock_response = MagicMock() + mock_response.raise_for_status.return_value = None + mock_response.json.return_value = invalid_schema + mock_get.return_value = mock_response + + # Clear the cache to avoid interference from previous calls + _get_jsonschema_validator.cache_clear() + with pytest.raises(RuntimeError, match="not a valid JSON object"): + _get_jsonschema_validator(valid_version, valid_key) + + mock_get.assert_called_once_with(expected_url) + mock_response.raise_for_status.assert_called_once() + mock_response.json.assert_called_once() + + +class TestGetJsonschemaValidatorLocal: + @pytest.mark.parametrize( + ("schema_key", "pydantic_model"), + [ + pytest.param("Dandiset", Dandiset, id="valid-Dandiset"), + pytest.param( + "PublishedDandiset", PublishedDandiset, id="valid-PublishedDandiset" + ), + pytest.param("Asset", Asset, id="valid-Asset"), + pytest.param("PublishedAsset", PublishedAsset, id="valid-PublishedAsset"), + ], + ) + def test_valid_schema_keys( + self, schema_key: str, pydantic_model: type[BaseModel] + ) -> None: + # Get the expected schema from the corresponding model. + expected_schema = pydantic_model.model_json_schema( + schema_generator=TransitionalGenerateJsonSchema + ) + + # Clear the cache to avoid interference from previous calls + _get_jsonschema_validator_local.cache_clear() + + # Call the function under test. + validator = _get_jsonschema_validator_local(schema_key) + + # Assert that the returned validator has a 'schema' attribute + # equal to the expected schema. 
+ assert validator.schema == expected_schema, ( + f"For schema key {schema_key!r}, expected schema:\n{expected_schema}\n" + f"but got:\n{validator.schema}" + ) + + @pytest.mark.parametrize( + "invalid_schema_key", + [ + pytest.param("Nonexistent", id="invalid-Nonexistent"), + pytest.param("", id="invalid-empty-string"), + pytest.param("InvalidKey", id="invalid-Key"), + ], + ) + def test_invalid_schema_keys(self, invalid_schema_key: str) -> None: + # Clear the cache to avoid interference from previous calls + _get_jsonschema_validator_local.cache_clear() + + with pytest.raises(ValueError, match="Schema key must be one of"): + _get_jsonschema_validator_local(invalid_schema_key) diff --git a/dandischema/tests/test_utils.py b/dandischema/tests/test_utils.py index 71d9e7ad..bff96744 100644 --- a/dandischema/tests/test_utils.py +++ b/dandischema/tests/test_utils.py @@ -1,12 +1,21 @@ -from typing import Dict, List, Optional, Union +from contextlib import nullcontext +from typing import Any, Dict, List, Optional, Union, cast +from unittest.mock import patch +from jsonschema.exceptions import SchemaError, ValidationError +from jsonschema.protocols import Validator as JsonschemaValidator +from jsonschema.validators import Draft7Validator, Draft202012Validator import pytest -from ..utils import ( +from dandischema.exceptions import JsonschemaValidationError +from dandischema.utils import ( _ensure_newline, + dandi_jsonschema_validator, + jsonschema_validator, name2title, sanitize_value, strip_top_level_optional, + validate_json, version2tuple, ) @@ -88,3 +97,382 @@ def test_sanitize_value() -> None: assert sanitize_value("A;B") == "A-B" assert sanitize_value("A\\/B") == "A--B" assert sanitize_value("A\"'B") == "A--B" + + +@pytest.fixture +def draft7_schema() -> dict: + """ + A minimal valid Draft 7 schema requiring a 'name' property of type 'string'. 
+ """ + return { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": {"name": {"type": "string"}}, + "required": ["name"], + } + + +@pytest.fixture +def draft202012_schema() -> dict: + """ + A minimal valid Draft 2020-12 schema requiring a 'title' property of type 'string'. + """ + return { + "$schema": "https://json-schema.org/draft/2020-12/schema", + "type": "object", + "properties": {"title": {"type": "string"}}, + "required": ["title"], + } + + +@pytest.fixture +def draft202012_format_schema() -> dict: + """ + Draft 2020-12 schema that includes a 'format' requirement (e.g., 'email'). + Used to test the 'check_format' parameter. + """ + return { + "$schema": "https://json-schema.org/draft/2020-12/schema", + "type": "object", + "properties": {"email": {"type": "string", "format": "email"}}, + "required": ["email"], + } + + +@pytest.fixture +def schema_no_dollar_schema() -> dict: + """ + Schema that lacks the '$schema' property altogether. + Used to test that 'default_cls' is applied. 
+ """ + return { + "type": "object", + "properties": {"foo": {"type": "string"}}, + "required": ["foo"], + } + + +class TestJsonschemaValidator: + @pytest.mark.parametrize( + ("fixture_name", "expected_validator_cls"), + [ + pytest.param( + "draft202012_format_schema", Draft202012Validator, id="Draft202012" + ), + pytest.param("draft7_schema", Draft7Validator, id="Draft7"), + ], + ) + @pytest.mark.parametrize("check_format", [True, False]) + def test_set_by_dollar_schema( + self, + request: pytest.FixtureRequest, + fixture_name: str, + expected_validator_cls: type, + check_format: bool, + ) -> None: + """ + Test that the correct validator class is returned for different '$schema' values + """ + # Dynamically retrieve the appropriate fixture schema based on fixture_name + schema = request.getfixturevalue(fixture_name) + + validator = jsonschema_validator(schema, check_format=check_format) + + assert isinstance(validator, expected_validator_cls) + + @pytest.mark.parametrize( + ("check_format", "instance", "expect_raises"), + [ + (True, {"email": "test@example.com"}, False), + (True, {"email": "not-an-email"}, True), + (False, {"email": "not-an-email"}, False), + ], + ids=[ + "check_format=True, valid email", + "check_format=True, invalid email", + "check_format=False, invalid email", + ], + ) + def test_check_format_email_scenarios( + self, + draft202012_format_schema: dict, + check_format: bool, + instance: dict, + expect_raises: bool, + ) -> None: + """ + Parametrized test for check_format usage on valid/invalid email addresses under + Draft202012Validator. 
+ """ + validator = jsonschema_validator( + draft202012_format_schema, check_format=check_format + ) + + # If expect_raises is True, we use pytest.raises(ValidationError) + # Otherwise, we enter a no-op context + ctx = pytest.raises(ValidationError) if expect_raises else nullcontext() + + with ctx: + validator.validate(instance) # Should raise or not raise as parametrized + + @pytest.mark.parametrize( + ("schema_fixture", "expected_validator_cls"), + [ + # Scenario 1: no $schema => we expect the default_cls=Draft7Validator is used + pytest.param("schema_no_dollar_schema", Draft7Validator, id="no-$schema"), + # Scenario 2: has $schema => draft 2020-12 overrides the default_cls + pytest.param("draft202012_schema", Draft202012Validator, id="with-$schema"), + ], + ) + def test_default_cls( + self, + request: pytest.FixtureRequest, + schema_fixture: str, + expected_validator_cls: type, + ) -> None: + """ + If the schema has no '$schema' property, and we provide a 'default_cls', + the returned validator should be an instance of that class. + + If the schema *does* have '$schema', then the default_cls is ignored, and + the validator class is inferred from the schema's '$schema' field. + """ + # Dynamically grab whichever fixture is specified by schema_fixture: + schema = request.getfixturevalue(schema_fixture) + + # Provide default_cls=Draft7Validator + validator = jsonschema_validator( + schema, + check_format=False, + default_cls=cast(type[JsonschemaValidator], Draft7Validator), + ) + assert isinstance(validator, expected_validator_cls) + + def test_invalid_schema_raises_schema_error(self) -> None: + """ + Provide an invalid schema, ensuring that 'SchemaError' is raised. 
+ """ + invalid_schema = { + "$schema": "https://json-schema.org/draft/2020-12/schema", + "type": 123, # 'type' must be string/array, so this is invalid + } + with pytest.raises(SchemaError): + jsonschema_validator(invalid_schema, check_format=False) + + +# --------------------------- +# Example validator fixtures +# --------------------------- +@pytest.fixture +def draft7_validator() -> JsonschemaValidator: + """ + A Draft 7 validator that requires a 'name' (type string). + """ + from jsonschema.validators import Draft7Validator + + schema = { + "type": "object", + "properties": {"name": {"type": "string"}}, + "required": ["name"], + } + return cast(JsonschemaValidator, Draft7Validator(schema)) + + +@pytest.fixture +def draft202012_validator() -> JsonschemaValidator: + """ + A Draft 2020-12 validator that requires a 'title' (type string). + """ + from jsonschema.validators import Draft202012Validator + + schema = { + "type": "object", + "properties": {"title": {"type": "string"}}, + "required": ["title"], + } + return cast(JsonschemaValidator, Draft202012Validator(schema)) + + +@pytest.fixture +def multiple_required_validator() -> JsonschemaValidator: + """ + A Draft 7 validator that requires *two* string properties: 'name' and 'title'. + This enables multiple errors in a single validation if both are missing. 
+ """ + from jsonschema.validators import Draft7Validator + + schema = { + "type": "object", + "properties": { + "name": {"type": "string"}, + "title": {"type": "string"}, + }, + "required": ["name", "title"], + } + return cast(JsonschemaValidator, Draft7Validator(schema)) + + +class TestValidateJson: + @pytest.mark.parametrize( + "validator_fixture, instance", + [ + pytest.param( + "draft7_validator", + {"name": "Alice"}, + id="draft7_valid_instance", + ), + pytest.param( + "draft202012_validator", + {"title": "My Title"}, + id="draft202012_valid_instance", + ), + pytest.param( + "multiple_required_validator", + {"name": "Bob", "title": "Something"}, + id="multiple_required_valid_instance", + ), + ], + ) + def test_valid_instance( + self, + request: pytest.FixtureRequest, + validator_fixture: str, + instance: Dict[str, Any], + ) -> None: + """ + Test that a valid instance does not raise any exceptions. + """ + # Load the correct validator using `request.getfixturevalue` + validator: JsonschemaValidator = request.getfixturevalue(validator_fixture) + validate_json(instance, validator) # Should not raise + + @pytest.mark.parametrize( + "validator_fixture, instance, expected_error_count", + [ + # Single error: missing "name" + pytest.param( + "draft7_validator", + {}, + 1, + id="draft7_missing_name", + ), + # Single error: "name" has wrong type + pytest.param( + "draft7_validator", + {"name": 123}, + 1, + id="draft7_wrong_type_name", + ), + # Single error: missing "title" + pytest.param( + "draft202012_validator", + {}, + 1, + id="draft202012_missing_title", + ), + # Single error: "title" has wrong type + pytest.param( + "draft202012_validator", + {"title": 999}, + 1, + id="draft202012_wrong_type_title", + ), + # Multiple errors: missing both "name" and "title" + pytest.param( + "multiple_required_validator", + {}, + 2, + id="multiple_required_missing_both", + ), + # Another multiple error scenario: 'name' wrong type, 'title' missing + pytest.param( + 
"multiple_required_validator", + {"name": 123}, + 2, + id="multiple_required_wrong_type_and_missing", + ), + ], + ) + def test_invalid_instance( + self, + request: pytest.FixtureRequest, + validator_fixture: str, + instance: Dict[str, Any], + expected_error_count: int, + ) -> None: + """ + Tests that an invalid instance raises a JsonschemaValidationError. + Verifies that the number of validation errors matches `expected_error_count`. + """ + validator: JsonschemaValidator = request.getfixturevalue(validator_fixture) + + with pytest.raises(JsonschemaValidationError) as exc_info: + validate_json(instance, validator) + + errs = exc_info.value.errors + assert isinstance(errs, list), "Expected a list" + assert ( + len(errs) == expected_error_count + ), f"Expected {expected_error_count} error(s), got {len(errs)}" + assert all( + isinstance(err, ValidationError) for err in errs + ), "All errors must be `jsonschema.exceptions.ValidationError`" + + +class TestDandiJsonschemaValidator: + @pytest.mark.parametrize( + "version, expected_validator_cls", + [ + pytest.param("0.6.5", Draft202012Validator, id="version-0.6.5"), + pytest.param("0.7.0", Draft202012Validator, id="version-0.7.0"), + pytest.param("0.6.0", Draft7Validator, id="version-0.6.0"), + ], + ) + def test_dandi_jsonschema_validator_versions( + self, version: str, expected_validator_cls: JsonschemaValidator + ) -> None: + """ + Test that dandi_jsonschema_validator() selects the correct default validator + class based on the version specified in the schema's "schemaVersion" default. + """ + schema = {"properties": {"schemaVersion": {"default": version}}} + # Patch jsonschema_validator so we can intercept the call and + # verify the parameters. + with patch( + "dandischema.utils.jsonschema_validator", autospec=True + ) as mock_validator: + mock_validator.return_value = "dummy_validator_result" + result = cast(str, dandi_jsonschema_validator(schema)) + # Verify that the dummy return value is propagated. 
+ assert result == "dummy_validator_result" + # Assert that jsonschema_validator was called with the expected parameters. + mock_validator.assert_called_once_with( + schema, + check_format=True, + default_cls=expected_validator_cls, + ) + + @pytest.mark.parametrize( + "schema", + [ + pytest.param({}, id="missing-properties"), + pytest.param( + {"properties": {}}, + id="missing-schemaVersion", + ), + pytest.param( + {"properties": {"schemaVersion": {}}}, + id="missing-default", + ), + ], + ) + def test_dandi_jsonschema_validator_missing_keys(self, schema: dict) -> None: + """ + Test that dandi_jsonschema_validator() raises a `ValueError` when the schema + does not have a 'schemaVersion' property that specifies the schema version with + a 'default' field. + """ + with pytest.raises( + ValueError, match="schema must has a 'schemaVersion' property" + ): + dandi_jsonschema_validator(schema) diff --git a/dandischema/utils.py b/dandischema/utils.py index f82fac44..076b34cd 100644 --- a/dandischema/utils.py +++ b/dandischema/utils.py @@ -1,11 +1,17 @@ from __future__ import annotations import re -from typing import Any, Iterator, List, Union, get_args, get_origin +from typing import Any, Iterator, List, Union, cast, get_args, get_origin +from jsonschema import Draft7Validator, Draft202012Validator +from jsonschema.protocols import Validator as JsonschemaValidator +from jsonschema.validators import validator_for +from pydantic import ConfigDict, TypeAdapter from pydantic.json_schema import GenerateJsonSchema, JsonSchemaMode, JsonSchemaValue from pydantic_core import CoreSchema, core_schema +from .exceptions import JsonschemaValidationError + TITLE_CASE_LOWER = { "a", "an", @@ -136,3 +142,102 @@ def sanitize_value(value: str, field: str = "non-extension", sub: str = "-") -> if field != "extension": value = value.replace(".", sub) return value + + +def dandi_jsonschema_validator(schema: dict[str, Any]) -> JsonschemaValidator: + """ + Create a JSON Schema validator appropriate 
for validating instances against the + JSON schema of a DANDI model + + :param schema: The JSON schema of the DANDI model to validate against + :return: The JSON schema validator + :raises ValueError: If the schema does not have a 'schemaVersion' property that + specifies the schema version with a 'default' field. + :raises jsonschema.exceptions.SchemaError: If the JSON schema is invalid + """ + if ( + "properties" not in schema + or "schemaVersion" not in schema["properties"] + or "default" not in schema["properties"]["schemaVersion"] + ): + msg = ( + "The schema must has a 'schemaVersion' property that specifies the schema " + "version with a 'default' field." + ) + raise ValueError(msg) + + default_validator_cls = cast( + type[JsonschemaValidator], + ( + Draft202012Validator + # `"schemaVersion"` 0.6.5 and above is produced with Pydantic V2 + # which is compliant with JSON Schema Draft 2020-12 + if ( + version2tuple(schema["properties"]["schemaVersion"]["default"]) + >= version2tuple("0.6.5") + ) + else Draft7Validator + ), + ) + + return jsonschema_validator( + schema, check_format=True, default_cls=default_validator_cls + ) + + +def jsonschema_validator( + schema: dict[str, Any], + *, + check_format: bool, + default_cls: type[JsonschemaValidator] | None = None, +) -> JsonschemaValidator: + """ + Create a jsonschema validator appropriate for validating instances against a given + JSON schema + + :param schema: The JSON schema to validate against + :param check_format: Indicates whether to check the format against format + specifications in the schema + :param default_cls: The default JSON schema validator class to use to create the + validator should the appropriate validator class cannot be determined based on + the schema (by assessing the `$schema` property). If `None`, the class + representing the latest JSON schema draft supported by the `jsonschema` package. 
+ :return: The JSON schema validator + :raises jsonschema.exceptions.SchemaError: If the JSON schema is invalid + """ + # Retrieve appropriate validator class for validating the given schema + validator_cls: type[JsonschemaValidator] = ( + validator_for(schema, default_cls) + if default_cls is not None + else validator_for(schema) + ) + + # Ensure the schema is valid + validator_cls.check_schema(schema) + + if check_format: + # Return a validator with format checking enabled + return validator_cls(schema, format_checker=validator_cls.FORMAT_CHECKER) + + # Return a validator with format checking disabled + return validator_cls(schema) + + +def validate_json(instance: Any, validator: JsonschemaValidator) -> None: + """ + Validate a data instance using a jsonschema validator + + :param instance: The data instance to validate + :param validator: The JSON schema validator to use + :raises JsonschemaValidationError: If the metadata instance is invalid, an instance + of this exception containing a list of `jsonschema.exceptions.ValidationError` + instances representing all the errors detected in the validation is raised + """ + errs = sorted(validator.iter_errors(instance), key=str) + + if errs: + raise JsonschemaValidationError(errs) + + +# Pydantic type adapter for a JSON object, which is of type `dict[str, Any]` +json_object_adapter = TypeAdapter(dict[str, Any], config=ConfigDict(strict=True))