From 8c551d7ae59dafdac1c5d8105ec63e7b62688cac Mon Sep 17 00:00:00 2001
From: Gezi-lzq
Date: Mon, 11 Aug 2025 20:21:20 +0800
Subject: [PATCH 1/3] feat(deserializer): implement schema resolution strategies for protobuf messages

---
 .../kafka/common/config/TopicConfig.java      |   2 +-
 ...stractCustomKafkaProtobufDeserializer.java |  91 +++------------
 .../CustomKafkaProtobufDeserializer.java      |   7 +-
 .../HeaderBasedSchemaResolutionResolver.java  |  70 +++++++++++
 ...RegistryBasedSchemaResolutionResolver.java | 109 ++++++++++++++++++
 .../proto/SchemaResolutionResolver.java       |  68 +++++++++++
 .../table/transformer/ConverterFactory.java   |  54 +++++----
 .../ProtobufKafkaRecordConvert.java           |   5 +
 .../server/record/TableTopicSchemaType.java   |   3 +-
 9 files changed, 309 insertions(+), 100 deletions(-)
 create mode 100644 core/src/main/java/kafka/automq/table/deserializer/proto/HeaderBasedSchemaResolutionResolver.java
 create mode 100644 core/src/main/java/kafka/automq/table/deserializer/proto/RegistryBasedSchemaResolutionResolver.java
 create mode 100644 core/src/main/java/kafka/automq/table/deserializer/proto/SchemaResolutionResolver.java

diff --git a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
index 0584a8ee14..b416e7ce7c 100755
--- a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
@@ -265,7 +265,7 @@ public class TopicConfig {
     public static final String TABLE_TOPIC_NAMESPACE_CONFIG = "automq.table.topic.namespace";
     public static final String TABLE_TOPIC_NAMESPACE_DOC = "The table topic table namespace";
     public static final String TABLE_TOPIC_SCHEMA_TYPE_CONFIG = "automq.table.topic.schema.type";
-    public static final String TABLE_TOPIC_SCHEMA_TYPE_DOC = "The table topic schema type, support schemaless, schema";
+    public static final String TABLE_TOPIC_SCHEMA_TYPE_DOC = "The table topic schema type, supports schemaless, schema, schema_latest";
     public static final String TABLE_TOPIC_ID_COLUMNS_CONFIG = "automq.table.topic.id.columns";
     public static final String TABLE_TOPIC_ID_COLUMNS_DOC = "The primary key, comma-separated list of columns that identify a row in tables." + "ex. [region, name]";
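For reviewers trying the new mode end to end, a minimal sketch of opting an existing table topic into schema_latest through the standard AdminClient; the topic name and bootstrap address are illustrative, and the snippet is assumed to run inside a method that declares throws Exception:

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AlterConfigOp;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.common.config.ConfigResource;
    import org.apache.kafka.common.config.TopicConfig;

    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    // Sketch: switch an existing table topic to registry-latest resolution.
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    try (Admin admin = Admin.create(props)) {
        ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "orders");
        AlterConfigOp set = new AlterConfigOp(
            new ConfigEntry(TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_CONFIG, "schema_latest"),
            AlterConfigOp.OpType.SET);
        admin.incrementalAlterConfigs(Map.of(topic, List.of(set))).all().get();
    }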
[region, name]"; diff --git a/core/src/main/java/kafka/automq/table/deserializer/proto/AbstractCustomKafkaProtobufDeserializer.java b/core/src/main/java/kafka/automq/table/deserializer/proto/AbstractCustomKafkaProtobufDeserializer.java index 605afca2a1..4874b1e644 100644 --- a/core/src/main/java/kafka/automq/table/deserializer/proto/AbstractCustomKafkaProtobufDeserializer.java +++ b/core/src/main/java/kafka/automq/table/deserializer/proto/AbstractCustomKafkaProtobufDeserializer.java @@ -33,7 +33,6 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InterruptedIOException; -import java.nio.ByteBuffer; import java.util.Map; import java.util.Objects; @@ -43,13 +42,18 @@ public abstract class AbstractCustomKafkaProtobufDeserializer extends AbstractKafkaSchemaSerDe { - private static final int SCHEMA_ID_SIZE = 4; - private static final int HEADER_SIZE = SCHEMA_ID_SIZE + 1; // magic byte + schema id protected final Map schemaCache; + protected final SchemaResolutionResolver schemaResolutionResolver; public AbstractCustomKafkaProtobufDeserializer() { this.schemaCache = new BoundedConcurrentHashMap<>(1000); + this.schemaResolutionResolver = new HeaderBasedSchemaResolutionResolver(); + } + + public AbstractCustomKafkaProtobufDeserializer(SchemaResolutionResolver schemaResolutionResolver) { + this.schemaCache = new BoundedConcurrentHashMap<>(1000); + this.schemaResolutionResolver = schemaResolutionResolver != null ? schemaResolutionResolver : new HeaderBasedSchemaResolutionResolver(); } protected void configure(CustomKafkaProtobufDeserializerConfig config) { @@ -76,30 +80,27 @@ protected T deserialize(String topic, Headers headers, byte[] payload) throw new InvalidConfigurationException("Schema registry not found, make sure the schema.registry.url is set"); } - int schemaId = 0; - byte[] messageBytes; - MessageIndexes indexes; - Message message; - try { - // Phase 2: Message Header Parsing - ByteBuffer buffer = processHeader(payload); - schemaId = extractSchemaId(buffer); - indexes = extractMessageIndexes(buffer); - messageBytes = extractMessageBytes(buffer); + // Phase 2: Schema Resolution + SchemaResolutionResolver.SchemaResolution resolution = schemaResolutionResolver.resolve(topic, payload); + int schemaId = resolution.getSchemaId(); + MessageIndexes indexes = resolution.getIndexes(); + byte[] messageBytes = resolution.getMessageBytes(); // Phase 3: Schema Processing ProtobufSchemaWrapper protobufSchemaWrapper = processSchema(topic, schemaId, indexes); Descriptors.Descriptor targetDescriptor = protobufSchemaWrapper.getDescriptor(); // Phase 4: Message Deserialization - message = deserializeMessage(targetDescriptor, messageBytes); + Message message = deserializeMessage(targetDescriptor, messageBytes); - return (T) message; + @SuppressWarnings("unchecked") + T result = (T) message; + return result; } catch (InterruptedIOException e) { - throw new TimeoutException("Error deserializing Protobuf message for id " + schemaId, e); + throw new TimeoutException("Error deserializing Protobuf message", e); } catch (IOException | RuntimeException e) { - throw new SerializationException("Error deserializing Protobuf message for id " + schemaId, e); + throw new SerializationException("Error deserializing Protobuf message", e); } } @@ -110,62 +111,6 @@ private Message deserializeMessage(Descriptors.Descriptor descriptor, byte[] mes return DynamicMessage.parseFrom(descriptor, new ByteArrayInputStream(messageBytes)); } - /** - * Phase 2a: Process the header of the message - * - * 
-     * @param payload The serialized payload
-     * @return ByteBuffer positioned after the magic byte
-     */
-    protected ByteBuffer processHeader(byte[] payload) {
-        return getByteBuffer(payload);
-    }
-
-    protected ByteBuffer getByteBuffer(byte[] payload) {
-        if (payload == null || payload.length < HEADER_SIZE) {
-            throw new SerializationException("Invalid payload size");
-        }
-        ByteBuffer buffer = ByteBuffer.wrap(payload);
-        byte magicByte = buffer.get();
-        if (magicByte != MAGIC_BYTE) {
-            throw new SerializationException("Unknown magic byte: " + magicByte);
-        }
-        return buffer;
-    }
-
-    /**
-     * Phase 2b: Extract the schema ID from the buffer
-     *
-     * @param buffer The byte buffer positioned after the magic byte
-     * @return The schema ID
-     */
-    protected int extractSchemaId(ByteBuffer buffer) {
-        return buffer.getInt();
-    }
-
-    /**
-     * Phase 2c: Extract message indexes from the buffer
-     *
-     * @param buffer The byte buffer positioned after the schema ID
-     * @return The message indexes
-     */
-    protected MessageIndexes extractMessageIndexes(ByteBuffer buffer) {
-        return MessageIndexes.readFrom(buffer);
-    }
-
-    /**
-     * Phase 2d: Extract the actual message bytes from the buffer
-     *
-     * @param buffer The byte buffer positioned after the message indexes
-     * @return The message bytes
-     */
-    protected byte[] extractMessageBytes(ByteBuffer buffer) {
-        int messageLength = buffer.remaining();
-
-        byte[] messageBytes = new byte[messageLength];
-        buffer.get(messageBytes);
-        return messageBytes;
-    }
-
     /**
      * Phase 3: Process and retrieve the schema
      *
diff --git a/core/src/main/java/kafka/automq/table/deserializer/proto/CustomKafkaProtobufDeserializer.java b/core/src/main/java/kafka/automq/table/deserializer/proto/CustomKafkaProtobufDeserializer.java
index 310120e3dc..e44193d52b 100644
--- a/core/src/main/java/kafka/automq/table/deserializer/proto/CustomKafkaProtobufDeserializer.java
+++ b/core/src/main/java/kafka/automq/table/deserializer/proto/CustomKafkaProtobufDeserializer.java
@@ -39,6 +39,11 @@ public CustomKafkaProtobufDeserializer(SchemaRegistryClient schemaRegistry) {
         this.schemaRegistry = schemaRegistry;
     }
 
+    public CustomKafkaProtobufDeserializer(SchemaRegistryClient schemaRegistry, SchemaResolutionResolver schemaResolutionResolver) {
+        super(schemaResolutionResolver);
+        this.schemaRegistry = schemaRegistry;
+    }
+
     @Override
     public void configure(Map<String, ?> configs, boolean isKey) {
         CustomKafkaProtobufDeserializerConfig config = new CustomKafkaProtobufDeserializerConfig(configs);
@@ -63,4 +68,4 @@ public void close() {
             throw new RuntimeException("Exception while closing deserializer", e);
         }
     }
-}
\ No newline at end of file
+}
diff --git a/core/src/main/java/kafka/automq/table/deserializer/proto/HeaderBasedSchemaResolutionResolver.java b/core/src/main/java/kafka/automq/table/deserializer/proto/HeaderBasedSchemaResolutionResolver.java
new file mode 100644
index 0000000000..e900db1e3b
--- /dev/null
+++ b/core/src/main/java/kafka/automq/table/deserializer/proto/HeaderBasedSchemaResolutionResolver.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2025, AutoMQ HK Limited.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.automq.table.deserializer.proto;
+
+import kafka.automq.table.deserializer.proto.schema.MessageIndexes;
+
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Default implementation of SchemaResolutionResolver that parses schema information from message headers.
+ * This implementation handles the standard Confluent Kafka protobuf message format with magic byte,
+ * schema ID, message indexes, and message payload.
+ */
+public class HeaderBasedSchemaResolutionResolver implements SchemaResolutionResolver {
+
+    private static final int SCHEMA_ID_SIZE = 4;
+    private static final int HEADER_SIZE = SCHEMA_ID_SIZE + 1; // magic byte + schema id
+    private static final byte MAGIC_BYTE = 0x0;
+
+    @Override
+    public SchemaResolution resolve(String topic, byte[] payload) {
+        if (payload == null) {
+            throw new SerializationException("Payload cannot be null");
+        }
+
+        if (payload.length < HEADER_SIZE) {
+            throw new SerializationException("Invalid payload size: " + payload.length + ", expected at least " + HEADER_SIZE);
+        }
+
+        ByteBuffer buffer = ByteBuffer.wrap(payload);
+
+        // Extract magic byte
+        byte magicByte = buffer.get();
+        if (magicByte != MAGIC_BYTE) {
+            throw new SerializationException("Unknown magic byte: " + magicByte);
+        }
+
+        // Extract schema ID
+        int schemaId = buffer.getInt();
+
+        // Extract message indexes
+        MessageIndexes indexes = MessageIndexes.readFrom(buffer);
+
+        // Extract message bytes
+        int messageLength = buffer.remaining();
+        byte[] messageBytes = new byte[messageLength];
+        buffer.get(messageBytes);
+
+        return new SchemaResolution(schemaId, indexes, messageBytes);
+    }
+}
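As a sanity check on the header layout (one magic byte, a 4-byte schema id, the message indexes, then the protobuf body), a test-style sketch in the resolver's package; the single 0x0 index byte assumes the Confluent shorthand encoding for the index list [0], and the schema id 42 is arbitrary:

    import java.nio.ByteBuffer;

    // Sketch: frame a payload the way Confluent serializers do, then
    // verify the resolver strips the header and keeps only the body.
    byte[] body = {0x08, 0x01};                 // arbitrary protobuf bytes (field 1 = varint 1)
    ByteBuffer framed = ByteBuffer.allocate(1 + 4 + 1 + body.length);
    framed.put((byte) 0x0);                     // magic byte
    framed.putInt(42);                          // schema id (arbitrary)
    framed.put((byte) 0x0);                     // message indexes: shorthand for [0]
    framed.put(body);

    SchemaResolutionResolver.SchemaResolution r =
        new HeaderBasedSchemaResolutionResolver().resolve("orders", framed.array());
    assert r.getSchemaId() == 42;
    assert r.getMessageBytes().length == body.length;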
diff --git a/core/src/main/java/kafka/automq/table/deserializer/proto/RegistryBasedSchemaResolutionResolver.java b/core/src/main/java/kafka/automq/table/deserializer/proto/RegistryBasedSchemaResolutionResolver.java
new file mode 100644
index 0000000000..4a7d5a7ce4
--- /dev/null
+++ b/core/src/main/java/kafka/automq/table/deserializer/proto/RegistryBasedSchemaResolutionResolver.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2025, AutoMQ HK Limited.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.automq.table.deserializer.proto;
+
+import org.apache.kafka.common.errors.SerializationException;
+
+import com.automq.stream.utils.Time;
+
+import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import kafka.automq.table.deserializer.proto.schema.MessageIndexes;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Implementation of SchemaResolutionResolver that retrieves the latest schema from Schema Registry by subject name.
+ * This implementation includes a caching mechanism to avoid frequent registry queries.
+ * Cache entries are refreshed every 5 minutes.
+ */
+public class RegistryBasedSchemaResolutionResolver implements SchemaResolutionResolver {
+
+    private static final long CACHE_REFRESH_INTERVAL_MS = 5 * 60 * 1000; // 5 minutes
+    private static final MessageIndexes DEFAULT_INDEXES = new MessageIndexes(Collections.singletonList(0));
+
+    private final SchemaRegistryClient schemaRegistry;
+    private final ConcurrentHashMap<String, CachedSchemaInfo> schemaCache = new ConcurrentHashMap<>();
+    private final Time time;
+
+    public RegistryBasedSchemaResolutionResolver(SchemaRegistryClient schemaRegistry) {
+        this.schemaRegistry = schemaRegistry;
+        time = Time.SYSTEM;
+    }
+
+    @Override
+    public SchemaResolution resolve(String topic, byte[] payload) {
+        if (payload == null) {
+            throw new SerializationException("Payload cannot be null");
+        }
+
+        String subject = getSubjectName(topic);
+        RegistryBasedSchemaResolutionResolver.CachedSchemaInfo cachedInfo = getCachedSchemaInfo(subject);
+
+        return new SchemaResolution(cachedInfo.schemaId, DEFAULT_INDEXES, payload);
+    }
+
+    private RegistryBasedSchemaResolutionResolver.CachedSchemaInfo getCachedSchemaInfo(String subject) {
+        long currentTime = time.milliseconds();
+
+        return schemaCache.compute(subject, (key, existing) -> {
+            // If we have existing data and it's still fresh, use it
+            if (existing != null && currentTime - existing.lastUpdated <= CACHE_REFRESH_INTERVAL_MS) {
+                return existing;
+            }
+
+            // Try to get fresh data from registry
+            try {
+                SchemaMetadata latestSchema = schemaRegistry.getLatestSchemaMetadata(subject);
+                return new RegistryBasedSchemaResolutionResolver.CachedSchemaInfo(latestSchema.getId(), currentTime);
+            } catch (IOException | RestClientException e) {
+                // If we have existing cached data (even if expired), use it as fallback
+                if (existing != null) {
+                    // Log warning but continue with stale data
+                    System.err.println("Warning: Failed to refresh schema for subject " + subject +
+                        ", using cached data from " +
+                        new java.util.Date(existing.lastUpdated) + ": " + e.getMessage());
+                    return existing;
+                }
+                // No cached data and fresh fetch failed - this is a hard error
+                throw new SerializationException("Error retrieving schema for subject " + subject +
+                    " and no cached data available", e);
+            }
+        });
+    }
+
+    private String getSubjectName(String topic) {
+        // Follow the Confluent naming convention: <topic>-value or <topic>-key
+        return topic + "-value";
+    }
+
+    private static class CachedSchemaInfo {
+        final int schemaId;
+        final long lastUpdated;
+
+        CachedSchemaInfo(int schemaId, long lastUpdated) {
+            this.schemaId = schemaId;
+            this.lastUpdated = lastUpdated;
+        }
+    }
+}
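A sketch of how this resolver would be wired up against a registry; the URL and cache capacity are illustrative. Unlike the header-based strategy, resolve() ignores any framing in the payload and stamps every record with the subject's current latest schema id, refreshed at most every five minutes and falling back to the stale cache entry if the registry is unreachable:

    import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
    import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

    // Sketch: registry-driven resolution for bare protobuf payloads.
    SchemaRegistryClient client = new CachedSchemaRegistryClient("http://localhost:8081", 1000);
    SchemaResolutionResolver resolver = new RegistryBasedSchemaResolutionResolver(client);

    // No magic byte, no embedded id: the id comes from the latest schema
    // registered under subject "orders-value", with message indexes [0].
    byte[] rawProtoBytes = {0x08, 0x01};
    SchemaResolutionResolver.SchemaResolution r = resolver.resolve("orders", rawProtoBytes);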
diff --git a/core/src/main/java/kafka/automq/table/deserializer/proto/SchemaResolutionResolver.java b/core/src/main/java/kafka/automq/table/deserializer/proto/SchemaResolutionResolver.java
new file mode 100644
index 0000000000..fa5ca847ee
--- /dev/null
+++ b/core/src/main/java/kafka/automq/table/deserializer/proto/SchemaResolutionResolver.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2025, AutoMQ HK Limited.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.automq.table.deserializer.proto;
+
+import kafka.automq.table.deserializer.proto.schema.MessageIndexes;
+
+/**
+ * Interface for resolving schema information for protobuf message deserialization.
+ * This interface supports different strategies for obtaining schema ID and message structure:
+ * - Parse from message header (standard Confluent format)
+ * - Lookup latest schema from registry by subject name
+ */
+public interface SchemaResolutionResolver {
+
+    /**
+     * Resolves schema information for the given message payload and topic.
+     *
+     * @param topic The Kafka topic name
+     * @param payload The serialized protobuf payload
+     * @return SchemaResolution containing schema ID, message indexes, and message bytes
+     * @throws org.apache.kafka.common.errors.SerializationException if resolution fails
+     */
+    SchemaResolution resolve(String topic, byte[] payload);
+
+    /**
+     * Container class for resolved schema information.
+     */
+    class SchemaResolution {
+        private final int schemaId;
+        private final MessageIndexes indexes;
+        private final byte[] messageBytes;
+
+        public SchemaResolution(int schemaId, MessageIndexes indexes, byte[] messageBytes) {
+            this.schemaId = schemaId;
+            this.indexes = indexes;
+            this.messageBytes = messageBytes;
+        }
+
+        public int getSchemaId() {
+            return schemaId;
+        }
+
+        public MessageIndexes getIndexes() {
+            return indexes;
+        }
+
+        public byte[] getMessageBytes() {
+            return messageBytes;
+        }
+    }
+}
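The interface is the extension point for further strategies beyond the two shipped here. A sketch of a third, test-oriented strategy that pins every record to one known schema id; the class is hypothetical, and the getSchemaId override anticipates the method added to the interface in patch 3:

    package kafka.automq.table.deserializer.proto;

    import kafka.automq.table.deserializer.proto.schema.MessageIndexes;

    import org.apache.kafka.common.record.Record;

    import java.util.Collections;

    // Hypothetical test helper: every payload resolves to the same schema id.
    public class FixedSchemaResolutionResolver implements SchemaResolutionResolver {
        private static final MessageIndexes ROOT = new MessageIndexes(Collections.singletonList(0));
        private final int schemaId;

        public FixedSchemaResolutionResolver(int schemaId) {
            this.schemaId = schemaId;
        }

        @Override
        public SchemaResolution resolve(String topic, byte[] payload) {
            // Pass the payload through untouched: no magic byte or index header expected.
            return new SchemaResolution(schemaId, ROOT, payload);
        }

        @Override
        public int getSchemaId(String topic, Record record) { // added to the interface in patch 3
            return schemaId;
        }
    }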
diff --git a/core/src/main/java/kafka/automq/table/transformer/ConverterFactory.java b/core/src/main/java/kafka/automq/table/transformer/ConverterFactory.java
index e2eb1ef0a1..bd1ae04890 100644
--- a/core/src/main/java/kafka/automq/table/transformer/ConverterFactory.java
+++ b/core/src/main/java/kafka/automq/table/transformer/ConverterFactory.java
@@ -19,33 +19,29 @@
 
 package kafka.automq.table.transformer;
 
-import kafka.automq.table.deserializer.proto.ProtobufSchemaProvider;
-
-import org.apache.kafka.server.record.TableTopicSchemaType;
-
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
-
-import org.apache.avro.generic.GenericRecord;
-
-import java.io.IOException;
-import java.time.Duration;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
 import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
 import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
 import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
+import kafka.automq.table.deserializer.proto.ProtobufSchemaProvider;
+import kafka.automq.table.deserializer.proto.RegistryBasedSchemaResolutionResolver;
+import kafka.automq.table.deserializer.proto.SchemaResolutionResolver;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.kafka.server.record.TableTopicSchemaType;
 
-import static kafka.automq.table.transformer.SchemaFormat.AVRO;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 public class ConverterFactory {
     private final String registryUrl;
-    private final Map<SchemaFormat, KafkaRecordConvert<GenericRecord>> recordConvertMap = new ConcurrentHashMap<>();
+    private final Map<String, KafkaRecordConvert<GenericRecord>> recordConvertMap = new ConcurrentHashMap<>();
     private SchemaRegistryClient schemaRegistry;
     private final Cache<String, SchemaFormat> topicSchemaFormatCache = CacheBuilder.newBuilder()
@@ -80,7 +76,10 @@ public Converter converter(TableTopicSchemaType type, String topic) {
                 return new SchemalessConverter();
             }
             case SCHEMA: {
-                return new LazyRegistrySchemaConvert(() -> createRegistrySchemaConverter(topic));
+                return new LazyRegistrySchemaConvert(() -> createRegistrySchemaConverter(topic, false));
+            }
+            case SCHEMA_LATEST: {
+                return new LazyRegistrySchemaConvert(() -> createRegistrySchemaConverter(topic, true));
             }
             default: {
                 throw new IllegalArgumentException("Unsupported converter type: " + type);
@@ -88,7 +87,7 @@
         }
     }
 
-    private Converter createRegistrySchemaConverter(String topic) {
+    private Converter createRegistrySchemaConverter(String topic, boolean useLatestSchema) {
         if (schemaRegistry != null) {
             try {
                 SchemaFormat format = topicSchemaFormatCache.getIfPresent(topic);
@@ -104,7 +103,7 @@ private Converter createRegistrySchemaConverter(String topic) {
                     return createAvroConverter(topic);
                 }
                 case PROTOBUF: {
-                    return createProtobufConverter(topic);
+                    return createProtobufConverter(topic, useLatestSchema);
                }
                 default: {
                     throw new IllegalArgumentException("Unsupported schema format: " + format);
@@ -123,14 +122,15 @@ private String getSubjectName(String topic) {
     }
 
     private Converter createAvroConverter(String topic) {
-        KafkaRecordConvert<GenericRecord> recordConvert = recordConvertMap.computeIfAbsent(AVRO,
+        KafkaRecordConvert<GenericRecord> recordConvert = recordConvertMap.computeIfAbsent(SchemaFormat.AVRO.name(),
             format -> createKafkaAvroRecordConvert(registryUrl));
         return new RegistrySchemaAvroConverter(recordConvert, topic);
     }
 
-    private Converter createProtobufConverter(String topic) {
-        KafkaRecordConvert<GenericRecord> recordConvert = recordConvertMap.computeIfAbsent(SchemaFormat.PROTOBUF,
-            format -> createKafkaProtobufRecordConvert(registryUrl));
+    private Converter createProtobufConverter(String topic, boolean useLatestSchema) {
+        String cacheKey = useLatestSchema ? SchemaFormat.PROTOBUF.name() + "_LATEST" : SchemaFormat.PROTOBUF.name();
+        KafkaRecordConvert<GenericRecord> recordConvert = recordConvertMap.computeIfAbsent(cacheKey,
+            key -> createKafkaProtobufRecordConvert(registryUrl, useLatestSchema));
         return new RegistrySchemaAvroConverter(recordConvert, topic);
     }
 
@@ -142,8 +142,14 @@ private KafkaRecordConvert createKafkaAvroRecordConvert(String re
     }
 
     @SuppressWarnings("unchecked")
-    private KafkaRecordConvert<GenericRecord> createKafkaProtobufRecordConvert(String registryUrl) {
-        ProtobufKafkaRecordConvert protobufKafkaRecordConvert = new ProtobufKafkaRecordConvert(schemaRegistry);
+    private KafkaRecordConvert<GenericRecord> createKafkaProtobufRecordConvert(String registryUrl, boolean useLatestSchema) {
+        ProtobufKafkaRecordConvert protobufKafkaRecordConvert;
+        if (useLatestSchema) {
+            SchemaResolutionResolver resolver = new RegistryBasedSchemaResolutionResolver(schemaRegistry);
+            protobufKafkaRecordConvert = new ProtobufKafkaRecordConvert(schemaRegistry, resolver);
+        } else {
+            protobufKafkaRecordConvert = new ProtobufKafkaRecordConvert(schemaRegistry);
+        }
        protobufKafkaRecordConvert.configure(Map.of("schema.registry.url", registryUrl), false);
        return protobufKafkaRecordConvert;
     }
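To make the new wiring concrete, a sketch of the lookup path a caller in the table-topic pipeline would take; the ConverterFactory constructor shape is assumed from the registryUrl field, and the topic name is illustrative:

    // Sketch: how the two schema modes map to converters.
    ConverterFactory factory = new ConverterFactory("http://localhost:8081"); // assumed ctor shape

    // "schema": per-record schema id parsed from the Confluent header.
    Converter headerDriven = factory.converter(TableTopicSchemaType.SCHEMA, "orders");

    // "schema_latest": schema id pinned to the subject's latest registered schema.
    Converter registryDriven = factory.converter(TableTopicSchemaType.SCHEMA_LATEST, "orders");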
diff --git a/core/src/main/java/kafka/automq/table/transformer/ProtobufKafkaRecordConvert.java b/core/src/main/java/kafka/automq/table/transformer/ProtobufKafkaRecordConvert.java
index 4055be2f2c..544e042662 100644
--- a/core/src/main/java/kafka/automq/table/transformer/ProtobufKafkaRecordConvert.java
+++ b/core/src/main/java/kafka/automq/table/transformer/ProtobufKafkaRecordConvert.java
@@ -21,6 +21,7 @@
 
 import kafka.automq.table.deserializer.proto.CustomKafkaProtobufDeserializer;
 
+import kafka.automq.table.deserializer.proto.SchemaResolutionResolver;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.serialization.Deserializer;
 
@@ -55,6 +56,10 @@ public ProtobufKafkaRecordConvert(SchemaRegistryClient schemaRegistry) {
         this.deserializer = new CustomKafkaProtobufDeserializer<>(schemaRegistry);
     }
 
+    public ProtobufKafkaRecordConvert(SchemaRegistryClient schemaRegistry, SchemaResolutionResolver resolver) {
+        this.deserializer = new CustomKafkaProtobufDeserializer<>(schemaRegistry, resolver);
+    }
+
     @VisibleForTesting
     public ProtobufKafkaRecordConvert(Deserializer<Message> deserializer) {
         this.deserializer = deserializer;
diff --git a/server-common/src/main/java/org/apache/kafka/server/record/TableTopicSchemaType.java b/server-common/src/main/java/org/apache/kafka/server/record/TableTopicSchemaType.java
index ab93e75817..de92aafc5f 100644
--- a/server-common/src/main/java/org/apache/kafka/server/record/TableTopicSchemaType.java
+++ b/server-common/src/main/java/org/apache/kafka/server/record/TableTopicSchemaType.java
@@ -27,7 +27,8 @@
 public enum TableTopicSchemaType {
     SCHEMALESS("schemaless"),
-    SCHEMA("schema");
+    SCHEMA("schema"),
+    SCHEMA_LATEST("schema_latest");
 
     public final String name;
     private static final List<TableTopicSchemaType> VALUES = asList(values());

From 8fc2520433916f34c09d0be146105c986ab01a81 Mon Sep 17 00:00:00 2001
From: Gezi-lzq
Date: Mon, 11 Aug 2025 21:21:00 +0800
Subject: [PATCH 2/3] feat(deserializer): add schema ID retrieval for Protobuf and Avro records

---
 .../proto/CustomKafkaProtobufDeserializer.java  |  4 ++++
 .../transformer/AvroKafkaRecordConvert.java     | 14 ++++++++++++++
 .../table/transformer/KafkaRecordConvert.java   |  4 ++++
 .../transformer/ProtobufKafkaRecordConvert.java | 17 +++++++++++++++--
 .../RegistrySchemaAvroConverter.java            |  2 +-
 5 files changed, 38 insertions(+), 3 deletions(-)

diff --git a/core/src/main/java/kafka/automq/table/deserializer/proto/CustomKafkaProtobufDeserializer.java b/core/src/main/java/kafka/automq/table/deserializer/proto/CustomKafkaProtobufDeserializer.java
index e44193d52b..d81b822715 100644
--- a/core/src/main/java/kafka/automq/table/deserializer/proto/CustomKafkaProtobufDeserializer.java
+++ b/core/src/main/java/kafka/automq/table/deserializer/proto/CustomKafkaProtobufDeserializer.java
@@ -35,6 +35,10 @@ public class CustomKafkaProtobufDeserializer
     public CustomKafkaProtobufDeserializer() {
     }
 
+    public CustomKafkaProtobufDeserializer(SchemaResolutionResolver resolver) {
+        super(resolver);
+    }
+
     public CustomKafkaProtobufDeserializer(SchemaRegistryClient schemaRegistry) {
         this.schemaRegistry = schemaRegistry;
     }
diff --git a/core/src/main/java/kafka/automq/table/transformer/AvroKafkaRecordConvert.java b/core/src/main/java/kafka/automq/table/transformer/AvroKafkaRecordConvert.java
index ef565395cf..61367e86d9 100644
--- a/core/src/main/java/kafka/automq/table/transformer/AvroKafkaRecordConvert.java
+++ b/core/src/main/java/kafka/automq/table/transformer/AvroKafkaRecordConvert.java
@@ -26,12 +26,16 @@
 
 import org.apache.avro.generic.GenericRecord;
 
+import java.nio.ByteBuffer;
 import java.util.Map;
 
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
 import io.confluent.kafka.serializers.KafkaAvroDeserializer;
 
 public class AvroKafkaRecordConvert implements KafkaRecordConvert<GenericRecord> {
+    // Source: io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe#MAGIC_BYTE
+    private static final byte MAGIC_BYTE = 0x0;
+
     private final Deserializer deserializer;
 
     public AvroKafkaRecordConvert() {
@@ -57,6 +61,16 @@ public GenericRecord convert(String topic, Record record, int schemaId) {
         }
     }
 
+    // io.confluent.kafka.serializers.DeserializationContext#constructor
+    @Override
+    public int getSchemaId(String topic, org.apache.kafka.common.record.Record record) {
+        ByteBuffer buffer = record.value().duplicate();
+        if (buffer.get() != MAGIC_BYTE) {
+            throw new InvalidDataException("Unknown magic byte!");
+        }
+        return buffer.getInt();
+    }
+
     @Override
     public void configure(Map<String, ?> configs, boolean isKey) {
         deserializer.configure(configs, isKey);
diff --git a/core/src/main/java/kafka/automq/table/transformer/KafkaRecordConvert.java b/core/src/main/java/kafka/automq/table/transformer/KafkaRecordConvert.java
index 53df3d0e6e..f6c878f261 100644
--- a/core/src/main/java/kafka/automq/table/transformer/KafkaRecordConvert.java
+++ b/core/src/main/java/kafka/automq/table/transformer/KafkaRecordConvert.java
@@ -25,6 +25,10 @@ public interface KafkaRecordConvert<T> {
 
     T convert(String topic, org.apache.kafka.common.record.Record record, int schemaId);
 
+    // io.confluent.kafka.serializers.DeserializationContext#constructor
+    int getSchemaId(String topic, org.apache.kafka.common.record.Record record);
+
     /**
      * Configure this class.
      * @param configs configs in key/value pairs
diff --git a/core/src/main/java/kafka/automq/table/transformer/ProtobufKafkaRecordConvert.java b/core/src/main/java/kafka/automq/table/transformer/ProtobufKafkaRecordConvert.java
index 544e042662..ffaabbcd75 100644
--- a/core/src/main/java/kafka/automq/table/transformer/ProtobufKafkaRecordConvert.java
+++ b/core/src/main/java/kafka/automq/table/transformer/ProtobufKafkaRecordConvert.java
@@ -30,10 +30,12 @@
 import com.google.common.cache.CacheBuilder;
 import com.google.protobuf.Message;
 
+import kafka.automq.table.deserializer.proto.HeaderBasedSchemaResolutionResolver;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.protobuf.ProtoConversions;
 import org.apache.avro.protobuf.ProtobufData;
+import org.apache.kafka.common.utils.Utils;
 
 import java.time.Duration;
 import java.util.Map;
@@ -48,21 +50,27 @@ public class ProtobufKafkaRecordConvert implements KafkaRecordConvert<GenericRecord> {
 
     private final Deserializer<Message> deserializer;
+    private final SchemaResolutionResolver resolver;
 
     public ProtobufKafkaRecordConvert() {
-        this.deserializer = new CustomKafkaProtobufDeserializer<>();
+        this.resolver = new HeaderBasedSchemaResolutionResolver();
+        this.deserializer = new CustomKafkaProtobufDeserializer<>(resolver);
     }
 
     public ProtobufKafkaRecordConvert(SchemaRegistryClient schemaRegistry) {
-        this.deserializer = new CustomKafkaProtobufDeserializer<>(schemaRegistry);
+        this.resolver = new HeaderBasedSchemaResolutionResolver();
+        this.deserializer = new CustomKafkaProtobufDeserializer<>(schemaRegistry, resolver);
     }
 
     public ProtobufKafkaRecordConvert(SchemaRegistryClient schemaRegistry, SchemaResolutionResolver resolver) {
         this.deserializer = new CustomKafkaProtobufDeserializer<>(schemaRegistry, resolver);
+        this.resolver = resolver;
     }
 
     @VisibleForTesting
     public ProtobufKafkaRecordConvert(Deserializer<Message> deserializer) {
         this.deserializer = deserializer;
+        this.resolver = new HeaderBasedSchemaResolutionResolver();
     }
 
     @Override
@@ -78,6 +86,11 @@ public GenericRecord convert(String topic, Record record, int schemaId) {
         return ProtoToAvroConverter.convert(protoMessage, schema);
     }
 
+    @Override
+    public int getSchemaId(String topic, Record record) {
+        return resolver.resolve(topic, Utils.toNullableArray(record.value())).getSchemaId();
+    }
+
     @Override
     public void configure(Map<String, ?> configs, boolean isKey) {
         deserializer.configure(configs, isKey);
diff --git a/core/src/main/java/kafka/automq/table/transformer/RegistrySchemaAvroConverter.java b/core/src/main/java/kafka/automq/table/transformer/RegistrySchemaAvroConverter.java
index 9fdd659d5c..a6145d70fc 100644
--- a/core/src/main/java/kafka/automq/table/transformer/RegistrySchemaAvroConverter.java
+++ b/core/src/main/java/kafka/automq/table/transformer/RegistrySchemaAvroConverter.java
@@ -55,7 +55,7 @@ public RegistrySchemaAvroConverter(KafkaRecordConvert recordConve
 
     @Override
     public Record convert(org.apache.kafka.common.record.Record record) {
-        int schemaId = getSchemaId(record);
+        int schemaId = recordConvert.getSchemaId(topic, record);
         GenericRecord value;
         try {
             value = recordConvert.convert(topic, record, schemaId);
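With schema id retrieval delegated to the resolver, a producer targeting a schema_latest topic can ship bare protobuf bytes with no Confluent framing at all. A sketch using the plain Kafka producer API; Order stands in for some hypothetical generated protobuf message, and addresses are illustrative:

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    import java.util.Properties;

    // Sketch: bare protobuf bytes, no magic byte, schema id, or index framing.
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    try (Producer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
        byte[] raw = Order.newBuilder().setId("o-1").build().toByteArray(); // hypothetical message type
        producer.send(new ProducerRecord<>("orders", raw));
    }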
From 0ea3eb52819e8386d4fdb9e1479c793c7c841869 Mon Sep 17 00:00:00 2001
From: Gezi-lzq
Date: Tue, 12 Aug 2025 10:47:08 +0800
Subject: [PATCH 3/3] chore: spotless apply

---
 .../HeaderBasedSchemaResolutionResolver.java  | 13 +++++++++++
 ...RegistryBasedSchemaResolutionResolver.java | 19 +++++++++++-----
 .../proto/SchemaResolutionResolver.java       |  6 +++++
 .../table/transformer/ConverterFactory.java   | 22 +++++++++++--------
 .../ProtobufKafkaRecordConvert.java           |  7 +++---
 5 files changed, 49 insertions(+), 18 deletions(-)

diff --git a/core/src/main/java/kafka/automq/table/deserializer/proto/HeaderBasedSchemaResolutionResolver.java b/core/src/main/java/kafka/automq/table/deserializer/proto/HeaderBasedSchemaResolutionResolver.java
index e900db1e3b..74d1f5e91a 100644
--- a/core/src/main/java/kafka/automq/table/deserializer/proto/HeaderBasedSchemaResolutionResolver.java
+++ b/core/src/main/java/kafka/automq/table/deserializer/proto/HeaderBasedSchemaResolutionResolver.java
@@ -20,8 +20,10 @@
 package kafka.automq.table.deserializer.proto;
 
 import kafka.automq.table.deserializer.proto.schema.MessageIndexes;
+import kafka.automq.table.transformer.InvalidDataException;
 
 import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.record.Record;
 
 import java.nio.ByteBuffer;
 
@@ -67,4 +69,15 @@ public SchemaResolution resolve(String topic, byte[] payload) {
 
         return new SchemaResolution(schemaId, indexes, messageBytes);
     }
+
+    @Override
+    public int getSchemaId(String topic, Record record) {
+        // io.confluent.kafka.serializers.DeserializationContext#constructor
+        ByteBuffer buffer = record.value().duplicate();
+        if (buffer.get() != MAGIC_BYTE) {
+            throw new InvalidDataException("Unknown magic byte!");
+        }
+        return buffer.getInt();
+    }
+
 }
diff --git a/core/src/main/java/kafka/automq/table/deserializer/proto/RegistryBasedSchemaResolutionResolver.java b/core/src/main/java/kafka/automq/table/deserializer/proto/RegistryBasedSchemaResolutionResolver.java
index 4a7d5a7ce4..848b6102ec 100644
--- a/core/src/main/java/kafka/automq/table/deserializer/proto/RegistryBasedSchemaResolutionResolver.java
+++ b/core/src/main/java/kafka/automq/table/deserializer/proto/RegistryBasedSchemaResolutionResolver.java
@@ -19,19 +19,21 @@
 
 package kafka.automq.table.deserializer.proto;
 
+import kafka.automq.table.deserializer.proto.schema.MessageIndexes;
+
 import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.record.Record;
 
 import com.automq.stream.utils.Time;
 
-import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
-import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
-import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
-import kafka.automq.table.deserializer.proto.schema.MessageIndexes;
-
 import java.io.IOException;
 import java.util.Collections;
 import java.util.concurrent.ConcurrentHashMap;
 
+import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+
 /**
  * Implementation of SchemaResolutionResolver that retrieves the latest schema from Schema Registry by subject name.
  * This implementation includes a caching mechanism to avoid frequent registry queries.
@@ -63,6 +65,13 @@ public SchemaResolution resolve(String topic, byte[] payload) {
         return new SchemaResolution(cachedInfo.schemaId, DEFAULT_INDEXES, payload);
     }
 
+    @Override
+    public int getSchemaId(String topic, Record record) {
+        String subject = getSubjectName(topic);
+        RegistryBasedSchemaResolutionResolver.CachedSchemaInfo cachedInfo = getCachedSchemaInfo(subject);
+        return cachedInfo.schemaId;
+    }
+
     private RegistryBasedSchemaResolutionResolver.CachedSchemaInfo getCachedSchemaInfo(String subject) {
         long currentTime = time.milliseconds();
 
diff --git a/core/src/main/java/kafka/automq/table/deserializer/proto/SchemaResolutionResolver.java b/core/src/main/java/kafka/automq/table/deserializer/proto/SchemaResolutionResolver.java
index fa5ca847ee..1ec0f5c63f 100644
--- a/core/src/main/java/kafka/automq/table/deserializer/proto/SchemaResolutionResolver.java
+++ b/core/src/main/java/kafka/automq/table/deserializer/proto/SchemaResolutionResolver.java
@@ -21,6 +21,9 @@
 
 import kafka.automq.table.deserializer.proto.schema.MessageIndexes;
 
+import org.apache.kafka.common.record.Record;
+
+
 /**
  * Interface for resolving schema information for protobuf message deserialization.
  * This interface supports different strategies for obtaining schema ID and message structure:
@@ -39,6 +42,9 @@ public interface SchemaResolutionResolver {
      */
     SchemaResolution resolve(String topic, byte[] payload);
 
+
+    int getSchemaId(String topic, Record record);
+
     /**
      * Container class for resolved schema information.
      */
diff --git a/core/src/main/java/kafka/automq/table/transformer/ConverterFactory.java b/core/src/main/java/kafka/automq/table/transformer/ConverterFactory.java
index bd1ae04890..2c38470c13 100644
--- a/core/src/main/java/kafka/automq/table/transformer/ConverterFactory.java
+++ b/core/src/main/java/kafka/automq/table/transformer/ConverterFactory.java
@@ -19,26 +19,30 @@
 
 package kafka.automq.table.transformer;
 
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
-import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
-import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
-import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
-import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
-import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
 import kafka.automq.table.deserializer.proto.ProtobufSchemaProvider;
 import kafka.automq.table.deserializer.proto.RegistryBasedSchemaResolutionResolver;
 import kafka.automq.table.deserializer.proto.SchemaResolutionResolver;
-import org.apache.avro.generic.GenericRecord;
+
 import org.apache.kafka.server.record.TableTopicSchemaType;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+import org.apache.avro.generic.GenericRecord;
+
 import java.io.IOException;
 import java.time.Duration;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
+
 public class ConverterFactory {
     private final String registryUrl;
     private final Map<String, KafkaRecordConvert<GenericRecord>> recordConvertMap = new ConcurrentHashMap<>();
diff --git a/core/src/main/java/kafka/automq/table/transformer/ProtobufKafkaRecordConvert.java b/core/src/main/java/kafka/automq/table/transformer/ProtobufKafkaRecordConvert.java
index ffaabbcd75..5576582c67 100644
--- a/core/src/main/java/kafka/automq/table/transformer/ProtobufKafkaRecordConvert.java
+++ b/core/src/main/java/kafka/automq/table/transformer/ProtobufKafkaRecordConvert.java
@@ -20,8 +20,9 @@
 package kafka.automq.table.transformer;
 
 import kafka.automq.table.deserializer.proto.CustomKafkaProtobufDeserializer;
-
+import kafka.automq.table.deserializer.proto.HeaderBasedSchemaResolutionResolver;
 import kafka.automq.table.deserializer.proto.SchemaResolutionResolver;
+
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.serialization.Deserializer;
 
@@ -30,12 +31,10 @@
 import com.google.common.cache.CacheBuilder;
 import com.google.protobuf.Message;
 
-import kafka.automq.table.deserializer.proto.HeaderBasedSchemaResolutionResolver;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.protobuf.ProtoConversions;
 import org.apache.avro.protobuf.ProtobufData;
-import org.apache.kafka.common.utils.Utils;
 
 import java.time.Duration;
 import java.util.Map;
@@ -88,7 +87,7 @@ public GenericRecord convert(String topic, Record record, int schemaId) {
 
     @Override
     public int getSchemaId(String topic, Record record) {
-        return resolver.resolve(topic, Utils.toNullableArray(record.value())).getSchemaId();
+        return resolver.getSchemaId(topic, record);
     }
 
     @Override
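Rounding out the series, a test sketch for the registry-based path; it assumes the MockSchemaRegistryClient and ProtobufSchema classes from the Confluent client and protobuf-provider artifacts, an arbitrary Order schema, and a surrounding test method that declares throws Exception:

    import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
    import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;

    // Test sketch: records are stamped with the subject's latest schema id.
    MockSchemaRegistryClient registry = new MockSchemaRegistryClient();
    int id = registry.register("orders-value",
        new ProtobufSchema("syntax = \"proto3\"; message Order { string id = 1; }"));

    RegistryBasedSchemaResolutionResolver resolver = new RegistryBasedSchemaResolutionResolver(registry);
    // The payload header is never read, so bare protobuf bytes are fine.
    assert resolver.resolve("orders", new byte[] {0x08, 0x01}).getSchemaId() == id;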