Skip to content

Commit 0c2de37

Browse files
authored
Optimization to cache json validators (#1918)
1 parent 5f70657 commit 0c2de37

File tree

1 file changed

+55
-28
lines changed

1 file changed

+55
-28
lines changed

src/confluent_kafka/schema_registry/json_schema.py

Lines changed: 55 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,10 @@
2424

2525
import httpx
2626
import referencing
27+
from cachetools import LRUCache
2728
from jsonschema import validate, ValidationError
29+
from jsonschema.protocols import Validator
30+
from jsonschema.validators import validator_for
2831
from referencing import Registry, Resource
2932
from referencing._core import Resolver
3033

@@ -209,7 +212,7 @@ class JSONSerializer(BaseSerializer):
209212
""" # noqa: E501
210213
__slots__ = ['_known_subjects', '_parsed_schema', '_ref_registry',
211214
'_schema', '_schema_id', '_schema_name', '_to_dict',
212-
'_parsed_schemas', '_validate']
215+
'_parsed_schemas', '_validators', '_validate']
213216

214217
_default_conf = {'auto.register.schemas': True,
215218
'normalize.schemas': False,
@@ -240,6 +243,7 @@ def __init__(
240243
self._schema_id = None
241244
self._known_subjects = set()
242245
self._parsed_schemas = ParsedSchemaCache()
246+
self._validators = LRUCache(1000)
243247

244248
if to_dict is not None and not callable(to_dict):
245249
raise ValueError("to_dict must be callable with the signature "
@@ -345,25 +349,24 @@ def __call__(self, obj: object, ctx: Optional[SerializationContext] = None) -> O
345349
value = obj
346350

347351
if latest_schema is not None:
352+
schema = latest_schema.schema
348353
parsed_schema, ref_registry = self._get_parsed_schema(latest_schema.schema)
349354
root_resource = Resource.from_contents(
350355
parsed_schema, default_specification=DEFAULT_SPEC)
351356
ref_resolver = ref_registry.resolver_with_root(root_resource)
352357
field_transformer = lambda rule_ctx, field_transform, msg: ( # noqa: E731
353-
transform(rule_ctx, parsed_schema, ref_resolver, "$", msg, field_transform))
358+
transform(rule_ctx, parsed_schema, ref_registry, ref_resolver, "$", msg, field_transform))
354359
value = self._execute_rules(ctx, subject, RuleMode.WRITE, None,
355360
latest_schema.schema, value, None,
356361
field_transformer)
357362
else:
363+
schema = self._schema
358364
parsed_schema, ref_registry = self._parsed_schema, self._ref_registry
359365

360366
if self._validate:
361367
try:
362-
if ref_registry:
363-
validate(instance=value, schema=parsed_schema,
364-
registry=ref_registry)
365-
else:
366-
validate(instance=value, schema=parsed_schema)
368+
validator = self._get_validator(schema, parsed_schema, ref_registry)
369+
validator.validate(value)
367370
except ValidationError as ve:
368371
raise SerializationError(ve.message)
369372

@@ -390,6 +393,18 @@ def _get_parsed_schema(self, schema: Schema) -> Tuple[Optional[JsonSchema], Opti
390393
self._parsed_schemas.set(schema, (parsed_schema, ref_registry))
391394
return parsed_schema, ref_registry
392395

396+
def _get_validator(self, schema: Schema, parsed_schema: JsonSchema, registry: Registry) -> Validator:
397+
validator = self._validators.get(schema, None)
398+
if validator is not None:
399+
return validator
400+
401+
cls = validator_for(parsed_schema)
402+
cls.check_schema(parsed_schema)
403+
validator = cls(parsed_schema, registry=registry)
404+
405+
self._validators[schema] = validator
406+
return validator
407+
393408

394409
class JSONDeserializer(BaseDeserializer):
395410
"""
@@ -442,7 +457,7 @@ class JSONDeserializer(BaseDeserializer):
442457
""" # noqa: E501
443458

444459
__slots__ = ['_reader_schema', '_ref_registry', '_from_dict', '_schema',
445-
'_parsed_schemas', '_validate']
460+
'_parsed_schemas', '_validators', '_validate']
446461

447462
_default_conf = {'use.latest.version': False,
448463
'use.latest.with.metadata': None,
@@ -479,6 +494,7 @@ def __init__(
479494
self._registry = schema_registry_client
480495
self._rule_registry = rule_registry if rule_registry else RuleRegistry.get_global_instance()
481496
self._parsed_schemas = ParsedSchemaCache()
497+
self._validators = LRUCache(1000)
482498

483499
conf_copy = self._default_conf.copy()
484500
if conf is not None:
@@ -593,18 +609,16 @@ def __call__(self, data: bytes, ctx: Optional[SerializationContext] = None) -> U
593609
reader_schema, default_specification=DEFAULT_SPEC)
594610
reader_ref_resolver = reader_ref_registry.resolver_with_root(reader_root_resource)
595611
field_transformer = lambda rule_ctx, field_transform, message: ( # noqa: E731
596-
transform(rule_ctx, reader_schema, reader_ref_resolver, "$", message, field_transform))
612+
transform(rule_ctx, reader_schema, reader_ref_registry, reader_ref_resolver,
613+
"$", message, field_transform))
597614
obj_dict = self._execute_rules(ctx, subject, RuleMode.READ, None,
598615
reader_schema_raw, obj_dict, None,
599616
field_transformer)
600617

601618
if self._validate:
602619
try:
603-
if reader_ref_registry:
604-
validate(instance=obj_dict, schema=reader_schema,
605-
registry=reader_ref_registry)
606-
else:
607-
validate(instance=obj_dict, schema=reader_schema)
620+
validator = self._get_validator(reader_schema_raw, reader_schema, reader_ref_registry)
621+
validator.validate(obj_dict)
608622
except ValidationError as ve:
609623
raise SerializationError(ve.message)
610624

@@ -627,9 +641,21 @@ def _get_parsed_schema(self, schema: Schema) -> Tuple[Optional[JsonSchema], Opti
627641
self._parsed_schemas.set(schema, (parsed_schema, ref_registry))
628642
return parsed_schema, ref_registry
629643

644+
def _get_validator(self, schema: Schema, parsed_schema: JsonSchema, registry: Registry) -> Validator:
645+
validator = self._validators.get(schema, None)
646+
if validator is not None:
647+
return validator
648+
649+
cls = validator_for(parsed_schema)
650+
cls.check_schema(parsed_schema)
651+
validator = cls(parsed_schema, registry=registry)
652+
653+
self._validators[schema] = validator
654+
return validator
655+
630656

631657
def transform(
632-
ctx: RuleContext, schema: JsonSchema, ref_resolver: Resolver,
658+
ctx: RuleContext, schema: JsonSchema, ref_registry: Registry, ref_resolver: Resolver,
633659
path: str, message: JsonMessage, field_transform: FieldTransform
634660
) -> Optional[JsonMessage]:
635661
if message is None or schema is None or isinstance(schema, bool):
@@ -639,34 +665,34 @@ def transform(
639665
field_ctx.field_type = get_type(schema)
640666
all_of = schema.get("allOf")
641667
if all_of is not None:
642-
subschema = _validate_subschemas(all_of, message)
668+
subschema = _validate_subschemas(all_of, message, ref_registry)
643669
if subschema is not None:
644-
return transform(ctx, subschema, ref_resolver, path, message, field_transform)
670+
return transform(ctx, subschema, ref_registry, ref_resolver, path, message, field_transform)
645671
any_of = schema.get("anyOf")
646672
if any_of is not None:
647-
subschema = _validate_subschemas(any_of, message)
673+
subschema = _validate_subschemas(any_of, message, ref_registry)
648674
if subschema is not None:
649-
return transform(ctx, subschema, ref_resolver, path, message, field_transform)
675+
return transform(ctx, subschema, ref_registry, ref_resolver, path, message, field_transform)
650676
one_of = schema.get("oneOf")
651677
if one_of is not None:
652-
subschema = _validate_subschemas(one_of, message)
678+
subschema = _validate_subschemas(one_of, message, ref_registry)
653679
if subschema is not None:
654-
return transform(ctx, subschema, ref_resolver, path, message, field_transform)
680+
return transform(ctx, subschema, ref_registry, ref_resolver, path, message, field_transform)
655681
items = schema.get("items")
656682
if items is not None:
657683
if isinstance(message, list):
658-
return [transform(ctx, items, ref_resolver, path, item, field_transform) for item in message]
684+
return [transform(ctx, items, ref_registry, ref_resolver, path, item, field_transform) for item in message]
659685
ref = schema.get("$ref")
660686
if ref is not None:
661687
ref_schema = ref_resolver.lookup(ref)
662-
return transform(ctx, ref_schema.contents, ref_resolver, path, message, field_transform)
688+
return transform(ctx, ref_schema.contents, ref_registry, ref_resolver, path, message, field_transform)
663689
schema_type = get_type(schema)
664690
if schema_type == FieldType.RECORD:
665691
props = schema.get("properties")
666692
if props is not None:
667693
for prop_name, prop_schema in props.items():
668694
_transform_field(ctx, path, prop_name, message,
669-
prop_schema, ref_resolver, field_transform)
695+
prop_schema, ref_registry, ref_resolver, field_transform)
670696
return message
671697
if schema_type in (FieldType.ENUM, FieldType.STRING, FieldType.INT, FieldType.DOUBLE, FieldType.BOOLEAN):
672698
if field_ctx is not None:
@@ -678,7 +704,7 @@ def transform(
678704

679705
def _transform_field(
680706
ctx: RuleContext, path: str, prop_name: str, message: JsonMessage,
681-
prop_schema: JsonSchema, ref_resolver: Resolver, field_transform: FieldTransform
707+
prop_schema: JsonSchema, ref_registry: Registry, ref_resolver: Resolver, field_transform: FieldTransform
682708
):
683709
full_name = path + "." + prop_name
684710
try:
@@ -690,7 +716,7 @@ def _transform_field(
690716
get_inline_tags(prop_schema)
691717
)
692718
value = message[prop_name]
693-
new_value = transform(ctx, prop_schema, ref_resolver, full_name, value, field_transform)
719+
new_value = transform(ctx, prop_schema, ref_registry, ref_resolver, full_name, value, field_transform)
694720
if ctx.rule.kind == RuleKind.CONDITION:
695721
if new_value is False:
696722
raise RuleConditionError(ctx.rule)
@@ -702,11 +728,12 @@ def _transform_field(
702728

703729
def _validate_subschemas(
704730
subschemas: List[JsonSchema],
705-
message: JsonMessage
731+
message: JsonMessage,
732+
registry: Registry
706733
) -> Optional[JsonSchema]:
707734
for subschema in subschemas:
708735
try:
709-
validate(instance=message, schema=subschema)
736+
validate(instance=message, schema=subschema, registry=registry)
710737
return subschema
711738
except ValidationError:
712739
pass

0 commit comments

Comments
 (0)