
Commit 8c551d7

feat(deserializer): implement schema resolution strategies for protobuf messages
1 parent 59a92fa · commit 8c551d7

File tree

9 files changed: +309 −100 lines

clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java

Lines changed: 1 addition & 1 deletion
@@ -265,7 +265,7 @@ public class TopicConfig {
     public static final String TABLE_TOPIC_NAMESPACE_CONFIG = "automq.table.topic.namespace";
     public static final String TABLE_TOPIC_NAMESPACE_DOC = "The table topic table namespace";
     public static final String TABLE_TOPIC_SCHEMA_TYPE_CONFIG = "automq.table.topic.schema.type";
-    public static final String TABLE_TOPIC_SCHEMA_TYPE_DOC = "The table topic schema type, support schemaless, schema";
+    public static final String TABLE_TOPIC_SCHEMA_TYPE_DOC = "The table topic schema type, support schemaless, schema, schema_latest";
     public static final String TABLE_TOPIC_ID_COLUMNS_CONFIG = "automq.table.topic.id.columns";
     public static final String TABLE_TOPIC_ID_COLUMNS_DOC = "The primary key, comma-separated list of columns that identify a row in tables."
         + "ex. [region, name]";

core/src/main/java/kafka/automq/table/deserializer/proto/AbstractCustomKafkaProtobufDeserializer.java

Lines changed: 18 additions & 73 deletions
@@ -33,7 +33,6 @@
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InterruptedIOException;
-import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.Objects;

@@ -43,13 +42,18 @@

 public abstract class AbstractCustomKafkaProtobufDeserializer<T extends Message>
     extends AbstractKafkaSchemaSerDe {
-    private static final int SCHEMA_ID_SIZE = 4;
-    private static final int HEADER_SIZE = SCHEMA_ID_SIZE + 1; // magic byte + schema id

     protected final Map<SchemaKey, ProtobufSchemaWrapper> schemaCache;
+    protected final SchemaResolutionResolver schemaResolutionResolver;

     public AbstractCustomKafkaProtobufDeserializer() {
         this.schemaCache = new BoundedConcurrentHashMap<>(1000);
+        this.schemaResolutionResolver = new HeaderBasedSchemaResolutionResolver();
+    }
+
+    public AbstractCustomKafkaProtobufDeserializer(SchemaResolutionResolver schemaResolutionResolver) {
+        this.schemaCache = new BoundedConcurrentHashMap<>(1000);
+        this.schemaResolutionResolver = schemaResolutionResolver != null ? schemaResolutionResolver : new HeaderBasedSchemaResolutionResolver();
     }

     protected void configure(CustomKafkaProtobufDeserializerConfig config) {

@@ -76,30 +80,27 @@ protected T deserialize(String topic, Headers headers, byte[] payload)
             throw new InvalidConfigurationException("Schema registry not found, make sure the schema.registry.url is set");
         }

-        int schemaId = 0;
-        byte[] messageBytes;
-        MessageIndexes indexes;
-        Message message;
-
         try {
-            // Phase 2: Message Header Parsing
-            ByteBuffer buffer = processHeader(payload);
-            schemaId = extractSchemaId(buffer);
-            indexes = extractMessageIndexes(buffer);
-            messageBytes = extractMessageBytes(buffer);
+            // Phase 2: Schema Resolution
+            SchemaResolutionResolver.SchemaResolution resolution = schemaResolutionResolver.resolve(topic, payload);
+            int schemaId = resolution.getSchemaId();
+            MessageIndexes indexes = resolution.getIndexes();
+            byte[] messageBytes = resolution.getMessageBytes();

             // Phase 3: Schema Processing
             ProtobufSchemaWrapper protobufSchemaWrapper = processSchema(topic, schemaId, indexes);
             Descriptors.Descriptor targetDescriptor = protobufSchemaWrapper.getDescriptor();

             // Phase 4: Message Deserialization
-            message = deserializeMessage(targetDescriptor, messageBytes);
+            Message message = deserializeMessage(targetDescriptor, messageBytes);

-            return (T) message;
+            @SuppressWarnings("unchecked")
+            T result = (T) message;
+            return result;
         } catch (InterruptedIOException e) {
-            throw new TimeoutException("Error deserializing Protobuf message for id " + schemaId, e);
+            throw new TimeoutException("Error deserializing Protobuf message", e);
         } catch (IOException | RuntimeException e) {
-            throw new SerializationException("Error deserializing Protobuf message for id " + schemaId, e);
+            throw new SerializationException("Error deserializing Protobuf message", e);
         }
     }

@@ -110,62 +111,6 @@ private Message deserializeMessage(Descriptors.Descriptor descriptor, byte[] mes
         return DynamicMessage.parseFrom(descriptor, new ByteArrayInputStream(messageBytes));
     }

-    /**
-     * Phase 2a: Process the header of the message
-     *
-     * @param payload The serialized payload
-     * @return ByteBuffer positioned after the magic byte
-     */
-    protected ByteBuffer processHeader(byte[] payload) {
-        return getByteBuffer(payload);
-    }
-
-    protected ByteBuffer getByteBuffer(byte[] payload) {
-        if (payload == null || payload.length < HEADER_SIZE) {
-            throw new SerializationException("Invalid payload size");
-        }
-        ByteBuffer buffer = ByteBuffer.wrap(payload);
-        byte magicByte = buffer.get();
-        if (magicByte != MAGIC_BYTE) {
-            throw new SerializationException("Unknown magic byte: " + magicByte);
-        }
-        return buffer;
-    }
-
-    /**
-     * Phase 2b: Extract the schema ID from the buffer
-     *
-     * @param buffer The byte buffer positioned after the magic byte
-     * @return The schema ID
-     */
-    protected int extractSchemaId(ByteBuffer buffer) {
-        return buffer.getInt();
-    }
-
-    /**
-     * Phase 2c: Extract message indexes from the buffer
-     *
-     * @param buffer The byte buffer positioned after the schema ID
-     * @return The message indexes
-     */
-    protected MessageIndexes extractMessageIndexes(ByteBuffer buffer) {
-        return MessageIndexes.readFrom(buffer);
-    }
-
-    /**
-     * Phase 2d: Extract the actual message bytes from the buffer
-     *
-     * @param buffer The byte buffer positioned after the message indexes
-     * @return The message bytes
-     */
-    protected byte[] extractMessageBytes(ByteBuffer buffer) {
-        int messageLength = buffer.remaining();
-
-        byte[] messageBytes = new byte[messageLength];
-        buffer.get(messageBytes);
-        return messageBytes;
-    }
-
     /**
      * Phase 3: Process and retrieve the schema
     *
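
The deserializer now delegates all payload parsing to a SchemaResolutionResolver. The interface itself is one of the 9 changed files but is not shown in this excerpt; the following is a sketch reconstructed from its call sites above and in the resolver implementations below, so everything beyond resolve() and the three getters is an assumption:

package kafka.automq.table.deserializer.proto;

import kafka.automq.table.deserializer.proto.schema.MessageIndexes;

public interface SchemaResolutionResolver {

    // Resolve the schema id, message indexes, and raw message bytes for one record.
    SchemaResolution resolve(String topic, byte[] payload);

    class SchemaResolution {
        private final int schemaId;
        private final MessageIndexes indexes;
        private final byte[] messageBytes;

        public SchemaResolution(int schemaId, MessageIndexes indexes, byte[] messageBytes) {
            this.schemaId = schemaId;
            this.indexes = indexes;
            this.messageBytes = messageBytes;
        }

        public int getSchemaId() {
            return schemaId;
        }

        public MessageIndexes getIndexes() {
            return indexes;
        }

        public byte[] getMessageBytes() {
            return messageBytes;
        }
    }
}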

core/src/main/java/kafka/automq/table/deserializer/proto/CustomKafkaProtobufDeserializer.java

Lines changed: 6 additions & 1 deletion
@@ -39,6 +39,11 @@ public CustomKafkaProtobufDeserializer(SchemaRegistryClient schemaRegistry) {
         this.schemaRegistry = schemaRegistry;
     }

+    public CustomKafkaProtobufDeserializer(SchemaRegistryClient schemaRegistry, SchemaResolutionResolver schemaResolutionResolver) {
+        super(schemaResolutionResolver);
+        this.schemaRegistry = schemaRegistry;
+    }
+
     @Override
     public void configure(Map<String, ?> configs, boolean isKey) {
         CustomKafkaProtobufDeserializerConfig config = new CustomKafkaProtobufDeserializerConfig(configs);

@@ -63,4 +68,4 @@ public void close() {
             throw new RuntimeException("Exception while closing deserializer", e);
         }
     }
-}
+}
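
The new two-argument constructor is what lets callers swap resolution strategies. A hedged wiring sketch (the CachedSchemaRegistryClient setup is illustrative, and it assumes the class keeps the parent's <T extends Message> type parameter):

import com.google.protobuf.DynamicMessage;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

SchemaRegistryClient client = new CachedSchemaRegistryClient("http://localhost:8081", 100);

// Default: schema id and message indexes are parsed from each record's byte header.
CustomKafkaProtobufDeserializer<DynamicMessage> perRecord =
    new CustomKafkaProtobufDeserializer<>(client);

// schema_latest: ignore the header and always resolve the subject's latest schema.
CustomKafkaProtobufDeserializer<DynamicMessage> latest =
    new CustomKafkaProtobufDeserializer<>(client,
        new RegistryBasedSchemaResolutionResolver(client));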
core/src/main/java/kafka/automq/table/deserializer/proto/HeaderBasedSchemaResolutionResolver.java

Lines changed: 70 additions & 0 deletions

@@ -0,0 +1,70 @@
+/*
+ * Copyright 2025, AutoMQ HK Limited.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.automq.table.deserializer.proto;
+
+import kafka.automq.table.deserializer.proto.schema.MessageIndexes;
+
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Default implementation of SchemaResolutionResolver that parses schema information from message headers.
+ * This implementation handles the standard Confluent Kafka protobuf message format with magic byte,
+ * schema ID, message indexes, and message payload.
+ */
+public class HeaderBasedSchemaResolutionResolver implements SchemaResolutionResolver {
+
+    private static final int SCHEMA_ID_SIZE = 4;
+    private static final int HEADER_SIZE = SCHEMA_ID_SIZE + 1; // magic byte + schema id
+    private static final byte MAGIC_BYTE = 0x0;
+
+    @Override
+    public SchemaResolution resolve(String topic, byte[] payload) {
+        if (payload == null) {
+            throw new SerializationException("Payload cannot be null");
+        }
+
+        if (payload.length < HEADER_SIZE) {
+            throw new SerializationException("Invalid payload size: " + payload.length + ", expected at least " + HEADER_SIZE);
+        }
+
+        ByteBuffer buffer = ByteBuffer.wrap(payload);
+
+        // Extract magic byte
+        byte magicByte = buffer.get();
+        if (magicByte != MAGIC_BYTE) {
+            throw new SerializationException("Unknown magic byte: " + magicByte);
+        }
+
+        // Extract schema ID
+        int schemaId = buffer.getInt();
+
+        // Extract message indexes
+        MessageIndexes indexes = MessageIndexes.readFrom(buffer);
+
+        // Extract message bytes
+        int messageLength = buffer.remaining();
+        byte[] messageBytes = new byte[messageLength];
+        buffer.get(messageBytes);
+
+        return new SchemaResolution(schemaId, indexes, messageBytes);
+    }
+}
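
For reference, the byte layout this resolver parses is the standard Confluent protobuf framing: [0x00 magic][int32 schema id][message indexes][protobuf message bytes]. A minimal sketch of building such a payload by hand (the schema id and topic are illustrative, and the single 0x0 index byte assumes MessageIndexes mirrors Confluent's varint encoding, where the common first-message path [0] collapses to one zero byte):

import java.nio.ByteBuffer;

byte[] protoBytes = new byte[0]; // stand-in for a serialized protobuf message

ByteBuffer buf = ByteBuffer.allocate(1 + 4 + 1 + protoBytes.length);
buf.put((byte) 0x0);   // magic byte
buf.putInt(42);        // schema id (illustrative)
buf.put((byte) 0x0);   // message indexes: shorthand for the first top-level message
buf.put(protoBytes);   // the protobuf payload itself

SchemaResolutionResolver.SchemaResolution resolution =
    new HeaderBasedSchemaResolutionResolver().resolve("orders", buf.array());
// resolution.getSchemaId() == 42; resolution.getMessageBytes() holds protoBytes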
core/src/main/java/kafka/automq/table/deserializer/proto/RegistryBasedSchemaResolutionResolver.java

Lines changed: 109 additions & 0 deletions

@@ -0,0 +1,109 @@
+/*
+ * Copyright 2025, AutoMQ HK Limited.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.automq.table.deserializer.proto;
+
+import org.apache.kafka.common.errors.SerializationException;
+
+import com.automq.stream.utils.Time;
+
+import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import kafka.automq.table.deserializer.proto.schema.MessageIndexes;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Implementation of SchemaResolutionResolver that retrieves the latest schema from Schema Registry by subject name.
+ * This implementation includes caching mechanism to avoid frequent registry queries.
+ * Cache entries are refreshed every 5 minutes.
+ */
+public class RegistryBasedSchemaResolutionResolver implements SchemaResolutionResolver {
+
+    private static final long CACHE_REFRESH_INTERVAL_MS = 5 * 60 * 1000; // 5 minutes
+    private static final MessageIndexes DEFAULT_INDEXES = new MessageIndexes(Collections.singletonList(0));
+
+    private final SchemaRegistryClient schemaRegistry;
+    private final ConcurrentHashMap<String, RegistryBasedSchemaResolutionResolver.CachedSchemaInfo> schemaCache = new ConcurrentHashMap<>();
+    private final Time time;
+
+    public RegistryBasedSchemaResolutionResolver(SchemaRegistryClient schemaRegistry) {
+        this.schemaRegistry = schemaRegistry;
+        time = Time.SYSTEM;
+    }
+
+    @Override
+    public SchemaResolution resolve(String topic, byte[] payload) {
+        if (payload == null) {
+            throw new SerializationException("Payload cannot be null");
+        }
+
+        String subject = getSubjectName(topic);
+        RegistryBasedSchemaResolutionResolver.CachedSchemaInfo cachedInfo = getCachedSchemaInfo(subject);
+
+        return new SchemaResolution(cachedInfo.schemaId, DEFAULT_INDEXES, payload);
+    }
+
+    private RegistryBasedSchemaResolutionResolver.CachedSchemaInfo getCachedSchemaInfo(String subject) {
+        long currentTime = time.milliseconds();
+
+        return schemaCache.compute(subject, (key, existing) -> {
+            // If we have existing data and it's still fresh, use it
+            if (existing != null && currentTime - existing.lastUpdated <= CACHE_REFRESH_INTERVAL_MS) {
+                return existing;
+            }
+
+            // Try to get fresh data from registry
+            try {
+                SchemaMetadata latestSchema = schemaRegistry.getLatestSchemaMetadata(subject);
+                return new RegistryBasedSchemaResolutionResolver.CachedSchemaInfo(latestSchema.getId(), currentTime);
+            } catch (IOException | RestClientException e) {
+                // If we have existing cached data (even if expired), use it as fallback
+                if (existing != null) {
+                    // Log warning but continue with stale data
+                    System.err.println("Warning: Failed to refresh schema for subject " + subject +
+                        ", using cached data from " +
+                        new java.util.Date(existing.lastUpdated) + ": " + e.getMessage());
+                    return existing;
+                }
+                // No cached data and fresh fetch failed - this is a hard error
+                throw new SerializationException("Error retrieving schema for subject " + subject +
+                    " and no cached data available", e);
+            }
+        });
+    }
+
+    private String getSubjectName(String topic) {
+        // Follow the Confluent naming convention: <topic>-value or <topic>-key
+        return topic + "-value";
+    }
+
+    private static class CachedSchemaInfo {
+        final int schemaId;
+        final long lastUpdated;
+
+        CachedSchemaInfo(int schemaId, long lastUpdated) {
+            this.schemaId = schemaId;
+            this.lastUpdated = lastUpdated;
+        }
+    }
+}
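
Unlike the header-based resolver, this one never inspects the record bytes: it attaches the subject's latest schema id plus the default [0] indexes and passes the payload through untouched. A hedged behavioural sketch using Confluent's in-memory MockSchemaRegistryClient (the subject, schema string, and assertions are illustrative; run inside a test method that declares throws Exception):

import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;

SchemaRegistryClient mock = new MockSchemaRegistryClient();
int id = mock.register("orders-value",
    new ProtobufSchema("syntax = \"proto3\"; message Order { string id = 1; }"));

RegistryBasedSchemaResolutionResolver resolver = new RegistryBasedSchemaResolutionResolver(mock);
SchemaResolutionResolver.SchemaResolution res = resolver.resolve("orders", new byte[] {1, 2, 3});

// Latest id comes from the registry and the payload is passed through unchanged;
// a second resolve within 5 minutes is answered from the in-process cache.
assert res.getSchemaId() == id;
assert res.getMessageBytes().length == 3;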
