Skip to content

Commit b67c0aa

Browse files
authored
DGS-22343 Fix transformation of union of JSON refs (#2074)
1 parent: be5bc1d · commit: b67c0aa

File tree

3 files changed: +269 additions, -12 deletions

src/confluent_kafka/schema_registry/common/json_schema.py

Lines changed: 23 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -83,17 +83,17 @@ def transform(
8383
schema["type"] = original_type # restore original type
8484
all_of = schema.get("allOf")
8585
if all_of is not None:
86-
subschema = _validate_subschemas(all_of, message, ref_registry)
86+
subschema = _validate_subschemas(all_of, message, ref_registry, ref_resolver)
8787
if subschema is not None:
8888
return transform(ctx, subschema, ref_registry, ref_resolver, path, message, field_transform)
8989
any_of = schema.get("anyOf")
9090
if any_of is not None:
91-
subschema = _validate_subschemas(any_of, message, ref_registry)
91+
subschema = _validate_subschemas(any_of, message, ref_registry, ref_resolver)
9292
if subschema is not None:
9393
return transform(ctx, subschema, ref_registry, ref_resolver, path, message, field_transform)
9494
one_of = schema.get("oneOf")
9595
if one_of is not None:
96-
subschema = _validate_subschemas(one_of, message, ref_registry)
96+
subschema = _validate_subschemas(one_of, message, ref_registry, ref_resolver)
9797
if subschema is not None:
9898
return transform(ctx, subschema, ref_registry, ref_resolver, path, message, field_transform)
9999
items = schema.get("items")
@@ -133,13 +133,14 @@ def _transform_field(
133133
get_type(prop_schema),
134134
get_inline_tags(prop_schema)
135135
)
136-
value = message[prop_name]
137-
new_value = transform(ctx, prop_schema, ref_registry, ref_resolver, full_name, value, field_transform)
138-
if ctx.rule.kind == RuleKind.CONDITION:
139-
if new_value is False:
140-
raise RuleConditionError(ctx.rule)
141-
else:
142-
message[prop_name] = new_value
136+
value = message.get(prop_name)
137+
if value is not None:
138+
new_value = transform(ctx, prop_schema, ref_registry, ref_resolver, full_name, value, field_transform)
139+
if ctx.rule.kind == RuleKind.CONDITION:
140+
if new_value is False:
141+
raise RuleConditionError(ctx.rule)
142+
else:
143+
message[prop_name] = new_value
143144
finally:
144145
ctx.exit_field()
145146

@@ -163,11 +164,16 @@ def _validate_subtypes(
163164
def _validate_subschemas(
164165
subschemas: List[JsonSchema],
165166
message: JsonMessage,
166-
registry: Registry
167+
registry: Registry,
168+
resolver: Resolver,
167169
) -> Optional[JsonSchema]:
168170
for subschema in subschemas:
169171
try:
170-
validate(instance=message, schema=subschema, registry=registry)
172+
ref = subschema.get("$ref")
173+
if ref is not None:
174+
# resolve $ref before validating
175+
subschema = resolver.lookup(ref).contents
176+
validate(instance=message, schema=subschema, registry=registry, resolver=resolver)
171177
return subschema
172178
except ValidationError:
173179
pass
@@ -202,6 +208,11 @@ def get_type(schema: JsonSchema) -> FieldType:
202208
return FieldType.BOOLEAN
203209
if schema_type == "null":
204210
return FieldType.NULL
211+
212+
props = schema.get("properties")
213+
if props is not None:
214+
return FieldType.RECORD
215+
205216
return FieldType.NULL
206217

207218

tests/schema_registry/_async/test_json_serdes.py

Lines changed: 123 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1209,6 +1209,129 @@ async def test_json_encryption_with_union():
12091209
assert obj == obj2
12101210

12111211

1212+
async def test_json_encryption_with_union_of_refs():
1213+
executor = FieldEncryptionExecutor.register_with_clock(FakeClock())
1214+
1215+
conf = {'url': _BASE_URL}
1216+
client = AsyncSchemaRegistryClient.new_client(conf)
1217+
ser_conf = {'auto.register.schemas': False, 'use.latest.version': True}
1218+
rule_conf = {'secret': 'mysecret'}
1219+
schema = {
1220+
"type": "object",
1221+
"properties": {
1222+
"messageType": {
1223+
"type": "string"
1224+
},
1225+
"version": {
1226+
"type": "string"
1227+
},
1228+
"payload": {
1229+
"type": "object",
1230+
"oneOf": [
1231+
{
1232+
"$ref": "#/$defs/authentication_request"
1233+
},
1234+
{
1235+
"$ref": "#/$defs/authentication_status"
1236+
}
1237+
]
1238+
}
1239+
},
1240+
"required": [
1241+
"payload",
1242+
"messageType",
1243+
"version"
1244+
],
1245+
"$defs": {
1246+
"authentication_request": {
1247+
"properties": {
1248+
"messageId": {
1249+
"type": "string",
1250+
"confluent:tags": ["PII"]
1251+
},
1252+
"timestamp": {
1253+
"type": "integer",
1254+
"minimum": 0
1255+
},
1256+
"requestId": {
1257+
"type": "string"
1258+
}
1259+
},
1260+
"required": [
1261+
"messageId",
1262+
"timestamp"
1263+
]
1264+
},
1265+
"authentication_status": {
1266+
"properties": {
1267+
"messageId": {
1268+
"type": "string",
1269+
"confluent:tags": ["PII"]
1270+
},
1271+
"authType": {
1272+
"type": [
1273+
"string",
1274+
"null"
1275+
]
1276+
}
1277+
},
1278+
"required": [
1279+
"messageId",
1280+
"authType"
1281+
]
1282+
}
1283+
}
1284+
}
1285+
1286+
rule = Rule(
1287+
"test-encrypt",
1288+
"",
1289+
RuleKind.TRANSFORM,
1290+
RuleMode.WRITEREAD,
1291+
"ENCRYPT",
1292+
["PII"],
1293+
RuleParams({
1294+
"encrypt.kek.name": "kek1",
1295+
"encrypt.kms.type": "local-kms",
1296+
"encrypt.kms.key.id": "mykey"
1297+
}),
1298+
None,
1299+
None,
1300+
"ERROR,NONE",
1301+
False
1302+
)
1303+
await client.register_schema(_SUBJECT, Schema(
1304+
json.dumps(schema),
1305+
"JSON",
1306+
[],
1307+
None,
1308+
RuleSet(None, [rule])
1309+
))
1310+
1311+
obj = {
1312+
"messageType": "authentication_request",
1313+
"version": "1.0",
1314+
"payload": {
1315+
"messageId": "12345",
1316+
"timestamp": 1757410647
1317+
}
1318+
}
1319+
1320+
ser = await AsyncJSONSerializer(json.dumps(schema), client, conf=ser_conf, rule_conf=rule_conf)
1321+
dek_client = executor.executor.client
1322+
ser_ctx = SerializationContext(_TOPIC, MessageField.VALUE)
1323+
obj_bytes = await ser(obj, ser_ctx)
1324+
1325+
# reset encrypted fields
1326+
assert obj['payload']['messageId'] != '12345'
1327+
obj['payload']['messageId'] = '12345'
1328+
1329+
deser = await AsyncJSONDeserializer(None, schema_registry_client=client, rule_conf=rule_conf)
1330+
executor.executor.client = dek_client
1331+
obj2 = await deser(obj_bytes, ser_ctx)
1332+
assert obj == obj2
1333+
1334+
12121335
async def test_json_encryption_with_references():
12131336
executor = FieldEncryptionExecutor.register_with_clock(FakeClock())
12141337

tests/schema_registry/_sync/test_json_serdes.py

Lines changed: 123 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1209,6 +1209,129 @@ def test_json_encryption_with_union():
12091209
assert obj == obj2
12101210

12111211

1212+
def test_json_encryption_with_union_of_refs():
1213+
executor = FieldEncryptionExecutor.register_with_clock(FakeClock())
1214+
1215+
conf = {'url': _BASE_URL}
1216+
client = SchemaRegistryClient.new_client(conf)
1217+
ser_conf = {'auto.register.schemas': False, 'use.latest.version': True}
1218+
rule_conf = {'secret': 'mysecret'}
1219+
schema = {
1220+
"type": "object",
1221+
"properties": {
1222+
"messageType": {
1223+
"type": "string"
1224+
},
1225+
"version": {
1226+
"type": "string"
1227+
},
1228+
"payload": {
1229+
"type": "object",
1230+
"oneOf": [
1231+
{
1232+
"$ref": "#/$defs/authentication_request"
1233+
},
1234+
{
1235+
"$ref": "#/$defs/authentication_status"
1236+
}
1237+
]
1238+
}
1239+
},
1240+
"required": [
1241+
"payload",
1242+
"messageType",
1243+
"version"
1244+
],
1245+
"$defs": {
1246+
"authentication_request": {
1247+
"properties": {
1248+
"messageId": {
1249+
"type": "string",
1250+
"confluent:tags": ["PII"]
1251+
},
1252+
"timestamp": {
1253+
"type": "integer",
1254+
"minimum": 0
1255+
},
1256+
"requestId": {
1257+
"type": "string"
1258+
}
1259+
},
1260+
"required": [
1261+
"messageId",
1262+
"timestamp"
1263+
]
1264+
},
1265+
"authentication_status": {
1266+
"properties": {
1267+
"messageId": {
1268+
"type": "string",
1269+
"confluent:tags": ["PII"]
1270+
},
1271+
"authType": {
1272+
"type": [
1273+
"string",
1274+
"null"
1275+
]
1276+
}
1277+
},
1278+
"required": [
1279+
"messageId",
1280+
"authType"
1281+
]
1282+
}
1283+
}
1284+
}
1285+
1286+
rule = Rule(
1287+
"test-encrypt",
1288+
"",
1289+
RuleKind.TRANSFORM,
1290+
RuleMode.WRITEREAD,
1291+
"ENCRYPT",
1292+
["PII"],
1293+
RuleParams({
1294+
"encrypt.kek.name": "kek1",
1295+
"encrypt.kms.type": "local-kms",
1296+
"encrypt.kms.key.id": "mykey"
1297+
}),
1298+
None,
1299+
None,
1300+
"ERROR,NONE",
1301+
False
1302+
)
1303+
client.register_schema(_SUBJECT, Schema(
1304+
json.dumps(schema),
1305+
"JSON",
1306+
[],
1307+
None,
1308+
RuleSet(None, [rule])
1309+
))
1310+
1311+
obj = {
1312+
"messageType": "authentication_request",
1313+
"version": "1.0",
1314+
"payload": {
1315+
"messageId": "12345",
1316+
"timestamp": 1757410647
1317+
}
1318+
}
1319+
1320+
ser = JSONSerializer(json.dumps(schema), client, conf=ser_conf, rule_conf=rule_conf)
1321+
dek_client = executor.executor.client
1322+
ser_ctx = SerializationContext(_TOPIC, MessageField.VALUE)
1323+
obj_bytes = ser(obj, ser_ctx)
1324+
1325+
# reset encrypted fields
1326+
assert obj['payload']['messageId'] != '12345'
1327+
obj['payload']['messageId'] = '12345'
1328+
1329+
deser = JSONDeserializer(None, schema_registry_client=client, rule_conf=rule_conf)
1330+
executor.executor.client = dek_client
1331+
obj2 = deser(obj_bytes, ser_ctx)
1332+
assert obj == obj2
1333+
1334+
12121335
def test_json_encryption_with_references():
12131336
executor = FieldEncryptionExecutor.register_with_clock(FakeClock())
12141337

0 commit comments

Comments
 (0)