
Commit eff6c8a

Merge pull request #6 from LushaTeam/main
Add JsonSchema support
2 parents: d87430d + 706b8c8

File tree

11 files changed: +368 −34 lines changed

README.md

Lines changed: 1 addition & 1 deletion

@@ -14,7 +14,7 @@ Feature | Java Library | Python Library | Notes
 :------ | :----------- | :------------- | :----
 Serialization and deserialization using schema registry | ✔️ | ✔️
 Avro message format | ✔️ | ✔️
-JSON Schema message format | ✔️ |
+JSON Schema message format | ✔️ | ✔️
 Kafka Streams support | ✔️ | | N/A for Python, Kafka Streams is Java-only
 Compression | ✔️ | ✔️ |
 Local schema cache | ✔️ | ✔️

setup.cfg

Lines changed: 6 additions & 1 deletion

@@ -1,6 +1,6 @@
 [metadata]
 name = aws-glue-schema-registry
-version = 1.0.0
+version = 1.1.0
 description = Use the AWS Glue Schema Registry.
 long_description = file: README.md
 long_description_content_type = text/markdown
@@ -34,6 +34,8 @@ install_requires =
     boto3>=1.18.48
     typing-extensions>=3.10.0.2;python_version<"3.8"
     fastavro>=1.4.5
+    orjson~=3.6.0
+    fastjsonschema~=2.15
 setup_requires =
     setuptools

@@ -52,3 +54,6 @@ ignore_missing_imports = True

 [mypy-boto3.*]
 ignore_missing_imports = True
+
+[mypy-fastjsonschema.*]
+ignore_missing_imports = True
src/aws_schema_registry/jsonschema.py

Lines changed: 60 additions & 0 deletions

@@ -0,0 +1,60 @@
+from __future__ import annotations
+
+from typing import Union
+
+import orjson
+
+import fastjsonschema
+
+from aws_schema_registry.schema import DataFormat, Schema, ValidationError
+
+
+class JsonSchema(Schema):
+    """Implementation of the `Schema` protocol for JSON schemas.
+
+    Arguments:
+        definition: the schema, either as a parsed dict or a string
+    """
+
+    def __init__(self, definition: Union[str, dict]):
+        if isinstance(definition, str):
+            self._dict = orjson.loads(definition)
+        else:
+            self._dict = definition
+        self._compiled_validation_method = fastjsonschema.compile(self._dict)
+
+    def __hash__(self):
+        return hash(str(self))
+
+    def __eq__(self, other):
+        return isinstance(other, JsonSchema) and \
+            self._dict == other._dict
+
+    def __str__(self):
+        return orjson.dumps(self._dict).decode()
+
+    def __repr__(self):
+        return '<JsonSchema %s>' % self._dict
+
+    @property
+    def data_format(self) -> DataFormat:
+        return 'JSON'
+
+    @property
+    def fqn(self) -> str:
+        return ""
+
+    def read(self, bytes_: bytes):
+        data = orjson.loads(bytes_)
+        self.validate(data)
+        return data
+
+    def write(self, data) -> bytes:
+        self.validate(data)
+        return orjson.dumps(data)
+
+    def validate(self, data):
+        try:
+            self._compiled_validation_method(data)
+        except fastjsonschema.exceptions.JsonSchemaValueException as e:
+            raise ValidationError(str(e)) from e
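
For orientation, a minimal usage sketch of the new class (the schema definition and data below are made-up examples; the write/read/validate behavior follows the code above):

from aws_schema_registry.jsonschema import JsonSchema
from aws_schema_registry.schema import ValidationError

# Hypothetical schema definition, for illustration only.
schema = JsonSchema("""{
    "type": "object",
    "properties": {"name": {"type": "string"}},
    "required": ["name"]
}""")

payload = schema.write({'name': 'John Doe'})  # validates, then encodes with orjson
assert schema.read(payload) == {'name': 'John Doe'}  # decodes and re-validates

try:
    schema.write({})  # rejected: 'name' is required
except ValidationError as e:
    print(e)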

src/aws_schema_registry/serde.py

Lines changed: 3 additions & 2 deletions

@@ -6,6 +6,7 @@
 from uuid import UUID

 from aws_schema_registry.avro import AvroSchema
+from aws_schema_registry.jsonschema import JsonSchema
 from aws_schema_registry.client import SchemaRegistryClient
 from aws_schema_registry.codec import decode, encode, UnknownEncodingException
 from aws_schema_registry.exception import SchemaRegistryException
@@ -61,7 +62,7 @@ def serialize(self, topic: str, data_and_schema: DataAndSchema):
         if data_and_schema is None:
             return None
         if not isinstance(data_and_schema, tuple):
-            raise TypeError('AvroSerializer can only serialize',
+            raise TypeError('KafkaSerializer can only serialize',
                             f' {tuple}, got {type(data_and_schema)}')
         data, schema = data_and_schema
         schema_version = self._get_schema_version(topic, schema)
@@ -136,4 +137,4 @@ def _schema_for_version(self, version: SchemaVersion) -> Schema:
         if version.data_format == 'AVRO':
             return AvroSchema(version.definition)
         elif version.data_format == 'JSON':
-            raise NotImplementedError('JSON schema not supported')
+            return JsonSchema(version.definition)
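
Together with the `JsonSchema` class above, this lets the deserializer rebuild JSON schemas fetched from the registry. A rough end-to-end sketch (the registry name, topic, and boto3 client wiring are placeholders; running this needs real AWS credentials and a Glue registry):

import boto3

from aws_schema_registry import DataAndSchema, SchemaRegistryClient
from aws_schema_registry.adapter.kafka import KafkaDeserializer, KafkaSerializer
from aws_schema_registry.jsonschema import JsonSchema

glue_client = boto3.client('glue')  # placeholder region/credentials
client = SchemaRegistryClient(glue_client, registry_name='my-registry')

# NOTE: as the tests below do, you may need to pass an explicit
# schema_naming_strategy, since JsonSchema has no fqn to derive a name from.
serializer = KafkaSerializer(client)
deserializer = KafkaDeserializer(client)

schema = JsonSchema('{"type": "object"}')
blob = serializer.serialize('some-topic', DataAndSchema({'name': 'John Doe'}, schema))

message = deserializer.deserialize('some-topic', blob)  # takes the new 'JSON' branch
assert message.data == {'name': 'John Doe'}
assert message.schema == schema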

tests/integration/java/src/main/java/com/disasteraware/aws/schemaregistry/App.java

Lines changed: 43 additions & 16 deletions

@@ -25,29 +25,56 @@ public class App {
     static Map<String, Object> configs = new HashMap<>();

     public static void main(String[] args) {
-        configs.put(AWSSchemaRegistryConstants.DATA_FORMAT, DataFormat.AVRO.name());
+        String dataFormat = Objects.requireNonNull(System.getenv("DATA_FORMAT"));
         configs.put(AWSSchemaRegistryConstants.AWS_REGION, Objects.requireNonNull(System.getenv("AWS_REGION")));
         configs.put(AWSSchemaRegistryConstants.REGISTRY_NAME, Objects.requireNonNull(System.getenv("REGISTRY_NAME")));
-        //configs.put(AWSSchemaRegistryConstants.SCHEMA_NAME, Objects.requireNonNull(System.getenv("SCHEMA_NAME")));
+        configs.put(AWSSchemaRegistryConstants.SCHEMA_NAME, Objects.requireNonNull(System.getenv("SCHEMA_NAME")));
         configs.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);
-        configs.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());
-        try {
-            byte[] bytes;
-            GenericRecord record;
-            Schema schema;

-            bytes = System.in.readAllBytes();
+        if (dataFormat.equals("AVRO")) {
+            configs.put(AWSSchemaRegistryConstants.DATA_FORMAT, DataFormat.AVRO.name());
+            configs.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());
+            try {
+                byte[] bytes;
+                GenericRecord record;
+                Schema schema;

-            GlueSchemaRegistryKafkaDeserializer deserializer = new GlueSchemaRegistryKafkaDeserializer(configs);
-            record = (GenericRecord) deserializer.deserialize("test", bytes);
-            schema = record.getSchema();
+                bytes = System.in.readAllBytes();

-            GlueSchemaRegistryKafkaSerializer serializer = new GlueSchemaRegistryKafkaSerializer(configs);
-            bytes = serializer.serialize("test", record);
+                GlueSchemaRegistryKafkaDeserializer deserializer = new GlueSchemaRegistryKafkaDeserializer(configs);
+                record = (GenericRecord) deserializer.deserialize("test", bytes);
+                schema = record.getSchema();

-            System.out.write(bytes, 0, bytes.length);
-        } catch (IOException e) {
-            e.printStackTrace();
+                GlueSchemaRegistryKafkaSerializer serializer = new GlueSchemaRegistryKafkaSerializer(configs);
+                bytes = serializer.serialize("test", record);
+
+                System.out.write(bytes, 0, bytes.length);
+            } catch (IOException e) {
+                e.printStackTrace();
+                System.exit(1);
+            }
+        } else if (dataFormat.equals("JSON")) {
+            configs.put(AWSSchemaRegistryConstants.DATA_FORMAT, DataFormat.JSON.name());
+            try {
+                byte[] bytes;
+                Object record;
+                Schema schema;
+
+                bytes = System.in.readAllBytes();
+
+                GlueSchemaRegistryKafkaDeserializer deserializer = new GlueSchemaRegistryKafkaDeserializer(configs);
+                record = deserializer.deserialize("test", bytes);
+
+                GlueSchemaRegistryKafkaSerializer serializer = new GlueSchemaRegistryKafkaSerializer(configs);
+                bytes = serializer.serialize("test", record);
+
+                System.out.write(bytes, 0, bytes.length);
+            } catch (IOException e) {
+                e.printStackTrace();
+                System.exit(1);
+            }
+        } else {
+            System.out.println("Only JSON or AVRO are acceptable data formats");
             System.exit(1);
         }
     }

tests/integration/java/test_java_integration.py

Lines changed: 22 additions & 5 deletions

@@ -2,8 +2,11 @@
 import os
 import subprocess

+import pytest
+
 from aws_schema_registry import DataAndSchema, SchemaRegistryClient
 from aws_schema_registry.avro import AvroSchema
+from aws_schema_registry.jsonschema import JsonSchema
 from aws_schema_registry.adapter.kafka import (
     KafkaDeserializer, KafkaSerializer
 )
@@ -20,10 +23,21 @@
 with open(os.path.join(os.path.dirname(__file__), 'user.avsc'), 'r') as f:
     SCHEMA = AvroSchema(f.read())

+with open(os.path.join(os.path.dirname(__file__), 'user.json'), 'r') as f:
+    JSON_SCHEMA = JsonSchema(f.read())
+
+
+def _topic_name_schema_type_name_strategy(topic, is_key, schema):
+    return f"{topic}-{'key' if is_key else 'value'}-{schema.data_format}"
+

-def test_interop_with_java_library(glue_client, registry, boto_session):
+@pytest.mark.parametrize("schema", [SCHEMA, JSON_SCHEMA])
+def test_interop_with_java_library(glue_client, registry,
+                                   boto_session, schema):
     client = SchemaRegistryClient(glue_client, registry_name=registry)
-    serializer = KafkaSerializer(client)
+    serializer = KafkaSerializer(
+        client,
+        schema_naming_strategy=_topic_name_schema_type_name_strategy)
     deserializer = KafkaDeserializer(client)

     data = {
@@ -32,7 +46,7 @@ def test_interop_with_java_library(glue_client, registry, boto_session):
         'favorite_color': 'red'
     }
     serialized: bytes = serializer.serialize(
-        'test', DataAndSchema(data, SCHEMA)
+        'test', DataAndSchema(data, schema)
     )

     if not os.path.exists(JAR_LOCATION):
@@ -45,19 +59,22 @@ def test_interop_with_java_library(glue_client, registry, boto_session):
         input=serialized,
         capture_output=True,
         env={
+            'DATA_FORMAT': schema.data_format,
             'AWS_ACCESS_KEY_ID': credentials.access_key,
             'AWS_SECRET_ACCESS_KEY': credentials.secret_key,
             'AWS_SESSION_TOKEN': credentials.token,
             'AWS_REGION': boto_session.region_name,
             'REGISTRY_NAME': registry,
-            'SCHEMA_NAME': 'sometestschema'
+            'SCHEMA_NAME': _topic_name_schema_type_name_strategy(
+                "test", False, schema)
         }
     )
+    print(proc.stderr)
     proc.check_returncode()
     deserialized = deserializer.deserialize('test', proc.stdout)
     assert deserialized
     assert deserialized.data == data
-    assert deserialized.schema == SCHEMA
+    assert deserialized.schema == schema


 def compile_java():
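
The strategy above keys schema names on topic, key/value position, and data format, so the Avro and JSON parametrizations register under distinct names. A quick illustration of the names it produces, inferred from the function body:

_topic_name_schema_type_name_strategy('test', False, SCHEMA)       # -> 'test-value-AVRO'
_topic_name_schema_type_name_strategy('test', False, JSON_SCHEMA)  # -> 'test-value-JSON'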

tests/integration/java/user.json

Lines changed: 20 additions & 0 deletions

@@ -0,0 +1,20 @@
+{
+    "$schema": "http://json-schema.org/draft-04/schema#",
+    "type": "object",
+    "properties": {
+        "name": {
+            "type": "string"
+        },
+        "favorite_number": {
+            "type": "integer"
+        },
+        "favorite_color": {
+            "type": "string"
+        }
+    },
+    "required": [
+        "name",
+        "favorite_number",
+        "favorite_color"
+    ]
+}
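
A payload like the one the parametrized test sends satisfies this definition; a quick standalone check with the new class (the file path here is illustrative):

from aws_schema_registry.jsonschema import JsonSchema

with open('tests/integration/java/user.json') as f:
    schema = JsonSchema(f.read())

# Raises ValidationError if the document violates the schema; this one passes.
schema.validate({'name': 'John Doe', 'favorite_number': 6, 'favorite_color': 'red'})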

tests/integration/kafka/test_kafka_integration.py

Lines changed: 28 additions & 2 deletions

@@ -7,10 +7,12 @@

 from aws_schema_registry import DataAndSchema, SchemaRegistryClient
 from aws_schema_registry.avro import AvroSchema
+from aws_schema_registry.jsonschema import JsonSchema
 from aws_schema_registry.adapter.kafka import (
     KafkaDeserializer, KafkaSerializer
 )
-from aws_schema_registry.naming import record_name_strategy
+from aws_schema_registry.naming import (record_name_strategy,
+                                        topic_name_strategy)

 BOOTSTRAP_STRING = '127.0.0.1:9092'

@@ -24,6 +26,8 @@
     SCHEMA_V1 = AvroSchema(f.read())
 with open(os.path.join(os.path.dirname(__file__), 'user.v2.avsc'), 'r') as f:
     SCHEMA_V2 = AvroSchema(f.read())
+with open(os.path.join(os.path.dirname(__file__), 'user.json'), 'r') as f:
+    SCHEMA_JSON = JsonSchema(f.read())

 PRODUCER_PROPERTIES = {
     'bootstrap_servers': BOOTSTRAP_STRING,
@@ -65,12 +69,25 @@ def test_produce_consume_with_ser_de_schema_registry(
     serializer = KafkaSerializer(
         client, schema_naming_strategy=record_name_strategy
     )
+
+    # jsonschema has no fqn, so we use topic_name_strategy for it
+    # (which also requires a separate producer)
+    json_serializer = KafkaSerializer(
+        client, schema_naming_strategy=topic_name_strategy
+    )
+
     deserializer = KafkaDeserializer(client)

     producer = KafkaProducer(
         value_serializer=serializer,
         **PRODUCER_PROPERTIES
     )
+
+    json_producer = KafkaProducer(
+        value_serializer=json_serializer,
+        **PRODUCER_PROPERTIES
+    )
+
     data1 = {
         'name': 'John Doe',
         'favorite_number': 6,
@@ -85,6 +102,13 @@ def test_produce_consume_with_ser_de_schema_registry(
     }
     producer.send(topic, DataAndSchema(data2, SCHEMA_V2))

+    data3 = {
+        'name': 'John Doe',
+        'favorite_number': 6,
+        'favorite_colors': ['red', 'blue', 'yellow']
+    }
+    json_producer.send(topic, DataAndSchema(data3, SCHEMA_JSON))
+
     consumer = KafkaConsumer(
         topic,
         value_deserializer=deserializer,
@@ -93,8 +117,10 @@ def test_produce_consume_with_ser_de_schema_registry(
     batch = consumer.poll(timeout_ms=1000)
     assert len(batch) == 1
     messages = batch[list(batch.keys())[0]]
-    assert len(messages) == 2
+    assert len(messages) == 3
     assert messages[0].value.data == data1
     assert messages[0].value.schema == SCHEMA_V1
     assert messages[1].value.data == data2
     assert messages[1].value.schema == SCHEMA_V2
+    assert messages[2].value.data == data3
+    assert messages[2].value.schema == SCHEMA_JSON
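
The comment in the hunk above points at the key constraint: `JsonSchema.fqn` is hard-coded to an empty string, so a strategy that (presumably) derives the schema name from the record's fully-qualified name, as `record_name_strategy` does for Avro, has nothing to work with, and `topic_name_strategy` is used instead. A quick check against the class introduced in this commit:

SCHEMA_JSON.fqn          # '' -- JsonSchema always reports an empty fqn
SCHEMA_JSON.data_format  # 'JSON'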

tests/integration/kafka/user.json

Lines changed: 23 additions & 0 deletions

@@ -0,0 +1,23 @@
+{
+    "$schema": "http://json-schema.org/draft-04/schema#",
+    "type": "object",
+    "properties": {
+        "name": {
+            "type": "string"
+        },
+        "favorite_number": {
+            "type": "integer"
+        },
+        "favorite_colors": {
+            "type": "array",
+            "items": {
+                "type": "string"
+            }
+        }
+    },
+    "required": [
+        "name",
+        "favorite_number",
+        "favorite_colors"
+    ]
+}
