
feat(table-topic): Support schema resolution from registry [WIP] #2749

Open · wants to merge 3 commits into main
@@ -265,7 +265,7 @@ public class TopicConfig {
public static final String TABLE_TOPIC_NAMESPACE_CONFIG = "automq.table.topic.namespace";
public static final String TABLE_TOPIC_NAMESPACE_DOC = "The table topic table namespace";
public static final String TABLE_TOPIC_SCHEMA_TYPE_CONFIG = "automq.table.topic.schema.type";
-public static final String TABLE_TOPIC_SCHEMA_TYPE_DOC = "The table topic schema type, support schemaless, schema";
+public static final String TABLE_TOPIC_SCHEMA_TYPE_DOC = "The table topic schema type; supported values: schemaless, schema, schema_latest";
public static final String TABLE_TOPIC_ID_COLUMNS_CONFIG = "automq.table.topic.id.columns";
public static final String TABLE_TOPIC_ID_COLUMNS_DOC = "The primary key, comma-separated list of columns that identify a row in tables."
+ "ex. [region, name]";
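For context, `schema_latest` is selected per topic through this config key. A minimal sketch of enabling it at topic creation, assuming the standard Kafka AdminClient API (broker address and topic name are hypothetical):

```java
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class CreateSchemaLatestTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical broker
        try (Admin admin = Admin.create(props)) {
            // schema_latest: resolve the schema from the registry by subject,
            // rather than from each record's header.
            NewTopic topic = new NewTopic("orders", 3, (short) 1)
                .configs(Map.of("automq.table.topic.schema.type", "schema_latest"));
            admin.createTopics(List.of(topic)).all().get();
        }
    }
}
```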
@@ -33,7 +33,6 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
-import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Objects;

@@ -43,13 +42,18 @@

public abstract class AbstractCustomKafkaProtobufDeserializer<T extends Message>
extends AbstractKafkaSchemaSerDe {
-private static final int SCHEMA_ID_SIZE = 4;
-private static final int HEADER_SIZE = SCHEMA_ID_SIZE + 1; // magic byte + schema id

protected final Map<SchemaKey, ProtobufSchemaWrapper> schemaCache;
+protected final SchemaResolutionResolver schemaResolutionResolver;

public AbstractCustomKafkaProtobufDeserializer() {
this.schemaCache = new BoundedConcurrentHashMap<>(1000);
+this.schemaResolutionResolver = new HeaderBasedSchemaResolutionResolver();
}

+public AbstractCustomKafkaProtobufDeserializer(SchemaResolutionResolver schemaResolutionResolver) {
+this.schemaCache = new BoundedConcurrentHashMap<>(1000);
+this.schemaResolutionResolver = schemaResolutionResolver != null ? schemaResolutionResolver : new HeaderBasedSchemaResolutionResolver();
+}

protected void configure(CustomKafkaProtobufDeserializerConfig config) {
Expand All @@ -76,30 +80,27 @@ protected T deserialize(String topic, Headers headers, byte[] payload)
throw new InvalidConfigurationException("Schema registry not found, make sure the schema.registry.url is set");
}

-int schemaId = 0;
-byte[] messageBytes;
-MessageIndexes indexes;
-Message message;

try {
-// Phase 2: Message Header Parsing
-ByteBuffer buffer = processHeader(payload);
-schemaId = extractSchemaId(buffer);
-indexes = extractMessageIndexes(buffer);
-messageBytes = extractMessageBytes(buffer);
+// Phase 2: Schema Resolution
+SchemaResolutionResolver.SchemaResolution resolution = schemaResolutionResolver.resolve(topic, payload);
+int schemaId = resolution.getSchemaId();
+MessageIndexes indexes = resolution.getIndexes();
+byte[] messageBytes = resolution.getMessageBytes();

// Phase 3: Schema Processing
ProtobufSchemaWrapper protobufSchemaWrapper = processSchema(topic, schemaId, indexes);
Descriptors.Descriptor targetDescriptor = protobufSchemaWrapper.getDescriptor();

// Phase 4: Message Deserialization
-message = deserializeMessage(targetDescriptor, messageBytes);
+Message message = deserializeMessage(targetDescriptor, messageBytes);

-return (T) message;
+@SuppressWarnings("unchecked")
+T result = (T) message;
+return result;
} catch (InterruptedIOException e) {
-throw new TimeoutException("Error deserializing Protobuf message for id " + schemaId, e);
+throw new TimeoutException("Error deserializing Protobuf message", e);
} catch (IOException | RuntimeException e) {
-throw new SerializationException("Error deserializing Protobuf message for id " + schemaId, e);
+throw new SerializationException("Error deserializing Protobuf message", e);
}
}

@@ -110,62 +111,6 @@ private Message deserializeMessage(Descriptors.Descriptor descriptor, byte[] messageBytes)
return DynamicMessage.parseFrom(descriptor, new ByteArrayInputStream(messageBytes));
}

-/**
-* Phase 2a: Process the header of the message
-*
-* @param payload The serialized payload
-* @return ByteBuffer positioned after the magic byte
-*/
-protected ByteBuffer processHeader(byte[] payload) {
-return getByteBuffer(payload);
-}
-
-protected ByteBuffer getByteBuffer(byte[] payload) {
-if (payload == null || payload.length < HEADER_SIZE) {
-throw new SerializationException("Invalid payload size");
-}
-ByteBuffer buffer = ByteBuffer.wrap(payload);
-byte magicByte = buffer.get();
-if (magicByte != MAGIC_BYTE) {
-throw new SerializationException("Unknown magic byte: " + magicByte);
-}
-return buffer;
-}
-
-/**
-* Phase 2b: Extract the schema ID from the buffer
-*
-* @param buffer The byte buffer positioned after the magic byte
-* @return The schema ID
-*/
-protected int extractSchemaId(ByteBuffer buffer) {
-return buffer.getInt();
-}
-
-/**
-* Phase 2c: Extract message indexes from the buffer
-*
-* @param buffer The byte buffer positioned after the schema ID
-* @return The message indexes
-*/
-protected MessageIndexes extractMessageIndexes(ByteBuffer buffer) {
-return MessageIndexes.readFrom(buffer);
-}
-
-/**
-* Phase 2d: Extract the actual message bytes from the buffer
-*
-* @param buffer The byte buffer positioned after the message indexes
-* @return The message bytes
-*/
-protected byte[] extractMessageBytes(ByteBuffer buffer) {
-int messageLength = buffer.remaining();
-
-byte[] messageBytes = new byte[messageLength];
-buffer.get(messageBytes);
-return messageBytes;
-}

/**
* Phase 3: Process and retrieve the schema
*
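The hunks above call into `SchemaResolutionResolver` without showing its definition. A sketch of the interface as inferable from the call sites (shape reconstructed from usage, not the PR's verbatim source):

```java
package kafka.automq.table.deserializer.proto;

import kafka.automq.table.deserializer.proto.schema.MessageIndexes;

import org.apache.kafka.common.record.Record;

public interface SchemaResolutionResolver {

    /** Resolves the schema id, message indexes, and raw message bytes for a payload. */
    SchemaResolution resolve(String topic, byte[] payload);

    /** Resolves only the schema id for a record. */
    int getSchemaId(String topic, Record record);

    /** Value object carrying the three resolution results. */
    class SchemaResolution {
        private final int schemaId;
        private final MessageIndexes indexes;
        private final byte[] messageBytes;

        public SchemaResolution(int schemaId, MessageIndexes indexes, byte[] messageBytes) {
            this.schemaId = schemaId;
            this.indexes = indexes;
            this.messageBytes = messageBytes;
        }

        public int getSchemaId() { return schemaId; }
        public MessageIndexes getIndexes() { return indexes; }
        public byte[] getMessageBytes() { return messageBytes; }
    }
}
```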
@@ -35,10 +35,19 @@ public class CustomKafkaProtobufDeserializer<T extends Message>
public CustomKafkaProtobufDeserializer() {
}

+public CustomKafkaProtobufDeserializer(SchemaResolutionResolver resolver) {
+super(resolver);
+}

public CustomKafkaProtobufDeserializer(SchemaRegistryClient schemaRegistry) {
this.schemaRegistry = schemaRegistry;
}

+public CustomKafkaProtobufDeserializer(SchemaRegistryClient schemaRegistry, SchemaResolutionResolver schemaResolutionResolver) {
+super(schemaResolutionResolver);
+this.schemaRegistry = schemaRegistry;
+}

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
CustomKafkaProtobufDeserializerConfig config = new CustomKafkaProtobufDeserializerConfig(configs);
@@ -63,4 +72,4 @@ public void close() {
throw new RuntimeException("Exception while closing deserializer", e);
}
}
-}
\ No newline at end of file
+}
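The overloads let callers choose the resolution strategy when constructing the deserializer; presumably `schema` keeps the header-based default while `schema_latest` switches to the registry-based resolver. A hedged wiring sketch (registry URL and cache capacity are placeholders):

```java
import com.google.protobuf.Message;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

public class ResolverWiring {
    public static void main(String[] args) {
        SchemaRegistryClient registry = new CachedSchemaRegistryClient("http://localhost:8081", 100);

        // Default path: schema id and indexes parsed from each record's header.
        CustomKafkaProtobufDeserializer<Message> headerBased =
            new CustomKafkaProtobufDeserializer<>(registry);

        // New path: latest schema fetched from the registry by subject,
        // for payloads written without the Confluent header.
        CustomKafkaProtobufDeserializer<Message> registryBased =
            new CustomKafkaProtobufDeserializer<>(registry, new RegistryBasedSchemaResolutionResolver(registry));
    }
}
```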
@@ -0,0 +1,83 @@
/*
* Copyright 2025, AutoMQ HK Limited.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.automq.table.deserializer.proto;

import kafka.automq.table.deserializer.proto.schema.MessageIndexes;
import kafka.automq.table.transformer.InvalidDataException;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.record.Record;

import java.nio.ByteBuffer;

/**
* Default implementation of SchemaResolutionResolver that parses schema information from message headers.
* This implementation handles the standard Confluent Kafka protobuf message format with magic byte,
* schema ID, message indexes, and message payload.
*/
public class HeaderBasedSchemaResolutionResolver implements SchemaResolutionResolver {

private static final int SCHEMA_ID_SIZE = 4;
private static final int HEADER_SIZE = SCHEMA_ID_SIZE + 1; // magic byte + schema id
private static final byte MAGIC_BYTE = 0x0;

@Override
public SchemaResolution resolve(String topic, byte[] payload) {
if (payload == null) {
throw new SerializationException("Payload cannot be null");
}

if (payload.length < HEADER_SIZE) {
throw new SerializationException("Invalid payload size: " + payload.length + ", expected at least " + HEADER_SIZE);
}

ByteBuffer buffer = ByteBuffer.wrap(payload);

// Extract magic byte
byte magicByte = buffer.get();
if (magicByte != MAGIC_BYTE) {
throw new SerializationException("Unknown magic byte: " + magicByte);
}

// Extract schema ID
int schemaId = buffer.getInt();

// Extract message indexes
MessageIndexes indexes = MessageIndexes.readFrom(buffer);

// Extract message bytes
int messageLength = buffer.remaining();
byte[] messageBytes = new byte[messageLength];
buffer.get(messageBytes);

return new SchemaResolution(schemaId, indexes, messageBytes);
}

@Override
public int getSchemaId(String topic, Record record) {
// Mirrors the header parsing in io.confluent.kafka.serializers.DeserializationContext's constructor
ByteBuffer buffer = record.value().duplicate();
if (buffer.get() != MAGIC_BYTE) {
throw new InvalidDataException("Unknown magic byte!");
}
return buffer.getInt();
}

}
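For reference, the payload layout this resolver parses is the Confluent wire format: one magic byte, a 4-byte big-endian schema id, the message indexes, then the protobuf bytes. A round-trip sketch, assuming `MessageIndexes` follows the Confluent varint encoding in which a single zero byte denotes the default index path `[0]` (schema id and topic are hypothetical):

```java
import java.nio.ByteBuffer;

public class WireFormatExample {
    public static void main(String[] args) {
        byte[] protoBytes = new byte[0]; // stands in for a serialized protobuf message

        // [magic 0x0][4-byte schema id][message indexes][protobuf bytes]
        ByteBuffer buf = ByteBuffer.allocate(1 + 4 + 1 + protoBytes.length);
        buf.put((byte) 0x0);  // magic byte
        buf.putInt(42);       // schema id (hypothetical)
        buf.put((byte) 0x0);  // message indexes: varint 0 -> default path [0]
        buf.put(protoBytes);

        SchemaResolutionResolver resolver = new HeaderBasedSchemaResolutionResolver();
        SchemaResolutionResolver.SchemaResolution res = resolver.resolve("orders", buf.array());
        System.out.println(res.getSchemaId()); // 42
    }
}
```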
@@ -0,0 +1,118 @@
/*
* Copyright 2025, AutoMQ HK Limited.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.automq.table.deserializer.proto;

import kafka.automq.table.deserializer.proto.schema.MessageIndexes;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.record.Record;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.automq.stream.utils.Time;

import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;

import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;

/**
* Implementation of SchemaResolutionResolver that retrieves the latest schema from Schema Registry by subject name.
* This implementation includes a caching mechanism to avoid frequent registry queries.
* Cache entries are refreshed every 5 minutes.
*/
public class RegistryBasedSchemaResolutionResolver implements SchemaResolutionResolver {

private static final Logger LOGGER = LoggerFactory.getLogger(RegistryBasedSchemaResolutionResolver.class);
private static final long CACHE_REFRESH_INTERVAL_MS = 5 * 60 * 1000; // 5 minutes
private static final MessageIndexes DEFAULT_INDEXES = new MessageIndexes(Collections.singletonList(0));

private final SchemaRegistryClient schemaRegistry;
private final ConcurrentHashMap<String, RegistryBasedSchemaResolutionResolver.CachedSchemaInfo> schemaCache = new ConcurrentHashMap<>();
private final Time time;

public RegistryBasedSchemaResolutionResolver(SchemaRegistryClient schemaRegistry) {
this.schemaRegistry = schemaRegistry;
time = Time.SYSTEM;
}

@Override
public SchemaResolution resolve(String topic, byte[] payload) {
if (payload == null) {
throw new SerializationException("Payload cannot be null");
}

String subject = getSubjectName(topic);
RegistryBasedSchemaResolutionResolver.CachedSchemaInfo cachedInfo = getCachedSchemaInfo(subject);

return new SchemaResolution(cachedInfo.schemaId, DEFAULT_INDEXES, payload);
}

@Override
public int getSchemaId(String topic, Record record) {
String subject = getSubjectName(topic);
RegistryBasedSchemaResolutionResolver.CachedSchemaInfo cachedInfo = getCachedSchemaInfo(subject);
return cachedInfo.schemaId;
}

private RegistryBasedSchemaResolutionResolver.CachedSchemaInfo getCachedSchemaInfo(String subject) {
long currentTime = time.milliseconds();

return schemaCache.compute(subject, (key, existing) -> {
// If we have existing data and it's still fresh, use it
if (existing != null && currentTime - existing.lastUpdated <= CACHE_REFRESH_INTERVAL_MS) {
return existing;
}

// Try to get fresh data from registry
try {
SchemaMetadata latestSchema = schemaRegistry.getLatestSchemaMetadata(subject);
return new RegistryBasedSchemaResolutionResolver.CachedSchemaInfo(latestSchema.getId(), currentTime);
} catch (IOException | RestClientException e) {
// If we have existing cached data (even if expired), use it as fallback
if (existing != null) {
// Log a warning but continue with the stale entry
LOGGER.warn("Failed to refresh schema for subject {}, using cached data from {}",
subject, new java.util.Date(existing.lastUpdated), e);
return existing;
}
// No cached data and fresh fetch failed - this is a hard error
throw new SerializationException("Error retrieving schema for subject " + subject +
" and no cached data available", e);
}
});
}

private String getSubjectName(String topic) {
// Follow the Confluent subject naming convention; table topic records resolve against <topic>-value
return topic + "-value";
}

private static class CachedSchemaInfo {
final int schemaId;
final long lastUpdated;

CachedSchemaInfo(int schemaId, long lastUpdated) {
this.schemaId = schemaId;
this.lastUpdated = lastUpdated;
}
}
}
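Note the trade-off this resolver accepts: every record is decoded with whatever schema is latest at read time, and `ConcurrentHashMap.compute` serializes refreshes per subject while stale entries serve as a fallback when the registry is unreachable. End-to-end usage might look like this (registry URL and capacity are placeholders):

```java
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

public class RegistryResolverExample {
    public static void main(String[] args) {
        SchemaRegistryClient client = new CachedSchemaRegistryClient("http://localhost:8081", 100);
        SchemaResolutionResolver resolver = new RegistryBasedSchemaResolutionResolver(client);

        // For topic "orders" the subject is "orders-value"; the latest schema id is
        // fetched once, then served from cache for up to five minutes.
        byte[] headerlessPayload = new byte[0]; // raw protobuf bytes, no Confluent header
        SchemaResolutionResolver.SchemaResolution res = resolver.resolve("orders", headerlessPayload);
        System.out.println(res.getSchemaId());
    }
}
```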