Skip to content
26 changes: 23 additions & 3 deletions scene_common/src/scene_common/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,28 @@
# SPDX-License-Identifier: Apache-2.0

import json
import uuid
from jsonschema import FormatChecker
from fastjsonschema import compile

def _validateUuidFormat(instance):
"""Validate UUID format (accepts UUIDv1-v5)"""
try:
uuid.UUID(instance)
except (ValueError, AttributeError, TypeError):
raise ValueError(f"Invalid UUID format: {instance}")

def _adaptJsonschemaChecker(checker_func):
"""Adapt jsonschema format checker to work with fastjsonschema"""
def wrapper(instance):
try:
result = checker_func(instance)
if not result:
raise ValueError("Format validation failed")
except Exception as e:
raise ValueError(f"Format validation failed: {e}")
return wrapper

class SchemaValidation:
def __init__(self, schema_path, is_multi_message=False):
self.mqtt_schema = None
Expand All @@ -21,7 +40,8 @@ def compileValidators(self):
for key in checker.checkers:
formatType = checker.checkers[key][0]
if key not in formats:
formats[key] = formatType
formats[key] = _adaptJsonschemaChecker(formatType)
formats['uuid'] = _validateUuidFormat

if not self.mqtt_schema:
raise Exception("Schema not available")
Expand All @@ -35,10 +55,10 @@ def compileValidators(self):
defs_key: self.mqtt_schema[defs_key]
}
self.validator[key] = compile(sub_schema, formats=formats)
self.validator_no_format[key] = compile(sub_schema)
self.validator_no_format[key] = compile(sub_schema, formats=formats, use_formats=False)
else:
self.validator[None] = compile(self.mqtt_schema, formats=formats)
self.validator_no_format[None] = compile(self.mqtt_schema)
self.validator_no_format[None] = compile(self.mqtt_schema, formats=formats, use_formats=False)
return

def loadSchema(self, schema_path):
Expand Down
Loading