Skip to content

Commit cfe149f

Browse files
authored
Add ability to use latest version during serialization (confluentinc#1473)
* Add ability to use latest version during serialization. This helps to address issues where the object's schema might have slightly different metadata than the latest schema that is registered. * Add unit test
1 parent 293222f commit cfe149f

File tree

12 files changed

+187
-35
lines changed

12 files changed

+187
-35
lines changed

avro-serializer/src/main/java/io/confluent/kafka/formatter/AvroMessageReader.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -92,19 +92,21 @@ public AvroMessageReader() {
9292
String topic,
9393
boolean parseKey,
9494
BufferedReader reader,
95-
boolean autoRegister
95+
boolean autoRegister,
96+
boolean useLatest
9697
) {
9798
super(schemaRegistryClient, new AvroSchema(keySchema), new AvroSchema(valueSchema), topic,
98-
parseKey, reader, autoRegister);
99+
parseKey, reader, autoRegister, useLatest);
99100
}
100101

101102
@Override
102103
protected SchemaMessageSerializer<Object> createSerializer(
103104
SchemaRegistryClient schemaRegistryClient,
104105
boolean autoRegister,
106+
boolean useLatest,
105107
Serializer keySerializer
106108
) {
107-
return new AvroMessageSerializer(schemaRegistryClient, autoRegister, keySerializer);
109+
return new AvroMessageSerializer(schemaRegistryClient, autoRegister, useLatest, keySerializer);
108110
}
109111

110112
@Override
@@ -136,10 +138,12 @@ static class AvroMessageSerializer extends AbstractKafkaAvroSerializer
136138
protected final Serializer keySerializer;
137139

138140
AvroMessageSerializer(
139-
SchemaRegistryClient schemaRegistryClient, boolean autoRegister, Serializer keySerializer
141+
SchemaRegistryClient schemaRegistryClient,
142+
boolean autoRegister, boolean useLatest, Serializer keySerializer
140143
) {
141144
this.schemaRegistry = schemaRegistryClient;
142145
this.autoRegisterSchema = autoRegister;
146+
this.useLatestVersion = useLatest;
143147
this.keySerializer = keySerializer;
144148
}
145149

avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroSerializer.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,12 @@ public abstract class AbstractKafkaAvroSerializer extends AbstractKafkaSchemaSer
4040

4141
private final EncoderFactory encoderFactory = EncoderFactory.get();
4242
protected boolean autoRegisterSchema;
43+
protected boolean useLatestVersion;
4344

4445
protected void configure(KafkaAvroSerializerConfig config) {
4546
configureClientProperties(config, new AvroSchemaProvider());
4647
autoRegisterSchema = config.autoRegisterSchema();
48+
useLatestVersion = config.useLatestVersion();
4749
}
4850

