Skip to content
73 changes: 72 additions & 1 deletion cyclonedx/validation/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@


from abc import ABC, abstractmethod
from collections.abc import Iterable
from typing import TYPE_CHECKING, Any, Literal, Optional, Protocol, Union, overload

from ..schema import OutputFormat
Expand All @@ -27,13 +28,75 @@
from .xml import XmlValidator


def squeeze(text: str, size: int, replacement: str = ' ... ') -> str:
    """Shorten ``text`` to ``size`` characters by cutting out its middle.

    The head and the tail of ``text`` are kept, the dropped middle part is
    marked with ``replacement``. If the kept characters cannot be split
    evenly, the tail gets the extra one.

    :param text: the text to shorten.
    :param size: the length of the output, -1 to make no squeezing.
    :param replacement: marker put in place of the dropped middle part.
    :return: ``text`` itself if ``size`` is -1 or ``text`` is not longer than
             ``size``, otherwise the squeezed text of exactly ``size`` length.
    :raises ValueError: if ``size`` is not -1 and ``replacement`` is longer
                        than ``size`` - even if ``text`` needs no squeezing.
    """
    if size == -1:
        # explicit pass-through
        return text

    marker_len = len(replacement)
    if marker_len > size:
        raise ValueError(f'squeeze: {size = } < {len(replacement) = }')

    if size >= len(text):
        return text

    keep = size - marker_len
    head_len = keep // 2
    tail_len = keep - head_len
    # slice from an explicit offset: ``text[-0:]`` would be the whole text
    tail_start = len(text) - tail_len
    return ''.join((text[:head_len], replacement, text[tail_start:]))


class ValidationError:
    """A single problem found while validating a document.

    Wraps the error object of the underlying validation library.
    You can use :attr:`~data` to access the raw error object, but prefer
    other properties and functions, if possible.
    """

    data: Any
    """Raw error data from one of the validation libraries."""

    def __init__(self, data: Any) -> None:
        self.data = data

    @property
    def message(self) -> str:
        """The error message.

        Taken from the ``message`` attribute of :attr:`~data`; if there is
        none, the string form of this object is used instead.
        """
        raw_message = getattr(self.data, 'message', self)
        return str(raw_message)

    @property
    def path(self) -> str:
        """Path to the location of the problem in the document.

        An XPath/JSONPath string; empty if :attr:`~data` has no ``path``.
        """
        # only subclasses know how to extract this info
        raw_path = getattr(self.data, 'path', '')
        return str(raw_path)

    def get_squeezed_message(self, *, context_limit: int = -1, max_size: int = -1, replacement: str = ' ... ') -> str:
        """Extracts, and sanitizes the error message.

        Messages from the underlying libraries can be quite big, as they
        sometimes embed context: both the input and the rule can be big.

        This generic implementation only limits the overall size;
        ``context_limit`` is accepted, but not used here.

        :param context_limit: limit of tolerated context length.
        :param max_size: squeeze message to this size.
        :param replacement: to mark place of dropped text bit[s]
        :return: the message, squeezed to ``max_size``.

        With the defaults, no squeezing happens.
        """
        # subclasses may know how to do it better
        full_message = self.message
        return squeeze(full_message, max_size, replacement)
