diff --git a/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerde.java b/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerde.java index d6f7a3699..bbf38230f 100644 --- a/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerde.java +++ b/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerde.java @@ -279,7 +279,7 @@ public Serializer serializer(String topic, Target type) { @Override public Deserializer deserializer(String topic, Target type) { return (headers, data) -> { - var schemaId = extractSchemaIdFromMsg(data); + int schemaId = getSchemaIdFromMessageOrTopic(data, topic, type); SchemaType format = getMessageFormatBySchemaId(schemaId); MessageFormatter formatter = schemaRegistryFormatters.get(format); return new DeserializeResult( @@ -293,6 +293,18 @@ public Deserializer deserializer(String topic, Target type) { }; } + private int getSchemaIdFromMessageOrTopic(byte[] data, String topic, Target type) { + return extractSchemaIdFromMsg(data).orElseGet( + () -> { + String subject = schemaSubject(topic, type); + return getSchemaBySubject(subject) + .map(SchemaMetadata::getId) + .orElseThrow(() -> new ValidationException( + String.format("No schema for subject '%s' found and no magic byte in avro data", subject))); + } + ); + } + private SchemaType getMessageFormatBySchemaId(int schemaId) { return getSchemaById(schemaId) .map(ParsedSchema::schemaType) @@ -300,15 +312,11 @@ private SchemaType getMessageFormatBySchemaId(int schemaId) { .orElseThrow(() -> new ValidationException(String.format("Schema for id '%d' not found ", schemaId))); } - private int extractSchemaIdFromMsg(byte[] data) { + private Optional extractSchemaIdFromMsg(byte[] data) { ByteBuffer buffer = ByteBuffer.wrap(data); if (buffer.remaining() >= SR_PAYLOAD_PREFIX_LENGTH && buffer.get() == SR_PAYLOAD_MAGIC_BYTE) { - return buffer.getInt(); + return Optional.of(buffer.getInt()); } - throw new ValidationException( - String.format( - "Data doesn't contain magic byte and schema id prefix, so it can't be deserialized with %s serde", - name()) - ); + return Optional.empty(); } } diff --git a/api/src/test/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerdeTest.java b/api/src/test/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerdeTest.java index d66a8d004..e243303d2 100644 --- a/api/src/test/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerdeTest.java +++ b/api/src/test/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerdeTest.java @@ -130,6 +130,39 @@ void deserializeReturnsJsonAvroMsgJsonRepresentation() throws RestClientExceptio .contains(Map.entry("schemaId", schemaId)); } + @Test + void deserializeReturnsJsonAvroMsgJsonRepresentationViaTopicNameOnly() throws RestClientException, IOException { + AvroSchema schema = new AvroSchema( + "{" + + " \"type\": \"record\"," + + " \"name\": \"TestAvroRecord1\"," + + " \"fields\": [" + + " {" + + " \"name\": \"field1\"," + + " \"type\": \"string\"" + + " }," + + " {" + + " \"name\": \"field2\"," + + " \"type\": \"int\"" + + " }" + + " ]" + + "}" + ); + String jsonValue = "{ \"field1\":\"testStr\", \"field2\": 123 }"; + + String topic = "test"; + int schemaId = registryClient.register(topic + "-value", schema); + + byte[] data = jsonToAvro(jsonValue, schema); // No magic byte no schema id registered + var result = serde.deserializer(topic, Serde.Target.VALUE).deserialize(null, data); + + assertJsonsEqual(jsonValue, result.getResult()); + assertThat(result.getType()).isEqualTo(DeserializeResult.Type.JSON); + assertThat(result.getAdditionalProperties()) + .contains(Map.entry("type", "AVRO")) + .contains(Map.entry("schemaId", schemaId)); + } + @Nested class SerdeWithDisabledSubjectExistenceCheck {