Skip to content

Commit f0f9450

Browse files
committed
AVRO References functionality and tests
1 parent fd811e3 commit f0f9450

File tree

4 files changed

+331
-49
lines changed

4 files changed

+331
-49
lines changed

karapace/compatibility/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ def check_compatibility(
8888
if old_schema.schema_type is SchemaType.AVRO:
8989
assert isinstance(old_schema.schema, AvroSchema)
9090
assert isinstance(new_schema.schema, AvroSchema)
91+
9192
if compatibility_mode in {CompatibilityModes.BACKWARD, CompatibilityModes.BACKWARD_TRANSITIVE}:
9293
result = check_avro_compatibility(
9394
reader_schema=new_schema.schema,

karapace/schema_models.py

Lines changed: 63 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,10 @@
2727
from karapace.utils import assert_never, json_decode, json_encode, JSONDecodeError
2828
from typing import Any, cast, Dict, Final, final, Mapping, Sequence
2929

30+
import avro.schema
3031
import hashlib
3132
import logging
33+
import re
3234

3335
LOG = logging.getLogger(__name__)
3436

@@ -181,15 +183,42 @@ def schema(self) -> Draft7Validator | AvroSchema | ProtobufSchema:
181183
return parsed_typed_schema.schema
182184

183185

184-
def avro_schema_merge(schema_str: str, dependencies: Mapping[str, Dependency]) -> str:
185-
"""To support references in AVRO we recursively merge all referenced schemas with current schema"""
186-
if dependencies:
187-
merged_schema = ""
188-
for dependency in dependencies.values():
189-
merged_schema += avro_schema_merge(dependency.schema.schema_str, dependency.schema.dependencies) + ",\n"
190-
merged_schema += schema_str
191-
return "[\n" + merged_schema + "\n]"
192-
return schema_str
186+
class AvroMerge:
187+
def __init__(self, schema_str: str, dependencies: Mapping[str, Dependency] | None = None):
188+
self.schema_str = json_encode(json_decode(schema_str), compact=True, sort_keys=True)
189+
self.dependencies = dependencies
190+
self.unique_id = 0
191+
192+
def union_safe_schema_str(self, schema_str: str) -> str:
193+
# in case we meet union - we use it as is
194+
regex = re.compile(r"^\s*\[")
195+
base_schema = (
196+
f'{{ "name": "___RESERVED_KARAPACE_WRAPPER_NAME_{self.unique_id}___",'
197+
f'"type": "record", "fields": [{{"name": "name", "type":'
198+
)
199+
if regex.match(schema_str):
200+
return f"{base_schema} {schema_str}}}]}}"
201+
return f"{base_schema} [{schema_str}]}}]}}"
202+
203+
def builder(self, schema_str: str, dependencies: Mapping[str, Dependency] | None = None) -> str:
204+
"""To support references in AVRO we iteratively merge all referenced schemas with current schema"""
205+
stack: list[tuple[str, Mapping[str, Dependency] | None]] = [(schema_str, dependencies)]
206+
merged_schemas = []
207+
208+
while stack:
209+
current_schema_str, current_dependencies = stack.pop()
210+
if current_dependencies:
211+
stack.append((current_schema_str, None))
212+
for dependency in reversed(current_dependencies.values()):
213+
stack.append((dependency.schema.schema_str, dependency.schema.dependencies))
214+
else:
215+
self.unique_id += 1
216+
merged_schemas.append(self.union_safe_schema_str(current_schema_str))
217+
218+
return ",\n".join(merged_schemas)
219+
220+
def wrap(self) -> str:
221+
return "[\n" + self.builder(self.schema_str, self.dependencies) + "\n]"
193222

194223

195224
def parse(
@@ -200,21 +229,41 @@ def parse(
200229
references: Sequence[Reference] | None = None,
201230
dependencies: Mapping[str, Dependency] | None = None,
202231
normalize: bool = False,
232+
dependencies_compat: bool = False,
203233
) -> ParsedTypedSchema:
204234
if schema_type not in [SchemaType.AVRO, SchemaType.JSONSCHEMA, SchemaType.PROTOBUF]:
205235
raise InvalidSchema(f"Unknown parser {schema_type} for {schema_str}")
206-
236+
parsed_schema_result: Draft7Validator | AvroSchema | ProtobufSchema
207237
parsed_schema: Draft7Validator | AvroSchema | ProtobufSchema
208238
if schema_type is SchemaType.AVRO:
209239
try:
240+
if dependencies or dependencies_compat:
241+
wrapped_schema_str = AvroMerge(schema_str, dependencies).wrap()
242+
else:
243+
wrapped_schema_str = schema_str
210244
parsed_schema = parse_avro_schema_definition(
211-
avro_schema_merge(schema_str, dependencies),
245+
wrapped_schema_str,
212246
validate_enum_symbols=validate_avro_enum_symbols,
213247
validate_names=validate_avro_names,
214248
)
249+
if dependencies or dependencies_compat:
250+
if isinstance(parsed_schema, avro.schema.UnionSchema):
251+
parsed_schema_result = parsed_schema.schemas[-1].fields[0].type.schemas[-1]
252+
253+
else:
254+
raise InvalidSchema
255+
else:
256+
parsed_schema_result = parsed_schema
257+
return ParsedTypedSchema(
258+
schema_type=schema_type,
259+
schema_str=schema_str,
260+
schema=parsed_schema_result,
261+
references=references,
262+
dependencies=dependencies,
263+
schema_wrapped=parsed_schema,
264+
)
215265
except (SchemaParseException, JSONDecodeError, TypeError) as e:
216266
raise InvalidSchema from e
217-
218267
elif schema_type is SchemaType.JSONSCHEMA:
219268
try:
220269
parsed_schema = parse_jsonschema_definition(schema_str)
@@ -276,9 +325,10 @@ def __init__(
276325
schema: Draft7Validator | AvroSchema | ProtobufSchema,
277326
references: Sequence[Reference] | None = None,
278327
dependencies: Mapping[str, Dependency] | None = None,
328+
schema_wrapped: Draft7Validator | AvroSchema | ProtobufSchema | None = None,
279329
) -> None:
280330
self._schema_cached: Draft7Validator | AvroSchema | ProtobufSchema | None = schema
281-
331+
self.schema_wrapped = schema_wrapped
282332
super().__init__(
283333
schema_type=schema_type,
284334
schema_str=schema_str,

karapace/schema_reader.py

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -512,9 +512,9 @@ def _handle_msg_schema(self, key: dict, value: dict | None) -> None:
512512

513513
try:
514514
schema_type_parsed = SchemaType(schema_type)
515-
except ValueError:
515+
except ValueError as e:
516516
LOG.warning("Invalid schema type: %s", schema_type)
517-
return
517+
raise e
518518

519519
# This does two jobs:
520520
# - Validates the schema's JSON
@@ -531,18 +531,18 @@ def _handle_msg_schema(self, key: dict, value: dict | None) -> None:
531531
candidate_references = [reference_from_mapping(reference_data) for reference_data in schema_references]
532532
resolved_references, resolved_dependencies = self.resolve_references(candidate_references)
533533
schema_str = json.dumps(json.loads(schema_str), sort_keys=True)
534-
except json.JSONDecodeError:
534+
except json.JSONDecodeError as e:
535535
LOG.warning("Schema is not valid JSON")
536-
return
537-
except InvalidReferences:
536+
raise e
537+
except InvalidReferences as e:
538538
LOG.exception("Invalid AVRO references")
539-
return
539+
raise e
540540
elif schema_type_parsed == SchemaType.JSONSCHEMA:
541541
try:
542542
schema_str = json.dumps(json.loads(schema_str), sort_keys=True)
543-
except json.JSONDecodeError:
543+
except json.JSONDecodeError as e:
544544
LOG.warning("Schema is not valid JSON")
545-
return
545+
raise e
546546
elif schema_type_parsed == SchemaType.PROTOBUF:
547547
try:
548548
if schema_references:
@@ -556,12 +556,12 @@ def _handle_msg_schema(self, key: dict, value: dict | None) -> None:
556556
normalize=False,
557557
)
558558
schema_str = str(parsed_schema)
559-
except InvalidSchema:
559+
except InvalidSchema as e:
560560
LOG.exception("Schema is not valid ProtoBuf definition")
561-
return
562-
except InvalidReferences:
561+
raise e
562+
except InvalidReferences as e:
563563
LOG.exception("Invalid Protobuf references")
564-
return
564+
raise e
565565

566566
try:
567567
typed_schema = TypedSchema(
@@ -571,8 +571,8 @@ def _handle_msg_schema(self, key: dict, value: dict | None) -> None:
571571
dependencies=resolved_dependencies,
572572
schema=parsed_schema,
573573
)
574-
except (InvalidSchema, JSONDecodeError):
575-
return
574+
except (InvalidSchema, JSONDecodeError) as e:
575+
raise e
576576

577577
self.database.insert_schema_version(
578578
subject=schema_subject,

0 commit comments

Comments
 (0)