Skip to content

Commit 5557900

Browse files
author
Ron Serruya
committed
Add jsonschema support
1 parent d87430d commit 5557900

File tree

3 files changed

+66
-3
lines changed

3 files changed

+66
-3
lines changed

setup.cfg

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[metadata]
22
name = aws-glue-schema-registry
3-
version = 1.0.0
3+
version = 1.1.0
44
description = Use the AWS Glue Schema Registry.
55
long_description = file: README.md
66
long_description_content_type = text/markdown
@@ -34,6 +34,8 @@ install_requires =
3434
boto3>=1.18.48
3535
typing-extensions>=3.10.0.2;python_version<"3.8"
3636
fastavro>=1.4.5
37+
orjson~=3.6.0
38+
fastjsonschema~=2.15
3739
setup_requires =
3840
setuptools
3941

@@ -52,3 +54,6 @@ ignore_missing_imports = True
5254

5355
[mypy-boto3.*]
5456
ignore_missing_imports = True
57+
58+
[mypy-fastjsonschema.*]
59+
ignore_missing_imports = True
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
from __future__ import annotations
2+
3+
from typing import Union
4+
5+
import orjson
6+
7+
import fastjsonschema
8+
9+
from aws_schema_registry.schema import DataFormat, Schema, ValidationError
10+
11+
12+
class JsonSchema(Schema):
13+
"""Implementation of the `Schema` protocol for JSON schemas.
14+
15+
Arguments:
16+
definition: the schema, either as a parsed dict or a string
17+
"""
18+
19+
def __init__(self, definition: Union[str, dict]):
20+
if isinstance(definition, str):
21+
self._dict = orjson.loads(definition)
22+
else:
23+
self._dict = definition
24+
self._compiled_validation_method = fastjsonschema.compile(self._dict)
25+
26+
def __hash__(self):
27+
return hash(str(self))
28+
29+
def __eq__(self, other):
30+
return isinstance(other, JsonSchema) and \
31+
self._dict == other._dict
32+
33+
def __str__(self):
34+
return orjson.dumps(self._dict).decode()
35+
36+
def __repr__(self):
37+
return '<JsonSchema %s>' % self._dict
38+
39+
@property
40+
def data_format(self) -> DataFormat:
41+
return 'JSON'
42+
43+
@property
44+
def fqn(self) -> str:
45+
return ""
46+
47+
def read(self, bytes_: bytes):
48+
return orjson.loads(bytes_)
49+
50+
def write(self, data) -> bytes:
51+
return orjson.dumps(data)
52+
53+
def validate(self, data):
54+
try:
55+
self._compiled_validation_method(data)
56+
except fastjsonschema.exceptions.JsonSchemaValueException as e:
57+
raise ValidationError(str(e)) from e

src/aws_schema_registry/serde.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from uuid import UUID
77

88
from aws_schema_registry.avro import AvroSchema
9+
from aws_schema_registry.jsonschema import JsonSchema
910
from aws_schema_registry.client import SchemaRegistryClient
1011
from aws_schema_registry.codec import decode, encode, UnknownEncodingException
1112
from aws_schema_registry.exception import SchemaRegistryException
@@ -61,7 +62,7 @@ def serialize(self, topic: str, data_and_schema: DataAndSchema):
6162
if data_and_schema is None:
6263
return None
6364
if not isinstance(data_and_schema, tuple):
64-
raise TypeError('AvroSerializer can only serialize',
65+
raise TypeError('KafkaSerializer can only serialize',
6566
f' {tuple}, got {type(data_and_schema)}')
6667
data, schema = data_and_schema
6768
schema_version = self._get_schema_version(topic, schema)
@@ -136,4 +137,4 @@ def _schema_for_version(self, version: SchemaVersion) -> Schema:
136137
if version.data_format == 'AVRO':
137138
return AvroSchema(version.definition)
138139
elif version.data_format == 'JSON':
139-
raise NotImplementedError('JSON schema not supported')
140+
return JsonSchema(version.definition)

0 commit comments

Comments
 (0)