diff --git a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
index 0584a8ee14..b416e7ce7c 100755
--- a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
@@ -265,7 +265,7 @@ public class TopicConfig {
     public static final String TABLE_TOPIC_NAMESPACE_CONFIG = "automq.table.topic.namespace";
     public static final String TABLE_TOPIC_NAMESPACE_DOC = "The table topic table namespace";
     public static final String TABLE_TOPIC_SCHEMA_TYPE_CONFIG = "automq.table.topic.schema.type";
-    public static final String TABLE_TOPIC_SCHEMA_TYPE_DOC = "The table topic schema type, support schemaless, schema";
+    public static final String TABLE_TOPIC_SCHEMA_TYPE_DOC = "The table topic schema type, supported values: schemaless, schema, schema_latest";
     public static final String TABLE_TOPIC_ID_COLUMNS_CONFIG = "automq.table.topic.id.columns";
     public static final String TABLE_TOPIC_ID_COLUMNS_DOC = "The primary key, comma-separated list of columns that identify a row in tables." +
         "ex. [region, name]";
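The new value is set per topic like any other topic-level config. A minimal sketch (not part of this patch) of enabling it at topic creation through the Admin API; the topic name, partition/replication settings, and bootstrap address are placeholders.

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class CreateTableTopicExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // "orders" is a placeholder topic; schema_latest is the value introduced by this patch.
            NewTopic topic = new NewTopic("orders", 1, (short) 1)
                .configs(Map.of(TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_CONFIG, "schema_latest"));
            admin.createTopics(List.of(topic)).all().get();
        }
    }
}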
diff --git a/core/src/main/java/kafka/automq/table/deserializer/proto/AbstractCustomKafkaProtobufDeserializer.java b/core/src/main/java/kafka/automq/table/deserializer/proto/AbstractCustomKafkaProtobufDeserializer.java
index 605afca2a1..4874b1e644 100644
--- a/core/src/main/java/kafka/automq/table/deserializer/proto/AbstractCustomKafkaProtobufDeserializer.java
+++ b/core/src/main/java/kafka/automq/table/deserializer/proto/AbstractCustomKafkaProtobufDeserializer.java
@@ -33,7 +33,6 @@
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InterruptedIOException;
-import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.Objects;
 
@@ -43,13 +42,18 @@ public abstract class AbstractCustomKafkaProtobufDeserializer
     extends AbstractKafkaSchemaSerDe {
-    private static final int SCHEMA_ID_SIZE = 4;
-    private static final int HEADER_SIZE = SCHEMA_ID_SIZE + 1; // magic byte + schema id
     protected final Map schemaCache;
+    protected final SchemaResolutionResolver schemaResolutionResolver;
 
     public AbstractCustomKafkaProtobufDeserializer() {
         this.schemaCache = new BoundedConcurrentHashMap<>(1000);
+        this.schemaResolutionResolver = new HeaderBasedSchemaResolutionResolver();
+    }
+
+    public AbstractCustomKafkaProtobufDeserializer(SchemaResolutionResolver schemaResolutionResolver) {
+        this.schemaCache = new BoundedConcurrentHashMap<>(1000);
+        this.schemaResolutionResolver = schemaResolutionResolver != null ?
+            schemaResolutionResolver : new HeaderBasedSchemaResolutionResolver();
     }
 
     protected void configure(CustomKafkaProtobufDeserializerConfig config) {
@@ -76,30 +80,27 @@ protected T deserialize(String topic, Headers headers, byte[] payload)
             throw new InvalidConfigurationException("Schema registry not found, make sure the schema.registry.url is set");
         }
 
-        int schemaId = 0;
-        byte[] messageBytes;
-        MessageIndexes indexes;
-        Message message;
-
         try {
-            // Phase 2: Message Header Parsing
-            ByteBuffer buffer = processHeader(payload);
-            schemaId = extractSchemaId(buffer);
-            indexes = extractMessageIndexes(buffer);
-            messageBytes = extractMessageBytes(buffer);
+            // Phase 2: Schema Resolution
+            SchemaResolutionResolver.SchemaResolution resolution = schemaResolutionResolver.resolve(topic, payload);
+            int schemaId = resolution.getSchemaId();
+            MessageIndexes indexes = resolution.getIndexes();
+            byte[] messageBytes = resolution.getMessageBytes();
 
             // Phase 3: Schema Processing
             ProtobufSchemaWrapper protobufSchemaWrapper = processSchema(topic, schemaId, indexes);
             Descriptors.Descriptor targetDescriptor = protobufSchemaWrapper.getDescriptor();
 
             // Phase 4: Message Deserialization
-            message = deserializeMessage(targetDescriptor, messageBytes);
+            Message message = deserializeMessage(targetDescriptor, messageBytes);
 
-            return (T) message;
+            @SuppressWarnings("unchecked")
+            T result = (T) message;
+            return result;
         } catch (InterruptedIOException e) {
-            throw new TimeoutException("Error deserializing Protobuf message for id " + schemaId, e);
+            throw new TimeoutException("Error deserializing Protobuf message", e);
        } catch (IOException | RuntimeException e) {
-            throw new SerializationException("Error deserializing Protobuf message for id " + schemaId, e);
+            throw new SerializationException("Error deserializing Protobuf message", e);
         }
     }
 
@@ -110,62 +111,6 @@ private Message deserializeMessage(Descriptors.Descriptor descriptor, byte[] mes
         return DynamicMessage.parseFrom(descriptor, new ByteArrayInputStream(messageBytes));
     }
 
-    /**
-     * Phase 2a: Process the header of the message
-     *
-     * @param payload The serialized payload
-     * @return ByteBuffer positioned after the magic byte
-     */
-    protected ByteBuffer processHeader(byte[] payload) {
-        return getByteBuffer(payload);
-    }
-
-    protected ByteBuffer getByteBuffer(byte[] payload) {
-        if (payload == null || payload.length < HEADER_SIZE) {
-            throw new SerializationException("Invalid payload size");
-        }
-        ByteBuffer buffer = ByteBuffer.wrap(payload);
-        byte magicByte = buffer.get();
-        if (magicByte != MAGIC_BYTE) {
-            throw new SerializationException("Unknown magic byte: " + magicByte);
-        }
-        return buffer;
-    }
-
-    /**
-     * Phase 2b: Extract the schema ID from the buffer
-     *
-     * @param buffer The byte buffer positioned after the magic byte
-     * @return The schema ID
-     */
-    protected int extractSchemaId(ByteBuffer buffer) {
-        return buffer.getInt();
-    }
-
-    /**
-     * Phase 2c: Extract message indexes from the buffer
-     *
-     * @param buffer The byte buffer positioned after the schema ID
-     * @return The message indexes
-     */
-    protected MessageIndexes extractMessageIndexes(ByteBuffer buffer) {
-        return MessageIndexes.readFrom(buffer);
-    }
-
-    /**
-     * Phase 2d: Extract the actual message bytes from the buffer
-     *
-     * @param buffer The byte buffer positioned after the message indexes
-     * @return The message bytes
-     */
-    protected byte[] extractMessageBytes(ByteBuffer buffer) {
-        int messageLength = buffer.remaining();
-
-        byte[] messageBytes = new byte[messageLength];
-        buffer.get(messageBytes);
-        return messageBytes;
-    }
-
     /**
      * Phase 3: Process and retrieve the schema
      *
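The refactored deserialize() keeps the same observable phases (schema resolution, schema processing, message deserialization); only the header parsing moves behind the injected resolver. A minimal usage sketch, assuming the concrete deserializer exposes the standard Kafka Deserializer contract as its use in ProtobufKafkaRecordConvert further down suggests; the topic name and registry URL are placeholders.

import kafka.automq.table.deserializer.proto.CustomKafkaProtobufDeserializer;

import org.apache.kafka.common.serialization.Deserializer;

import com.google.protobuf.Message;

import java.util.Map;

public class ProtobufValueDecodeSketch {

    // Decodes one Confluent-framed protobuf value; "orders" and the registry URL are placeholders.
    static Message decode(byte[] confluentFramedValue) {
        try (Deserializer<Message> deserializer = new CustomKafkaProtobufDeserializer<>()) {
            // The no-arg constructor falls back to HeaderBasedSchemaResolutionResolver, so Phase 2
            // reads the magic byte, schema id and message indexes from the payload itself.
            deserializer.configure(Map.of("schema.registry.url", "http://localhost:8081"), false);
            return deserializer.deserialize("orders", confluentFramedValue);
        }
    }
}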
diff --git a/core/src/main/java/kafka/automq/table/deserializer/proto/CustomKafkaProtobufDeserializer.java b/core/src/main/java/kafka/automq/table/deserializer/proto/CustomKafkaProtobufDeserializer.java
index 310120e3dc..d81b822715 100644
--- a/core/src/main/java/kafka/automq/table/deserializer/proto/CustomKafkaProtobufDeserializer.java
+++ b/core/src/main/java/kafka/automq/table/deserializer/proto/CustomKafkaProtobufDeserializer.java
@@ -35,10 +35,19 @@ public class CustomKafkaProtobufDeserializer
     public CustomKafkaProtobufDeserializer() {
     }
 
+    public CustomKafkaProtobufDeserializer(SchemaResolutionResolver resolver) {
+        super(resolver);
+    }
+
     public CustomKafkaProtobufDeserializer(SchemaRegistryClient schemaRegistry) {
         this.schemaRegistry = schemaRegistry;
     }
 
+    public CustomKafkaProtobufDeserializer(SchemaRegistryClient schemaRegistry, SchemaResolutionResolver schemaResolutionResolver) {
+        super(schemaResolutionResolver);
+        this.schemaRegistry = schemaRegistry;
+    }
+
     @Override
     public void configure(Map configs, boolean isKey) {
         CustomKafkaProtobufDeserializerConfig config = new CustomKafkaProtobufDeserializerConfig(configs);
@@ -63,4 +72,4 @@ public void close() {
             throw new RuntimeException("Exception while closing deserializer", e);
         }
     }
-}
\ No newline at end of file
+}
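The two added constructors are what lets ConverterFactory (later in this patch) swap the resolution strategy. A hedged wiring sketch, assuming the class's type parameter is the protobuf Message type; the registry URL and identity-map capacity are placeholders.

import kafka.automq.table.deserializer.proto.CustomKafkaProtobufDeserializer;
import kafka.automq.table.deserializer.proto.RegistryBasedSchemaResolutionResolver;
import kafka.automq.table.deserializer.proto.SchemaResolutionResolver;

import com.google.protobuf.Message;

import java.util.Map;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

public class LatestSchemaDeserializerWiring {

    // Builds a deserializer that ignores any payload header and always decodes against the
    // subject's latest registered schema.
    static CustomKafkaProtobufDeserializer<Message> forLatestSchema(String registryUrl) {
        SchemaRegistryClient registry = new CachedSchemaRegistryClient(registryUrl, 100);
        SchemaResolutionResolver resolver = new RegistryBasedSchemaResolutionResolver(registry);
        CustomKafkaProtobufDeserializer<Message> deserializer =
            new CustomKafkaProtobufDeserializer<>(registry, resolver);
        deserializer.configure(Map.of("schema.registry.url", registryUrl), false);
        return deserializer;
    }
}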
diff --git a/core/src/main/java/kafka/automq/table/deserializer/proto/HeaderBasedSchemaResolutionResolver.java b/core/src/main/java/kafka/automq/table/deserializer/proto/HeaderBasedSchemaResolutionResolver.java
new file mode 100644
index 0000000000..74d1f5e91a
--- /dev/null
+++ b/core/src/main/java/kafka/automq/table/deserializer/proto/HeaderBasedSchemaResolutionResolver.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2025, AutoMQ HK Limited.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.automq.table.deserializer.proto;
+
+import kafka.automq.table.deserializer.proto.schema.MessageIndexes;
+import kafka.automq.table.transformer.InvalidDataException;
+
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.record.Record;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Default implementation of SchemaResolutionResolver that parses schema information from message headers.
+ * This implementation handles the standard Confluent Kafka protobuf message format with magic byte,
+ * schema ID, message indexes, and message payload.
+ */
+public class HeaderBasedSchemaResolutionResolver implements SchemaResolutionResolver {
+
+    private static final int SCHEMA_ID_SIZE = 4;
+    private static final int HEADER_SIZE = SCHEMA_ID_SIZE + 1; // magic byte + schema id
+    private static final byte MAGIC_BYTE = 0x0;
+
+    @Override
+    public SchemaResolution resolve(String topic, byte[] payload) {
+        if (payload == null) {
+            throw new SerializationException("Payload cannot be null");
+        }
+
+        if (payload.length < HEADER_SIZE) {
+            throw new SerializationException("Invalid payload size: " + payload.length + ", expected at least " + HEADER_SIZE);
+        }
+
+        ByteBuffer buffer = ByteBuffer.wrap(payload);
+
+        // Extract magic byte
+        byte magicByte = buffer.get();
+        if (magicByte != MAGIC_BYTE) {
+            throw new SerializationException("Unknown magic byte: " + magicByte);
+        }
+
+        // Extract schema ID
+        int schemaId = buffer.getInt();
+
+        // Extract message indexes
+        MessageIndexes indexes = MessageIndexes.readFrom(buffer);
+
+        // Extract message bytes
+        int messageLength = buffer.remaining();
+        byte[] messageBytes = new byte[messageLength];
+        buffer.get(messageBytes);
+
+        return new SchemaResolution(schemaId, indexes, messageBytes);
+    }
+
+    @Override
+    public int getSchemaId(String topic, Record record) {
+        // io.confluent.kafka.serializers.DeserializationContext#constructor
+        ByteBuffer buffer = record.value().duplicate();
+        if (buffer.get() != MAGIC_BYTE) {
+            throw new InvalidDataException("Unknown magic byte!");
+        }
+        return buffer.getInt();
+    }
+
+}
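For reference, the byte layout this resolver expects is [magic byte 0x0][4-byte schema id][message indexes][protobuf payload]. The sketch below frames a payload by hand and parses it back; the single zero byte written for the message-indexes section is an assumption (the compact Confluent encoding for "first message type in the schema"), everything else mirrors the checks in resolve().

import kafka.automq.table.deserializer.proto.HeaderBasedSchemaResolutionResolver;
import kafka.automq.table.deserializer.proto.SchemaResolutionResolver;

import java.nio.ByteBuffer;

public class WireFormatSketch {

    static SchemaResolutionResolver.SchemaResolution parse(int schemaId, byte[] protoBytes) {
        ByteBuffer framed = ByteBuffer.allocate(1 + 4 + 1 + protoBytes.length);
        framed.put((byte) 0x0);   // magic byte
        framed.putInt(schemaId);  // 4-byte schema id
        framed.put((byte) 0x0);   // message indexes (assumed compact "first message" encoding)
        framed.put(protoBytes);   // raw protobuf message bytes

        SchemaResolutionResolver resolver = new HeaderBasedSchemaResolutionResolver();
        return resolver.resolve("orders", framed.array()); // topic name is a placeholder
    }
}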
diff --git a/core/src/main/java/kafka/automq/table/deserializer/proto/RegistryBasedSchemaResolutionResolver.java b/core/src/main/java/kafka/automq/table/deserializer/proto/RegistryBasedSchemaResolutionResolver.java
new file mode 100644
index 0000000000..848b6102ec
--- /dev/null
+++ b/core/src/main/java/kafka/automq/table/deserializer/proto/RegistryBasedSchemaResolutionResolver.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2025, AutoMQ HK Limited.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.automq.table.deserializer.proto;
+
+import kafka.automq.table.deserializer.proto.schema.MessageIndexes;
+
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.record.Record;
+
+import com.automq.stream.utils.Time;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.ConcurrentHashMap;
+
+import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+
+/**
+ * Implementation of SchemaResolutionResolver that retrieves the latest schema from Schema Registry by subject name.
+ * This implementation includes a caching mechanism to avoid frequent registry queries.
+ * Cache entries are refreshed every 5 minutes.
+ */
+public class RegistryBasedSchemaResolutionResolver implements SchemaResolutionResolver {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(RegistryBasedSchemaResolutionResolver.class);
+
+    private static final long CACHE_REFRESH_INTERVAL_MS = 5 * 60 * 1000; // 5 minutes
+    private static final MessageIndexes DEFAULT_INDEXES = new MessageIndexes(Collections.singletonList(0));
+
+    private final SchemaRegistryClient schemaRegistry;
+    private final ConcurrentHashMap schemaCache = new ConcurrentHashMap<>();
+    private final Time time;
+
+    public RegistryBasedSchemaResolutionResolver(SchemaRegistryClient schemaRegistry) {
+        this.schemaRegistry = schemaRegistry;
+        time = Time.SYSTEM;
+    }
+
+    @Override
+    public SchemaResolution resolve(String topic, byte[] payload) {
+        if (payload == null) {
+            throw new SerializationException("Payload cannot be null");
+        }
+
+        String subject = getSubjectName(topic);
+        CachedSchemaInfo cachedInfo = getCachedSchemaInfo(subject);
+
+        return new SchemaResolution(cachedInfo.schemaId, DEFAULT_INDEXES, payload);
+    }
+
+    @Override
+    public int getSchemaId(String topic, Record record) {
+        String subject = getSubjectName(topic);
+        CachedSchemaInfo cachedInfo = getCachedSchemaInfo(subject);
+        return cachedInfo.schemaId;
+    }
+
+    private CachedSchemaInfo getCachedSchemaInfo(String subject) {
+        long currentTime = time.milliseconds();
+
+        return schemaCache.compute(subject, (key, existing) -> {
+            // If we have existing data and it's still fresh, use it
+            if (existing != null && currentTime - existing.lastUpdated <= CACHE_REFRESH_INTERVAL_MS) {
+                return existing;
+            }
+
+            // Try to get fresh data from registry
+            try {
+                SchemaMetadata latestSchema = schemaRegistry.getLatestSchemaMetadata(subject);
+                return new CachedSchemaInfo(latestSchema.getId(), currentTime);
+            } catch (IOException | RestClientException e) {
+                // If we have existing cached data (even if expired), use it as fallback
+                if (existing != null) {
+                    // Log a warning but continue with the stale entry
+                    LOGGER.warn("Failed to refresh schema for subject {}, using cached schema id from {}", subject, existing.lastUpdated, e);
+                    return existing;
+                }
+                // No cached data and fresh fetch failed - this is a hard error
+                throw new SerializationException("Error retrieving schema for subject " + subject +
+                    " and no cached data available", e);
+            }
+        });
+    }
+
+    private String getSubjectName(String topic) {
+        // Follow the Confluent naming convention: -value or -key
+        return topic + "-value";
+    }
+
+    private static class CachedSchemaInfo {
+        final int schemaId;
+        final long lastUpdated;
+
+        CachedSchemaInfo(int schemaId, long lastUpdated) {
+            this.schemaId = schemaId;
+            this.lastUpdated = lastUpdated;
+        }
+    }
+}
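A usage sketch for the registry-based path, with a placeholder registry URL and topic. Unlike the header-based resolver, resolve() hands the payload back untouched: in this mode the record value is expected to be the raw protobuf message, and the schema id comes from the subject's latest registered version.

import kafka.automq.table.deserializer.proto.RegistryBasedSchemaResolutionResolver;
import kafka.automq.table.deserializer.proto.SchemaResolutionResolver;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

public class LatestSchemaLookupSketch {

    static int latestSchemaId(byte[] rawProtobufValue) {
        SchemaRegistryClient registry = new CachedSchemaRegistryClient("http://localhost:8081", 100);
        SchemaResolutionResolver resolver = new RegistryBasedSchemaResolutionResolver(registry);
        // Looks up subject "orders-value"; the id is cached per subject, refreshed at most every 5 minutes,
        // and the stale entry is reused if the registry is temporarily unreachable.
        SchemaResolutionResolver.SchemaResolution resolution = resolver.resolve("orders", rawProtobufValue);
        return resolution.getSchemaId();
    }
}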
diff --git a/core/src/main/java/kafka/automq/table/deserializer/proto/SchemaResolutionResolver.java b/core/src/main/java/kafka/automq/table/deserializer/proto/SchemaResolutionResolver.java
new file mode 100644
index 0000000000..1ec0f5c63f
--- /dev/null
+++ b/core/src/main/java/kafka/automq/table/deserializer/proto/SchemaResolutionResolver.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2025, AutoMQ HK Limited.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.automq.table.deserializer.proto;
+
+import kafka.automq.table.deserializer.proto.schema.MessageIndexes;
+
+import org.apache.kafka.common.record.Record;
+
+/**
+ * Interface for resolving schema information for protobuf message deserialization.
+ * This interface supports different strategies for obtaining schema ID and message structure:
+ * - Parse from message header (standard Confluent format)
+ * - Lookup latest schema from registry by subject name
+ */
+public interface SchemaResolutionResolver {
+
+    /**
+     * Resolves schema information for the given message payload and topic.
+     *
+     * @param topic The Kafka topic name
+     * @param payload The serialized protobuf payload
+     * @return SchemaResolution containing schema ID, message indexes, and message bytes
+     * @throws org.apache.kafka.common.errors.SerializationException if resolution fails
+     */
+    SchemaResolution resolve(String topic, byte[] payload);
+
+    /**
+     * Resolves only the schema ID for the given record's value.
+     *
+     * @param topic The Kafka topic name
+     * @param record The Kafka record whose value holds the serialized payload
+     * @return The schema ID to use for this record
+     */
+    int getSchemaId(String topic, Record record);
+
+    /**
+     * Container class for resolved schema information.
+     */
+    class SchemaResolution {
+        private final int schemaId;
+        private final MessageIndexes indexes;
+        private final byte[] messageBytes;
+
+        public SchemaResolution(int schemaId, MessageIndexes indexes, byte[] messageBytes) {
+            this.schemaId = schemaId;
+            this.indexes = indexes;
+            this.messageBytes = messageBytes;
+        }
+
+        public int getSchemaId() {
+            return schemaId;
+        }
+
+        public MessageIndexes getIndexes() {
+            return indexes;
+        }
+
+        public byte[] getMessageBytes() {
+            return messageBytes;
+        }
+    }
+}
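The interface also leaves room for strategies beyond the two shipped here. A hypothetical implementation that pins every record of a topic to one known schema id, e.g. for tests or for producers that never write the Confluent header; the class name and constructor argument are illustrative, not part of this patch.

import kafka.automq.table.deserializer.proto.SchemaResolutionResolver;
import kafka.automq.table.deserializer.proto.schema.MessageIndexes;

import org.apache.kafka.common.record.Record;

import java.util.Collections;

public class FixedIdSchemaResolutionResolver implements SchemaResolutionResolver {

    private final int schemaId;

    public FixedIdSchemaResolutionResolver(int schemaId) {
        this.schemaId = schemaId;
    }

    @Override
    public SchemaResolution resolve(String topic, byte[] payload) {
        // Treat the whole value as the protobuf message and point it at the first message type.
        return new SchemaResolution(schemaId, new MessageIndexes(Collections.singletonList(0)), payload);
    }

    @Override
    public int getSchemaId(String topic, Record record) {
        return schemaId;
    }
}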
magic byte!"); + } + return buffer.getInt(); + } + @Override public void configure(Map configs, boolean isKey) { deserializer.configure(configs, isKey); diff --git a/core/src/main/java/kafka/automq/table/transformer/ConverterFactory.java b/core/src/main/java/kafka/automq/table/transformer/ConverterFactory.java index e2eb1ef0a1..2c38470c13 100644 --- a/core/src/main/java/kafka/automq/table/transformer/ConverterFactory.java +++ b/core/src/main/java/kafka/automq/table/transformer/ConverterFactory.java @@ -20,6 +20,8 @@ package kafka.automq.table.transformer; import kafka.automq.table.deserializer.proto.ProtobufSchemaProvider; +import kafka.automq.table.deserializer.proto.RegistryBasedSchemaResolutionResolver; +import kafka.automq.table.deserializer.proto.SchemaResolutionResolver; import org.apache.kafka.server.record.TableTopicSchemaType; @@ -41,11 +43,9 @@ import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; -import static kafka.automq.table.transformer.SchemaFormat.AVRO; - public class ConverterFactory { private final String registryUrl; - private final Map> recordConvertMap = new ConcurrentHashMap<>(); + private final Map> recordConvertMap = new ConcurrentHashMap<>(); private SchemaRegistryClient schemaRegistry; private final Cache topicSchemaFormatCache = CacheBuilder.newBuilder() @@ -80,7 +80,10 @@ public Converter converter(TableTopicSchemaType type, String topic) { return new SchemalessConverter(); } case SCHEMA: { - return new LazyRegistrySchemaConvert(() -> createRegistrySchemaConverter(topic)); + return new LazyRegistrySchemaConvert(() -> createRegistrySchemaConverter(topic, false)); + } + case SCHEMA_LATEST: { + return new LazyRegistrySchemaConvert(() -> createRegistrySchemaConverter(topic, true)); } default: { throw new IllegalArgumentException("Unsupported converter type: " + type); @@ -88,7 +91,7 @@ public Converter converter(TableTopicSchemaType type, String topic) { } } - private Converter createRegistrySchemaConverter(String topic) { + private Converter createRegistrySchemaConverter(String topic, boolean useLatestSchema) { if (schemaRegistry != null) { try { SchemaFormat format = topicSchemaFormatCache.getIfPresent(topic); @@ -104,7 +107,7 @@ private Converter createRegistrySchemaConverter(String topic) { return createAvroConverter(topic); } case PROTOBUF: { - return createProtobufConverter(topic); + return createProtobufConverter(topic, useLatestSchema); } default: { throw new IllegalArgumentException("Unsupported schema format: " + format); @@ -123,14 +126,15 @@ private String getSubjectName(String topic) { } private Converter createAvroConverter(String topic) { - KafkaRecordConvert recordConvert = recordConvertMap.computeIfAbsent(AVRO, + KafkaRecordConvert recordConvert = recordConvertMap.computeIfAbsent(SchemaFormat.AVRO.name(), format -> createKafkaAvroRecordConvert(registryUrl)); return new RegistrySchemaAvroConverter(recordConvert, topic); } - private Converter createProtobufConverter(String topic) { - KafkaRecordConvert recordConvert = recordConvertMap.computeIfAbsent(SchemaFormat.PROTOBUF, - format -> createKafkaProtobufRecordConvert(registryUrl)); + private Converter createProtobufConverter(String topic, boolean useLatestSchema) { + String cacheKey = useLatestSchema ? 
diff --git a/core/src/main/java/kafka/automq/table/transformer/ConverterFactory.java b/core/src/main/java/kafka/automq/table/transformer/ConverterFactory.java
index e2eb1ef0a1..2c38470c13 100644
--- a/core/src/main/java/kafka/automq/table/transformer/ConverterFactory.java
+++ b/core/src/main/java/kafka/automq/table/transformer/ConverterFactory.java
@@ -20,6 +20,8 @@
 package kafka.automq.table.transformer;
 
 import kafka.automq.table.deserializer.proto.ProtobufSchemaProvider;
+import kafka.automq.table.deserializer.proto.RegistryBasedSchemaResolutionResolver;
+import kafka.automq.table.deserializer.proto.SchemaResolutionResolver;
 
 import org.apache.kafka.server.record.TableTopicSchemaType;
 
@@ -41,11 +43,9 @@
 import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
 import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
 
-import static kafka.automq.table.transformer.SchemaFormat.AVRO;
-
 public class ConverterFactory {
     private final String registryUrl;
-    private final Map> recordConvertMap = new ConcurrentHashMap<>();
+    private final Map> recordConvertMap = new ConcurrentHashMap<>();
     private SchemaRegistryClient schemaRegistry;
 
     private final Cache topicSchemaFormatCache = CacheBuilder.newBuilder()
@@ -80,7 +80,10 @@ public Converter converter(TableTopicSchemaType type, String topic) {
                 return new SchemalessConverter();
             }
             case SCHEMA: {
-                return new LazyRegistrySchemaConvert(() -> createRegistrySchemaConverter(topic));
+                return new LazyRegistrySchemaConvert(() -> createRegistrySchemaConverter(topic, false));
+            }
+            case SCHEMA_LATEST: {
+                return new LazyRegistrySchemaConvert(() -> createRegistrySchemaConverter(topic, true));
             }
             default: {
                 throw new IllegalArgumentException("Unsupported converter type: " + type);
@@ -88,7 +91,7 @@ public Converter converter(TableTopicSchemaType type, String topic) {
         }
     }
 
-    private Converter createRegistrySchemaConverter(String topic) {
+    private Converter createRegistrySchemaConverter(String topic, boolean useLatestSchema) {
         if (schemaRegistry != null) {
             try {
                 SchemaFormat format = topicSchemaFormatCache.getIfPresent(topic);
@@ -104,7 +107,7 @@ private Converter createRegistrySchemaConverter(String topic) {
                         return createAvroConverter(topic);
                     }
                     case PROTOBUF: {
-                        return createProtobufConverter(topic);
+                        return createProtobufConverter(topic, useLatestSchema);
                     }
                     default: {
                         throw new IllegalArgumentException("Unsupported schema format: " + format);
@@ -123,14 +126,15 @@ private String getSubjectName(String topic) {
     }
 
     private Converter createAvroConverter(String topic) {
-        KafkaRecordConvert recordConvert = recordConvertMap.computeIfAbsent(AVRO,
+        KafkaRecordConvert recordConvert = recordConvertMap.computeIfAbsent(SchemaFormat.AVRO.name(),
             format -> createKafkaAvroRecordConvert(registryUrl));
         return new RegistrySchemaAvroConverter(recordConvert, topic);
     }
 
-    private Converter createProtobufConverter(String topic) {
-        KafkaRecordConvert recordConvert = recordConvertMap.computeIfAbsent(SchemaFormat.PROTOBUF,
-            format -> createKafkaProtobufRecordConvert(registryUrl));
+    private Converter createProtobufConverter(String topic, boolean useLatestSchema) {
+        String cacheKey = useLatestSchema ? SchemaFormat.PROTOBUF.name() + "_LATEST" : SchemaFormat.PROTOBUF.name();
+        KafkaRecordConvert recordConvert = recordConvertMap.computeIfAbsent(cacheKey,
+            key -> createKafkaProtobufRecordConvert(registryUrl, useLatestSchema));
         return new RegistrySchemaAvroConverter(recordConvert, topic);
     }
 
@@ -142,8 +146,14 @@ private KafkaRecordConvert createKafkaAvroRecordConvert(String re
     }
 
     @SuppressWarnings("unchecked")
-    private KafkaRecordConvert createKafkaProtobufRecordConvert(String registryUrl) {
-        ProtobufKafkaRecordConvert protobufKafkaRecordConvert = new ProtobufKafkaRecordConvert(schemaRegistry);
+    private KafkaRecordConvert createKafkaProtobufRecordConvert(String registryUrl, boolean useLatestSchema) {
+        ProtobufKafkaRecordConvert protobufKafkaRecordConvert;
+        if (useLatestSchema) {
+            SchemaResolutionResolver resolver = new RegistryBasedSchemaResolutionResolver(schemaRegistry);
+            protobufKafkaRecordConvert = new ProtobufKafkaRecordConvert(schemaRegistry, resolver);
+        } else {
+            protobufKafkaRecordConvert = new ProtobufKafkaRecordConvert(schemaRegistry);
+        }
         protobufKafkaRecordConvert.configure(Map.of("schema.registry.url", registryUrl), false);
         return protobufKafkaRecordConvert;
     }
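Converter selection for the new mode, as a hedged sketch: the converter(TableTopicSchemaType, String) call is the one added above, while constructing ConverterFactory directly from the registry URL is an assumption based on its registryUrl field; the URL and topic are placeholders.

import kafka.automq.table.transformer.Converter;
import kafka.automq.table.transformer.ConverterFactory;

import org.apache.kafka.server.record.TableTopicSchemaType;

public class ConverterSelectionSketch {

    static Converter converterFor(String topic) {
        ConverterFactory factory = new ConverterFactory("http://localhost:8081");
        // SCHEMA_LATEST routes the protobuf path through RegistryBasedSchemaResolutionResolver
        // instead of the header-based default used by SCHEMA.
        return factory.converter(TableTopicSchemaType.SCHEMA_LATEST, topic);
    }
}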
diff --git a/core/src/main/java/kafka/automq/table/transformer/KafkaRecordConvert.java b/core/src/main/java/kafka/automq/table/transformer/KafkaRecordConvert.java
index 53df3d0e6e..f6c878f261 100644
--- a/core/src/main/java/kafka/automq/table/transformer/KafkaRecordConvert.java
+++ b/core/src/main/java/kafka/automq/table/transformer/KafkaRecordConvert.java
@@ -25,6 +25,10 @@ public interface KafkaRecordConvert {
 
     T convert(String topic, org.apache.kafka.common.record.Record record, int schemaId);
+
+    // io.confluent.kafka.serializers.DeserializationContext#constructor
+    int getSchemaId(String topic, org.apache.kafka.common.record.Record record);
+
     /**
      * Configure this class.
      * @param configs configs in key/value pairs
diff --git a/core/src/main/java/kafka/automq/table/transformer/ProtobufKafkaRecordConvert.java b/core/src/main/java/kafka/automq/table/transformer/ProtobufKafkaRecordConvert.java
index 4055be2f2c..5576582c67 100644
--- a/core/src/main/java/kafka/automq/table/transformer/ProtobufKafkaRecordConvert.java
+++ b/core/src/main/java/kafka/automq/table/transformer/ProtobufKafkaRecordConvert.java
@@ -20,6 +20,8 @@
 package kafka.automq.table.transformer;
 
 import kafka.automq.table.deserializer.proto.CustomKafkaProtobufDeserializer;
+import kafka.automq.table.deserializer.proto.HeaderBasedSchemaResolutionResolver;
+import kafka.automq.table.deserializer.proto.SchemaResolutionResolver;
 
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.serialization.Deserializer;
@@ -47,17 +49,27 @@ public class ProtobufKafkaRecordConvert implements KafkaRecordConvert
     private final Deserializer deserializer;
+    private final SchemaResolutionResolver resolver;
 
     public ProtobufKafkaRecordConvert() {
-        this.deserializer = new CustomKafkaProtobufDeserializer<>();
+        this.resolver = new HeaderBasedSchemaResolutionResolver();
+        this.deserializer = new CustomKafkaProtobufDeserializer<>(resolver);
     }
 
     public ProtobufKafkaRecordConvert(SchemaRegistryClient schemaRegistry) {
-        this.deserializer = new CustomKafkaProtobufDeserializer<>(schemaRegistry);
+        this.resolver = new HeaderBasedSchemaResolutionResolver();
+        this.deserializer = new CustomKafkaProtobufDeserializer<>(schemaRegistry, resolver);
+    }
+
+    public ProtobufKafkaRecordConvert(SchemaRegistryClient schemaRegistry, SchemaResolutionResolver resolver) {
+        this.deserializer = new CustomKafkaProtobufDeserializer<>(schemaRegistry, resolver);
+        this.resolver = resolver;
     }
 
     @VisibleForTesting
     public ProtobufKafkaRecordConvert(Deserializer deserializer) {
         this.deserializer = deserializer;
+        this.resolver = new HeaderBasedSchemaResolutionResolver();
     }
 
     @Override
@@ -73,6 +85,11 @@ public GenericRecord convert(String topic, Record record, int schemaId) {
         return ProtoToAvroConverter.convert(protoMessage, schema);
     }
 
+    @Override
+    public int getSchemaId(String topic, Record record) {
+        return resolver.getSchemaId(topic, record);
+    }
+
     @Override
     public void configure(Map configs, boolean isKey) {
         deserializer.configure(configs, isKey);
diff --git a/core/src/main/java/kafka/automq/table/transformer/RegistrySchemaAvroConverter.java b/core/src/main/java/kafka/automq/table/transformer/RegistrySchemaAvroConverter.java
index 9fdd659d5c..a6145d70fc 100644
--- a/core/src/main/java/kafka/automq/table/transformer/RegistrySchemaAvroConverter.java
+++ b/core/src/main/java/kafka/automq/table/transformer/RegistrySchemaAvroConverter.java
@@ -55,7 +55,7 @@ public RegistrySchemaAvroConverter(KafkaRecordConvert recordConve
 
     @Override
     public Record convert(org.apache.kafka.common.record.Record record) {
-        int schemaId = getSchemaId(record);
+        int schemaId = recordConvert.getSchemaId(topic, record);
         GenericRecord value;
         try {
             value = recordConvert.convert(topic, record, schemaId);
diff --git a/server-common/src/main/java/org/apache/kafka/server/record/TableTopicSchemaType.java b/server-common/src/main/java/org/apache/kafka/server/record/TableTopicSchemaType.java
index ab93e75817..de92aafc5f 100644
--- a/server-common/src/main/java/org/apache/kafka/server/record/TableTopicSchemaType.java
+++ b/server-common/src/main/java/org/apache/kafka/server/record/TableTopicSchemaType.java
@@ -27,7 +27,8 @@
 public enum TableTopicSchemaType {
     SCHEMALESS("schemaless"),
-    SCHEMA("schema");
+    SCHEMA("schema"),
+    SCHEMA_LATEST("schema_latest");
 
     public final String name;
 
     private static final List VALUES = asList(values());
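Finally, an existing table topic can be switched to the new mode with an incremental config update; the sketch is illustrative only, with a placeholder topic name and bootstrap address.

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class EnableSchemaLatestExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "orders");
            AlterConfigOp enableLatest = new AlterConfigOp(
                new ConfigEntry("automq.table.topic.schema.type", "schema_latest"),
                AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(Map.of(topic, List.of(enableLatest))).all().get();
        }
    }
}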