Skip to content

Commit f2ec4d7

Browse files
iliaxiliax
andauthored
Avro (Embedded) serde implementation (#3266)
* Avro (Embedded) serde implementation --------- Co-authored-by: iliax <[email protected]>
1 parent a87b31a commit f2ec4d7

File tree

3 files changed

+166
-0
lines changed

3 files changed

+166
-0
lines changed

kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/SerdesInitializer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import com.provectus.kafka.ui.exception.ValidationException;
1010
import com.provectus.kafka.ui.serde.api.PropertyResolver;
1111
import com.provectus.kafka.ui.serde.api.Serde;
12+
import com.provectus.kafka.ui.serdes.builtin.AvroEmbeddedSerde;
1213
import com.provectus.kafka.ui.serdes.builtin.Base64Serde;
1314
import com.provectus.kafka.ui.serdes.builtin.Int32Serde;
1415
import com.provectus.kafka.ui.serdes.builtin.Int64Serde;
@@ -43,6 +44,7 @@ public SerdesInitializer() {
4344
.put(Int64Serde.name(), Int64Serde.class)
4445
.put(UInt32Serde.name(), UInt32Serde.class)
4546
.put(UInt64Serde.name(), UInt64Serde.class)
47+
.put(AvroEmbeddedSerde.name(), AvroEmbeddedSerde.class)
4648
.put(Base64Serde.name(), Base64Serde.class)
4749
.put(UuidBinarySerde.name(), UuidBinarySerde.class)
4850
.build(),
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package com.provectus.kafka.ui.serdes.builtin;
2+
3+
import com.provectus.kafka.ui.serde.api.DeserializeResult;
4+
import com.provectus.kafka.ui.serde.api.PropertyResolver;
5+
import com.provectus.kafka.ui.serde.api.RecordHeaders;
6+
import com.provectus.kafka.ui.serde.api.SchemaDescription;
7+
import com.provectus.kafka.ui.serdes.BuiltInSerde;
8+
import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
9+
import java.util.Map;
10+
import java.util.Optional;
11+
import lombok.SneakyThrows;
12+
import org.apache.avro.file.DataFileReader;
13+
import org.apache.avro.file.SeekableByteArrayInput;
14+
import org.apache.avro.generic.GenericDatumReader;
15+
16+
public class AvroEmbeddedSerde implements BuiltInSerde {
17+
18+
public static String name() {
19+
return "Avro (Embedded)";
20+
}
21+
22+
@Override
23+
public void configure(PropertyResolver serdeProperties,
24+
PropertyResolver kafkaClusterProperties,
25+
PropertyResolver globalProperties) {
26+
}
27+
28+
@Override
29+
public Optional<String> getDescription() {
30+
return Optional.empty();
31+
}
32+
33+
@Override
34+
public Optional<SchemaDescription> getSchema(String topic, Target type) {
35+
return Optional.empty();
36+
}
37+
38+
@Override
39+
public boolean canDeserialize(String topic, Target type) {
40+
return true;
41+
}
42+
43+
@Override
44+
public boolean canSerialize(String topic, Target type) {
45+
return false;
46+
}
47+
48+
@Override
49+
public Serializer serializer(String topic, Target type) {
50+
throw new IllegalStateException();
51+
}
52+
53+
@Override
54+
public Deserializer deserializer(String topic, Target type) {
55+
return new Deserializer() {
56+
@SneakyThrows
57+
@Override
58+
public DeserializeResult deserialize(RecordHeaders headers, byte[] data) {
59+
try (var reader = new DataFileReader<>(new SeekableByteArrayInput(data), new GenericDatumReader<>())) {
60+
if (!reader.hasNext()) {
61+
// this is very strange situation, when only header present in payload
62+
// returning null in this case
63+
return new DeserializeResult(null, DeserializeResult.Type.JSON, Map.of());
64+
}
65+
Object avroObj = reader.next();
66+
String jsonValue = new String(AvroSchemaUtils.toJson(avroObj));
67+
return new DeserializeResult(jsonValue, DeserializeResult.Type.JSON, Map.of());
68+
}
69+
}
70+
};
71+
}
72+
}
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package com.provectus.kafka.ui.serdes.builtin;
2+
3+
import static org.assertj.core.api.Assertions.assertThat;
4+
5+
import com.fasterxml.jackson.databind.json.JsonMapper;
6+
import com.provectus.kafka.ui.serde.api.DeserializeResult;
7+
import com.provectus.kafka.ui.serde.api.Serde;
8+
import com.provectus.kafka.ui.serdes.PropertyResolverImpl;
9+
import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
10+
import java.io.ByteArrayOutputStream;
11+
import java.io.IOException;
12+
import org.apache.avro.Schema;
13+
import org.apache.avro.file.DataFileWriter;
14+
import org.apache.avro.generic.GenericData;
15+
import org.apache.avro.generic.GenericDatumWriter;
16+
import org.apache.avro.generic.GenericRecord;
17+
import org.junit.jupiter.api.BeforeEach;
18+
import org.junit.jupiter.api.Test;
19+
import org.junit.jupiter.params.ParameterizedTest;
20+
import org.junit.jupiter.params.provider.EnumSource;
21+
22+
class AvroEmbeddedSerdeTest {
23+
24+
private AvroEmbeddedSerde avroEmbeddedSerde;
25+
26+
@BeforeEach
27+
void init() {
28+
avroEmbeddedSerde = new AvroEmbeddedSerde();
29+
avroEmbeddedSerde.configure(
30+
PropertyResolverImpl.empty(),
31+
PropertyResolverImpl.empty(),
32+
PropertyResolverImpl.empty()
33+
);
34+
}
35+
36+
@ParameterizedTest
37+
@EnumSource
38+
void canDeserializeReturnsTrueForAllTargets(Serde.Target target) {
39+
assertThat(avroEmbeddedSerde.canDeserialize("anyTopic", target))
40+
.isTrue();
41+
}
42+
43+
@ParameterizedTest
44+
@EnumSource
45+
void canSerializeReturnsFalseForAllTargets(Serde.Target target) {
46+
assertThat(avroEmbeddedSerde.canSerialize("anyTopic", target))
47+
.isFalse();
48+
}
49+
50+
@Test
51+
void deserializerParsesAvroDataWithEmbeddedSchema() throws Exception {
52+
Schema schema = new Schema.Parser().parse("""
53+
{
54+
"type": "record",
55+
"name": "TestAvroRecord",
56+
"fields": [
57+
{ "name": "field1", "type": "string" },
58+
{ "name": "field2", "type": "int" }
59+
]
60+
}
61+
"""
62+
);
63+
GenericRecord record = new GenericData.Record(schema);
64+
record.put("field1", "this is test msg");
65+
record.put("field2", 100500);
66+
67+
String jsonRecord = new String(AvroSchemaUtils.toJson(record));
68+
byte[] serializedRecordBytes = serializeAvroWithEmbeddedSchema(record);
69+
70+
var deserializer = avroEmbeddedSerde.deserializer("anyTopic", Serde.Target.KEY);
71+
DeserializeResult result = deserializer.deserialize(null, serializedRecordBytes);
72+
assertThat(result.getType()).isEqualTo(DeserializeResult.Type.JSON);
73+
assertThat(result.getAdditionalProperties()).isEmpty();
74+
assertJsonEquals(jsonRecord, result.getResult());
75+
}
76+
77+
private void assertJsonEquals(String expected, String actual) throws IOException {
78+
var mapper = new JsonMapper();
79+
assertThat(mapper.readTree(actual)).isEqualTo(mapper.readTree(expected));
80+
}
81+
82+
private byte[] serializeAvroWithEmbeddedSchema(GenericRecord record) throws IOException {
83+
try (DataFileWriter<GenericRecord> writer = new DataFileWriter<>(new GenericDatumWriter<>());
84+
ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
85+
writer.create(record.getSchema(), baos);
86+
writer.append(record);
87+
writer.flush();
88+
return baos.toByteArray();
89+
}
90+
}
91+
92+
}

0 commit comments

Comments
 (0)