Expand All @@ -58,6 +121,14 @@ def validate_str(self, data: str) -> Optional[ValidationError]:
"""
... # pragma: no cover

def iterate_errors(self, data: str) -> Iterable[ValidationError]:
    """Validate a string and enumerate every problem found in it.

    :param data: the data string to validate
    :return: iterable over the validation errors
    """
    ...  # pragma: no cover


class BaseSchemabasedValidator(ABC, SchemabasedValidator):
"""Base Schema-based Validator"""
Expand Down
65 changes: 59 additions & 6 deletions cyclonedx/validation/json.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
__all__ = ['JsonValidator', 'JsonStrictValidator']

from abc import ABC
from collections.abc import Iterable
from json import loads as json_loads
from typing import TYPE_CHECKING, Any, Literal, Optional

Expand All @@ -29,11 +30,11 @@

from ..exception import MissingOptionalDependencyException
from ..schema._res import BOM_JSON as _S_BOM, BOM_JSON_STRICT as _S_BOM_STRICT, JSF as _S_JSF, SPDX_JSON as _S_SPDX
from . import BaseSchemabasedValidator, SchemabasedValidator, ValidationError
from . import BaseSchemabasedValidator, SchemabasedValidator, ValidationError, squeeze

_missing_deps_error: Optional[tuple[MissingOptionalDependencyException, ImportError]] = None
try:
from jsonschema.exceptions import ValidationError as JsonValidationError # type:ignore[import-untyped]
from jsonschema.exceptions import ValidationError as JsonSchemaValidationError # type:ignore[import-untyped]
from jsonschema.validators import Draft7Validator # type:ignore[import-untyped]
from referencing import Registry
from referencing.jsonschema import DRAFT7
Expand All @@ -47,6 +48,51 @@
), err


def _get_message_with_squeezed_context(error: 'JsonSchemaValidationError', context_limit: int, replacement: str) -> str:
# The below code depends on jsonschema internals, that messages are created
# like `yield ValidationError(f"{instance!r} has non-unique elements")`
# and tries to replace `{instance!r}` with a shortened version, if needed
message: str = error.message
if context_limit <= 0 or len(message) <= context_limit:
return message

repr_context = repr(error.instance)
if len(repr_context) <= context_limit:
return message

return message.replace(repr_context, squeeze(repr_context, context_limit, replacement))


class _JsonValidationError(ValidationError):
    """Validation error wrapping a jsonschema error object."""

    def get_squeezed_message(self, *, context_limit: int = -1, max_size: int = -1, replacement: str = ' ... ') -> str:
        """Extracts, and sanitizes the error message.

        Messages from the underlying library can be quite big, as context is
        sometimes added to them. Here the context is shortened first, then
        the overall size of the message is limited.

        :param context_limit: jsonschema messages most of the time include the
                              instance repr as context, which can be very big
                              (in the megabytes range), so an attempt is made to
                              shorten context to this size.
        :param max_size: squeeze message to this size.
        :param replacement: to mark place of dropped text bit[s]
        :return: the sanitized message.

        With the defaults, no squeezing happens.
        """
        context_squeezed = _get_message_with_squeezed_context(self.data, context_limit, replacement)
        return squeeze(context_squeezed, max_size, replacement)

    @property
    def path(self) -> str:
        """Path to the location of the problem in the document.

        The ``json_path`` of the wrapped error, empty if it has none.
        """
        raw_path = getattr(self.data, 'json_path', '')
        return str(raw_path)


class _BaseJsonValidator(BaseSchemabasedValidator, ABC):
@property
def output_format(self) -> Literal[OutputFormat.JSON]:
Expand All @@ -62,17 +108,24 @@ def __init__(self, schema_version: 'SchemaVersion') -> None:
def validate_str(self, data: str) -> Optional[ValidationError]:
    # Validation is not possible in this branch: raise the recorded error
    # (first item), chained to its cause (second item).
    # NOTE(review): presumably the missing-optional-dependency error pair
    # captured at import time - confirm against the class attribute.
    recorded = self.__MDERROR
    raise recorded[0] from recorded[1]

def iterate_errors(self, data: str) -> Iterable[ValidationError]:
    # Validation is not possible in this branch: raise the recorded error
    # (first item), chained to its cause (second item) - right at call time,
    # as this is a plain function, not a generator.
    recorded = self.__MDERROR
    raise recorded[0] from recorded[1]
else:
def iterate_errors(self, data: str) -> Iterable[ValidationError]:
    """Validate a JSON string, enumerating all the problems.

    Every error reported by the schema validator is wrapped in
    ``_JsonValidationError``, so that the yielded items provide the
    ``ValidationError`` interface promised by the return type - the same
    wrapper the single-error validation of this class returns.

    :param data: the JSON string to validate
    :return: iterator over the errors
    """
    json_data = json_loads(data)
    validator = self._validator  # may throw on error that MUST NOT be caught
    for error in validator.iter_errors(json_data):
        # do not leak raw jsonschema errors to the callers
        yield _JsonValidationError(error)

def validate_str(self, data: str) -> Optional[ValidationError]:
return self._validata_data(
return self._validate_data(
json_loads(data))

def _validata_data(self, data: Any) -> Optional[ValidationError]:
def _validate_data(self, data: Any) -> Optional[ValidationError]:
validator = self._validator # may throw on error that MUST NOT be caught
try:
validator.validate(data)
except JsonValidationError as error:
return ValidationError(error)
except JsonSchemaValidationError as error:
return _JsonValidationError(error)
return None

__validator: Optional['JsonSchemaValidator'] = None
Expand Down
19 changes: 16 additions & 3 deletions cyclonedx/validation/xml.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
__all__ = ['XmlValidator']

from abc import ABC
from collections.abc import Iterable
from typing import TYPE_CHECKING, Any, Literal, Optional

from ..exception import MissingOptionalDependencyException
Expand Down Expand Up @@ -53,19 +54,31 @@ def __init__(self, schema_version: 'SchemaVersion') -> None:
# this is the def that is used for generating the documentation
super().__init__(schema_version)

if _missing_deps_error:
if _missing_deps_error: # noqa:C901
__MDERROR = _missing_deps_error

def validate_str(self, data: str) -> Optional[ValidationError]:
    # Optional dependencies are missing: raise the recorded error
    # (first item), chained to its cause (second item).
    recorded = self.__MDERROR
    raise recorded[0] from recorded[1]

def iterate_errors(self, data: str) -> Iterable[ValidationError]:
    # Optional dependencies are missing: raise the recorded error
    # (first item), chained to its cause (second item) - right at call time,
    # as this is a plain function, not a generator.
    recorded = self.__MDERROR
    raise recorded[0] from recorded[1]
else:
def iterate_errors(self, data: str) -> Iterable[ValidationError]:
    """Validate an XML string, enumerating all the problems.

    The string is parsed with the parser of this validator, then validated;
    the result of the validation call is not used, instead every entry of
    the validator's error log is yielded as a ``ValidationError``.

    :param data: the XML string to validate
    :return: iterator over the errors
    """
    raw_xml = bytes(data, encoding='utf8')
    document = xml_fromstring(  # nosec B320
        raw_xml,
        parser=self.__xml_parser)
    validator = self._validator  # may throw on error that MUST NOT be caught
    validator.validate(document)
    yield from map(ValidationError, validator.error_log)

def validate_str(self, data: str) -> Optional[ValidationError]:
return self._validata_data(
return self._validate_data(
xml_fromstring( # nosec B320
bytes(data, encoding='utf8'),
parser=self.__xml_parser))

def _validata_data(self, data: Any) -> Optional[ValidationError]:
def _validate_data(self, data: Any) -> Optional[ValidationError]:
validator = self._validator # may throw on error that MUST NOT be caught
if not validator.validate(data):
return ValidationError(validator.error_log.last_error)
Expand Down
124 changes: 121 additions & 3 deletions tests/test_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
# Copyright (c) OWASP Foundation. All Rights Reserved.


import unittest
from itertools import product
from unittest import TestCase

from ddt import data, ddt, named_data, unpack

from cyclonedx.schema import OutputFormat, SchemaVersion
from cyclonedx.validation import make_schemabased_validator
from cyclonedx.validation import make_schemabased_validator, squeeze

UNDEFINED_FORMAT_VERSION = {
(OutputFormat.JSON, SchemaVersion.V1_1),
Expand All @@ -31,7 +31,7 @@


@ddt
class TestGetSchemabasedValidator(TestCase):
class TestGetSchemabasedValidator(unittest.TestCase):

@named_data(*([f'{f.name} {v.name}', f, v]
for f, v
Expand All @@ -51,3 +51,121 @@ def test_as_expected(self, of: OutputFormat, sv: SchemaVersion) -> None:
def test_fails_on_wrong_args(self, of: OutputFormat, sv: SchemaVersion, raises_regex: tuple) -> None:
with self.assertRaisesRegex(*raises_regex):
make_schemabased_validator(of, sv)


class TestSqueeze(unittest.TestCase):
    """Tests for ``squeeze`` as imported from ``cyclonedx.validation``."""

    def test_squeeze_size_minus_one_returns_original_text(self) -> None:
        """Test that size=-1 returns original text unchanged."""
        self.assertEqual(squeeze('hello world', -1), 'hello world')
        self.assertEqual(squeeze('', -1), '')
        self.assertEqual(squeeze('a', -1), 'a')
        self.assertEqual(squeeze('very long text that would normally be squeezed', -1),
                         'very long text that would normally be squeezed')

    def test_squeeze_size_zero_returns_empty_text(self) -> None:
        """Test that size=0 with an empty replacement returns empty text."""
        self.assertEqual(squeeze('hello world', 0, ''), '')
        self.assertEqual(squeeze('', 0, ''), '')

    def test_squeeze_text_shorter_than_or_equal_size_returns_original(self) -> None:
        """Test that text shorter than or equal to size returns original text."""
        self.assertEqual(squeeze('hello', 10), 'hello')
        self.assertEqual(squeeze('hello', 5), 'hello')
        self.assertEqual(squeeze('', 5), '')
        self.assertEqual(squeeze('a', 5), 'a')
        self.assertEqual(squeeze('ab', 10), 'ab')

    def test_squeeze_with_default_replacement(self) -> None:
        """Test squeezing with default ' ... ' replacement."""
        self.assertEqual(squeeze('hello world', 8), 'h ... ld')
        self.assertEqual(squeeze('hello world', 7), 'h ... d')
        self.assertEqual(squeeze('hello world', 9), 'he ... ld')
        self.assertEqual(squeeze('hello world', 10), 'he ... rld')
        self.assertEqual(squeeze('hello world', 11), 'hello world')

    def test_squeeze_with_custom_replacement(self) -> None:
        """Test squeezing with custom replacement strings."""
        self.assertEqual(squeeze('hello world', 8, '..'), 'hel..rld')
        self.assertEqual(squeeze('hello world', 7, '..'), 'he..rld')
        self.assertEqual(squeeze('hello world', 9, '---'), 'hel---rld')
        self.assertEqual(squeeze('hello world', 10, 'XX'), 'hellXXorld')

    def test_squeeze_with_single_character_replacement(self) -> None:
        """Test squeezing with single character replacement."""
        self.assertEqual(squeeze('hello world', 5, '*'), 'he*ld')
        self.assertEqual(squeeze('hello world', 6, '*'), 'he*rld')
        self.assertEqual(squeeze('hello world', 7, '*'), 'hel*rld')

    def test_squeeze_with_empty_replacement(self) -> None:
        """Test squeezing with empty replacement string."""
        self.assertEqual(squeeze('hello world', 5, ''), 'herld')
        self.assertEqual(squeeze('hello world', 6, ''), 'helrld')
        self.assertEqual(squeeze('hello world', 7, ''), 'helorld')

    def test_squeeze_replacement_equals_target_size(self) -> None:
        """Test when replacement string equals the target size."""
        self.assertEqual(squeeze('hello world', 4, '....'), '....')
        self.assertEqual(squeeze('hello world', 3, '***'), '***')

    def test_squeeze_very_short_target_sizes(self) -> None:
        """Test edge cases with very short target sizes."""
        self.assertEqual(squeeze('hello world', 5, '.'), 'he.ld')
        self.assertEqual(squeeze('hello world', 6, '.'), 'he.rld')
        self.assertEqual(squeeze('hello world', 1, 'X'), 'X')

    def test_squeeze_with_long_text(self) -> None:
        """Test squeezing with very long text."""
        long_text = 'a' * 100
        result = squeeze(long_text, 10, '...')
        self.assertEqual(len(result), 10)
        self.assertEqual(result, 'aaa...aaaa')

        # Test with different replacement
        result2 = squeeze(long_text, 8, '--')
        self.assertEqual(len(result2), 8)
        self.assertEqual(result2, 'aaa--aaa')

    def test_squeeze_size_distribution_even(self) -> None:
        """Test size distribution when remaining space is even."""
        # size=8, replacement="--" (len=2), remaining=6, left=3, right=3
        self.assertEqual(squeeze('abcdefghijk', 8, '--'), 'abc--ijk')
        # size=10, replacement="...." (len=4), remaining=6, left=3, right=3
        self.assertEqual(squeeze('abcdefghijk', 10, '....'), 'abc....ijk')

    def test_squeeze_size_distribution_odd(self) -> None:
        """Test size distribution when remaining space is odd."""
        # size=9, replacement="--" (len=2), remaining=7, left=3, right=4
        self.assertEqual(squeeze('abcdefghijk', 9, '--'), 'abc--hijk')
        # size=10, replacement="..." (len=3), remaining=7, left=3, right=4
        self.assertEqual(squeeze('abcdefghijk', 10, '...'), 'abc...hijk')
        # size=11 equals the length of the text, so nothing is squeezed
        self.assertEqual(squeeze('abcdefghijk', 11, '...'), 'abcdefghijk')

    def test_squeeze_raises_error_when_replacement_too_long(self) -> None:
        """Test that ValueError is raised when replacement is longer than target size."""
        with self.assertRaises(ValueError) as context:
            squeeze('hello world', 3, ' ... ')
        self.assertIn('size = 3 < len(replacement) = 5', str(context.exception))

        with self.assertRaises(ValueError) as context:
            squeeze('hello world', 2, 'abc')
        self.assertIn('size = 2 < len(replacement) = 3', str(context.exception))

        with self.assertRaises(ValueError) as context:
            squeeze('hello world', 1, 'ab')
        self.assertIn('size = 1 < len(replacement) = 2', str(context.exception))

    def test_squeeze_error_when_replacement_long_but_no_squeeze_needed(self) -> None:
        """Test that ValueError is raised for a too long replacement even if no squeezing is needed."""
        # Text is shorter than size, so no squeezing would occur,
        # yet, the replacement is longer than the requested size, so error is raised.
        # The calls are not wrapped in an assertEqual: they never return.
        with self.assertRaises(ValueError) as context:
            squeeze('abc', 10, 'very long replacement')
        self.assertIn('size = 10 < len(replacement) = 21', str(context.exception))

        with self.assertRaises(ValueError) as context:
            squeeze('', 3, 'abcd')
        self.assertIn('size = 3 < len(replacement) = 4', str(context.exception))


if __name__ == '__main__':
unittest.main()
Loading