4951
protected KafkaAvroSerializerConfig serializerConfig(Map<String, ?> props) {
@@ -70,6 +72,10 @@ protected byte[] serializeImpl(
7072
if (autoRegisterSchema) {
7173
restClientErrorMsg = "Error registering Avro schema: ";
7274
id = schemaRegistry.register(subject, schema);
75+
} else if (useLatestVersion) {
76+
restClientErrorMsg = "Error retrieving latest version: ";
77+
schema = (AvroSchema) lookupLatestVersion(subject, schema);
78+
id = schemaRegistry.getId(subject, schema);
7379
} else {
7480
restClientErrorMsg = "Error retrieving Avro schema: ";
7581
id = schemaRegistry.getId(subject, schema);

avro-serializer/src/test/java/io/confluent/kafka/formatter/KafkaAvroFormatterTest.java

Lines changed: 43 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package io.confluent.kafka.formatter;
1717

18+
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
1819
import org.apache.avro.AvroRuntimeException;
1920
import org.apache.avro.Schema;
2021
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -73,7 +74,8 @@ public void testKafkaAvroValueFormatter() {
7374
BufferedReader reader =
7475
new BufferedReader(new InputStreamReader(new ByteArrayInputStream(inputJson.getBytes())));
7576
AvroMessageReader avroReader =
76-
new AvroMessageReader(schemaRegistry, null, recordSchema, "topic1", false, reader, true);
77+
new AvroMessageReader(schemaRegistry, null, recordSchema, "topic1", false, reader,
78+
true, false);
7779
ProducerRecord<byte[], byte[]> message = avroReader.readMessage();
7880

7981
byte[] serializedValue = message.value();
@@ -97,7 +99,8 @@ public void testKafkaAvroKeyValueFormatter() {
9799
BufferedReader reader =
98100
new BufferedReader(new InputStreamReader(new ByteArrayInputStream(inputJson.getBytes())));
99101
AvroMessageReader avroReader =
100-
new AvroMessageReader(schemaRegistry, intSchema, recordSchema, "topic1", true, reader, true);
102+
new AvroMessageReader(schemaRegistry, intSchema, recordSchema, "topic1", true, reader,
103+
true, false);
101104
ProducerRecord<byte[], byte[]> message = avroReader.readMessage();
102105

103106
byte[] serializedKey = message.key();
@@ -125,17 +128,18 @@ public void testKafkaAvroValueWithTimestampFormatter() {
125128
String expectedJson = String.format("%s:%d\t%s",
126129
timestampType.name, timestamp, inputJson);
127130
BufferedReader reader =
128-
new BufferedReader(new InputStreamReader(new ByteArrayInputStream(inputJson.getBytes())));
131+
new BufferedReader(new InputStreamReader(new ByteArrayInputStream(inputJson.getBytes())));
129132
AvroMessageReader avroReader =
130-
new AvroMessageReader(schemaRegistry, null, recordSchema, "topic1", false, reader, true);
133+
new AvroMessageReader(schemaRegistry, null, recordSchema, "topic1", false, reader,
134+
true, false);
131135
ProducerRecord<byte[], byte[]> message = avroReader.readMessage();
132136

133137
byte[] serializedValue = message.value();
134138
ByteArrayOutputStream baos = new ByteArrayOutputStream();
135139
PrintStream ps = new PrintStream(baos);
136140
ConsumerRecord<byte[], byte[]> crecord = new ConsumerRecord<>(
137-
"topic1", 0, 200, timestamp, timestampType, 0, 0, serializedValue.length,
138-
null, serializedValue);
141+
"topic1", 0, 200, timestamp, timestampType, 0, 0, serializedValue.length,
142+
null, serializedValue);
139143
formatter.writeTo(crecord, ps);
140144
String outputJson = baos.toString();
141145

@@ -148,7 +152,8 @@ public void testInvalidFormat() {
148152
BufferedReader reader =
149153
new BufferedReader(new InputStreamReader(new ByteArrayInputStream(inputJson.getBytes())));
150154
AvroMessageReader avroReader =
151-
new AvroMessageReader(schemaRegistry, null, recordSchema, "topic1", false, reader, true);
155+
new AvroMessageReader(schemaRegistry, null, recordSchema, "topic1", false, reader,
156+
true, false);
152157
try {
153158
avroReader.readMessage();
154159
fail("Registering an invalid schema should fail");
@@ -170,7 +175,7 @@ public void testStringKey() {
170175
new BufferedReader(new InputStreamReader(new ByteArrayInputStream(inputJson.getBytes())));
171176
AvroMessageReader avroReader =
172177
new AvroMessageReader(schemaRegistry, null, recordSchema, "topic1", false, reader,
173-
true);
178+
true, false);
174179
ProducerRecord<byte[], byte[]> message = avroReader.readMessage();
175180

176181
byte[] serializedKey = "TestKey".getBytes();
@@ -198,12 +203,12 @@ public void testStringKeyWithTimestamp() {
198203

199204
String inputJson = "{\"name\":\"myname\"}\n";
200205
String expectedJson = String.format("%s:%d\tTestKey\t%s",
201-
timestampType.name, timestamp, inputJson);
206+
timestampType.name, timestamp, inputJson);
202207
BufferedReader reader =
203-
new BufferedReader(new InputStreamReader(new ByteArrayInputStream(inputJson.getBytes())));
208+
new BufferedReader(new InputStreamReader(new ByteArrayInputStream(inputJson.getBytes())));
204209
AvroMessageReader avroReader =
205-
new AvroMessageReader(schemaRegistry, null, recordSchema, "topic1", false, reader,
206-
true);
210+
new AvroMessageReader(schemaRegistry, null, recordSchema, "topic1", false, reader,
211+
true, false);
207212
ProducerRecord<byte[], byte[]> message = avroReader.readMessage();
208213

209214
byte[] serializedKey = "TestKey".getBytes();
@@ -218,4 +223,30 @@ public void testStringKeyWithTimestamp() {
218223

219224
assertEquals("Input key/value json should match output key/value json", expectedJson, outputJson);
220225
}
226+
227+
@Test
228+
public void testKafkaAvroValueUsingLatestVersion() throws Exception {
229+
formatter.init(props);
230+
231+
schemaRegistry.register("topic1-value", new AvroSchema(recordSchema));
232+
233+
String inputJson = "{\"name\":\"myname\"}\n";
234+
BufferedReader reader =
235+
new BufferedReader(new InputStreamReader(new ByteArrayInputStream(inputJson.getBytes())));
236+
AvroMessageReader avroReader =
237+
new AvroMessageReader(schemaRegistry, null, recordSchema, "topic1", false, reader,
238+
false, true);
239+
ProducerRecord<byte[], byte[]> message = avroReader.readMessage();
240+
241+
byte[] serializedValue = message.value();
242+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
243+
PrintStream ps = new PrintStream(baos);
244+
ConsumerRecord<byte[], byte[]> crecord = new ConsumerRecord<>(
245+
"topic1", 0, 200, 1000, TimestampType.LOG_APPEND_TIME, 0, 0, serializedValue.length,
246+
null, serializedValue);
247+
formatter.writeTo(crecord, ps);
248+
String outputJson = baos.toString();
249+
250+
assertEquals("Input value json should match output value json", inputJson, outputJson);
251+
}
221252
}

json-schema-serializer/src/main/java/io/confluent/kafka/formatter/json/JsonSchemaMessageReader.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,19 +97,22 @@ public JsonSchemaMessageReader() {
9797
String topic,
9898
boolean parseKey,
9999
BufferedReader reader,
100-
boolean autoRegister
100+
boolean autoRegister,
101+
boolean useLatest
101102
) {
102103
super(schemaRegistryClient, keySchema, valueSchema, topic,
103-
parseKey, reader, autoRegister);
104+
parseKey, reader, autoRegister, useLatest);
104105
}
105106

106107
@Override
107108
protected SchemaMessageSerializer<JsonNode> createSerializer(
108109
SchemaRegistryClient schemaRegistryClient,
109110
boolean autoRegister,
111+
boolean useLatest,
110112
Serializer keySerializer
111113
) {
112-
return new JsonSchemaMessageSerializer(schemaRegistryClient, autoRegister, keySerializer);
114+
return new JsonSchemaMessageSerializer(
115+
schemaRegistryClient, autoRegister, useLatest, keySerializer);
113116
}
114117

115118
@Override
@@ -132,10 +135,12 @@ static class JsonSchemaMessageSerializer extends AbstractKafkaJsonSchemaSerializ
132135
protected final Serializer keySerializer;
133136

134137
JsonSchemaMessageSerializer(
135-
SchemaRegistryClient schemaRegistryClient, boolean autoRegister, Serializer keySerializer
138+
SchemaRegistryClient schemaRegistryClient,
139+
boolean autoRegister, boolean useLatest, Serializer keySerializer
136140
) {
137141
this.schemaRegistry = schemaRegistryClient;
138142
this.autoRegisterSchema = autoRegister;
143+
this.useLatestVersion = useLatest;
139144
this.keySerializer = keySerializer;
140145
this.validate = true;
141146
}

json-schema-serializer/src/main/java/io/confluent/kafka/serializers/json/AbstractKafkaJsonSchemaSerializer.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,14 @@
3636
public abstract class AbstractKafkaJsonSchemaSerializer<T> extends AbstractKafkaSchemaSerDe {
3737

3838
protected boolean autoRegisterSchema;
39+
protected boolean useLatestVersion;
3940
protected ObjectMapper objectMapper = Jackson.newObjectMapper();
4041
protected boolean validate;
4142

4243
protected void configure(KafkaJsonSchemaSerializerConfig config) {
4344
configureClientProperties(config, new JsonSchemaProvider());
4445
this.autoRegisterSchema = config.autoRegisterSchema();
46+
this.useLatestVersion = config.useLatestVersion();
4547
boolean prettyPrint = config.getBoolean(KafkaJsonSchemaSerializerConfig.JSON_INDENT_OUTPUT);
4648
this.objectMapper.configure(SerializationFeature.INDENT_OUTPUT, prettyPrint);
4749
this.validate = config.getBoolean(KafkaJsonSchemaSerializerConfig.FAIL_INVALID_SCHEMA);
@@ -74,6 +76,10 @@ protected byte[] serializeImpl(
7476
if (autoRegisterSchema) {
7577
restClientErrorMsg = "Error registering JSON schema: ";
7678
id = schemaRegistry.register(subject, schema);
79+
} else if (useLatestVersion) {
80+
restClientErrorMsg = "Error retrieving latest version: ";
81+
schema = (JsonSchema) lookupLatestVersion(subject, schema);
82+
id = schemaRegistry.getId(subject, schema);
7783
} else {
7884
restClientErrorMsg = "Error retrieving JSON schema: ";
7985
id = schemaRegistry.getId(subject, schema);

json-schema-serializer/src/test/java/io/confluent/kafka/formatter/json/KafkaJsonSchemaFormatterTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public void testKafkaJsonSchemaValueFormatter() throws Exception {
7171
new BufferedReader(new InputStreamReader(new ByteArrayInputStream(inputJson.getBytes())));
7272
JsonSchemaMessageReader jsonSchemaReader =
7373
new JsonSchemaMessageReader(schemaRegistry, null, recordSchema, "topic1", false, reader,
74-
true);
74+
true, false);
7575
ProducerRecord<byte[], byte[]> message = jsonSchemaReader.readMessage();
7676

7777
byte[] serializedValue = message.value();
@@ -98,7 +98,7 @@ public void testKafkaJsonSchemaKeyValueFormatter() throws Exception {
9898
new BufferedReader(new InputStreamReader(new ByteArrayInputStream(inputJson.getBytes())));
9999
JsonSchemaMessageReader jsonSchemaReader =
100100
new JsonSchemaMessageReader(schemaRegistry, keySchema, recordSchema, "topic1", true, reader,
101-
true);
101+
true, false);
102102
ProducerRecord<byte[], byte[]> message = jsonSchemaReader.readMessage();
103103

104104
byte[] serializedKey = message.key();
@@ -123,7 +123,7 @@ public void testInvalidFormat() {
123123
new BufferedReader(new InputStreamReader(new ByteArrayInputStream(inputJson.getBytes())));
124124
JsonSchemaMessageReader jsonSchemaReader =
125125
new JsonSchemaMessageReader(schemaRegistry, null, recordSchema, "topic1", false, reader,
126-
true);
126+
true, false);
127127
try {
128128
jsonSchemaReader.readMessage();
129129
fail("Registering an invalid schema should fail");

protobuf-serializer/src/main/java/io/confluent/kafka/formatter/protobuf/ProtobufMessageReader.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,19 +68,22 @@ public ProtobufMessageReader() {
6868
String topic,
6969
boolean parseKey,
7070
BufferedReader reader,
71-
boolean autoRegister
71+
boolean autoRegister,
72+
boolean useLatest
7273
) {
7374
super(schemaRegistryClient, keySchema, valueSchema, topic,
74-
parseKey, reader, autoRegister);
75+
parseKey, reader, autoRegister, useLatest);
7576
}
7677

7778
@Override
7879
protected SchemaMessageSerializer<Message> createSerializer(
7980
SchemaRegistryClient schemaRegistryClient,
8081
boolean autoRegister,
82+
boolean useLatest,
8183
Serializer keySerializer
8284
) {
83-
return new ProtobufMessageSerializer(schemaRegistryClient, autoRegister, keySerializer);
85+
return new ProtobufMessageSerializer(
86+
schemaRegistryClient, autoRegister, useLatest, keySerializer);
8487
}
8588

8689
@Override
@@ -107,10 +110,12 @@ static class ProtobufMessageSerializer extends AbstractKafkaProtobufSerializer
107110
protected final Serializer keySerializer;
108111

109112
ProtobufMessageSerializer(
110-
SchemaRegistryClient schemaRegistryClient, boolean autoRegister, Serializer keySerializer
113+
SchemaRegistryClient schemaRegistryClient,
114+
boolean autoRegister, boolean useLatest, Serializer keySerializer
111115
) {
112116
this.schemaRegistry = schemaRegistryClient;
113117
this.autoRegisterSchema = autoRegister;
118+
this.useLatestVersion = useLatest;
114119
this.keySerializer = keySerializer;
115120
}
116121

protobuf-serializer/src/main/java/io/confluent/kafka/serializers/protobuf/AbstractKafkaProtobufSerializer.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,13 @@ public abstract class AbstractKafkaProtobufSerializer<T extends Message>
4242
extends AbstractKafkaSchemaSerDe {
4343

4444
protected boolean autoRegisterSchema;
45+
protected boolean useLatestVersion;
4546
protected ReferenceSubjectNameStrategy referenceSubjectNameStrategy;
4647

4748
protected void configure(KafkaProtobufSerializerConfig config) {
4849
configureClientProperties(config, new ProtobufSchemaProvider());
4950
this.autoRegisterSchema = config.autoRegisterSchema();
51+
this.useLatestVersion = config.useLatestVersion();
5052
this.referenceSubjectNameStrategy = config.referenceSubjectNameStrategyInstance();
5153
}
5254

@@ -77,6 +79,10 @@ protected byte[] serializeImpl(
7779
if (autoRegisterSchema) {
7880
restClientErrorMsg = "Error registering Protobuf schema: ";
7981
id = schemaRegistry.register(subject, schema);
82+
} else if (useLatestVersion) {
83+
restClientErrorMsg = "Error retrieving latest version: ";
84+
schema = (ProtobufSchema) lookupLatestVersion(subject, schema);
85+
id = schemaRegistry.getId(subject, schema);
8086
} else {
8187
restClientErrorMsg = "Error retrieving Protobuf schema: ";
8288
id = schemaRegistry.getId(subject, schema);

protobuf-serializer/src/test/java/io/confluent/kafka/formatter/protobuf/KafkaProtobufFormatterTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public void testKafkaProtobufValueFormatter() throws Exception {
7676
new BufferedReader(new InputStreamReader(new ByteArrayInputStream(inputJson.getBytes())));
7777
ProtobufMessageReader protobufReader =
7878
new ProtobufMessageReader(schemaRegistry, null, recordSchema, "topic1", false, reader,
79-
true);
79+
true, false);
8080
ProducerRecord<byte[], byte[]> message = protobufReader.readMessage();
8181

8282
byte[] serializedValue = message.value();
@@ -102,7 +102,7 @@ public void testKafkaProtobufEnumValueFormatter() throws Exception {
102102
new BufferedReader(new InputStreamReader(new ByteArrayInputStream(inputJson.getBytes())));
103103
ProtobufMessageReader protobufReader =
104104
new ProtobufMessageReader(schemaRegistry, null, enumSchema, "topic1", false, reader,
105-
true);
105+
true, false);
106106
ProducerRecord<byte[], byte[]> message = protobufReader.readMessage();
107107

108108
byte[] serializedValue = message.value();
@@ -129,7 +129,7 @@ public void testKafkaProtobufKeyValueFormatter() throws Exception {
129129
new BufferedReader(new InputStreamReader(new ByteArrayInputStream(inputJson.getBytes())));
130130
ProtobufMessageReader protobufReader =
131131
new ProtobufMessageReader(schemaRegistry, keySchema, recordSchema, "topic1", true, reader,
132-
true);
132+
true, false);
133133
ProducerRecord<byte[], byte[]> message = protobufReader.readMessage();
134134

135135
byte[] serializedKey = message.key();
@@ -154,7 +154,7 @@ public void testInvalidFormat() {
154154
new BufferedReader(new InputStreamReader(new ByteArrayInputStream(inputJson.getBytes())));
155155
ProtobufMessageReader protobufReader =
156156
new ProtobufMessageReader(schemaRegistry, null, recordSchema, "topic1", false, reader,
157-
true);
157+
true, false);
158158
try {
159159
protobufReader.readMessage();
160160
fail("Registering an invalid schema should fail");

0 commit comments

Comments (0)