diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java
index 625764c17..245927061 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java
@@ -850,6 +850,7 @@ protected void handleProduceRequest(KafkaHeaderAndRequest produceHar,
         int timeoutMs = produceRequest.timeout();
         String namespacePrefix = currentNamespacePrefix();
         final AtomicInteger unfinishedAuthorizationCount = new AtomicInteger(numPartitions);
+        SchemaManager schemaManager = schemaManagerForTenant.apply(getCurrentTenant());
         Runnable completeOne = () -> {
             // When complete one authorization or failed, will do the action first.
             if (unfinishedAuthorizationCount.decrementAndGet() == 0) {
@@ -859,6 +860,7 @@
             }
             AppendRecordsContext appendRecordsContext = AppendRecordsContext.get(
                     topicManager,
+                    schemaManager,
                     this::startSendOperationForThrottling,
                     this::completeSendOperationForThrottling,
                     pendingTopicFuturesMap);
@@ -2255,12 +2257,14 @@ protected void handleWriteTxnMarkers(KafkaHeaderAndRequest kafkaHeaderAndRequest
             }
         };

+        SchemaManager schemaManager = schemaManagerForTenant.apply(getCurrentTenant());
        for (WriteTxnMarkersRequest.TxnMarkerEntry marker : markers) {
             long producerId = marker.producerId();
             TransactionResult transactionResult = marker.transactionResult();
             Map<TopicPartition, MemoryRecords> controlRecords = generateTxnMarkerRecords(marker);
             AppendRecordsContext appendRecordsContext = AppendRecordsContext.get(
                     topicManager,
+                    schemaManager,
                     this::startSendOperationForThrottling,
                     this::completeSendOperationForThrottling,
                     this.pendingTopicFuturesMap);
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java
index 116a08378..032a125ee 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java
@@ -418,6 +418,12 @@ public class KafkaServiceConfiguration extends ServiceConfiguration {
     )
     private boolean kafkaApplyAvroSchemaOnDecode = false;

+    @FieldContext(
+        category = CATEGORY_KOP,
+        doc = "Register the AVRO schema in the Pulsar Schema Registry when encoding Kafka messages that were serialized with KafkaAvroSerializer"
+    )
+    private boolean kafkaRegisterAvroSchemaOnEncode = false;
+
     @FieldContext(
         category = CATEGORY_KOP,
         doc = "The broker id, default is 1"
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatter.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatter.java
index 925e58a2b..7d2e08181 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatter.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatter.java
@@ -24,6 +24,23 @@
  */
 public interface EntryFormatter {

+    /**
+     * Encode Kafka records to a ByteBuf.
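+     *
+     * <p>Schema-aware formatters can map the Confluent schema id carried by messages that were
+     * serialized with KafkaAvroSerializer onto a Pulsar schema version; this default implementation
+     * ignores the extra arguments and simply delegates to {@link #encode(EncodeRequest)}.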
+     *
+     * @param encodeRequest contains messages in Kafka's format
+     * @param pulsarTopicName the fully qualified name of the Pulsar topic being written to
+     * @param schemaManager resolves Kafka schema ids to Pulsar schema versions; may be null
+     * @return the EncodeResult containing the ByteBuf of an entry that is to be written to Bookie
+     */
+    default EncodeResult encode(EncodeRequest encodeRequest,
+                                String pulsarTopicName, SchemaManager schemaManager) {
+        return encode(encodeRequest);
+    }
+
     /**
      * Encode Kafka records to a ByteBuf.
      *
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatterFactory.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatterFactory.java
index adc931bda..d4fdcb209 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatterFactory.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatterFactory.java
@@ -35,6 +35,7 @@ public static EntryFormatter create(final KafkaServiceConfiguration kafkaConfig,
                                         final ImmutableMap<String, EntryFilterWithClassLoader> entryfilterMap) {
         final String format = kafkaConfig.getEntryFormat();
         final boolean applyAvroSchemaOnDecode = kafkaConfig.isKafkaApplyAvroSchemaOnDecode();
+        final boolean registerAvroSchemaOnEncode = kafkaConfig.isKafkaRegisterAvroSchemaOnEncode();

         try {
             EntryFormat entryFormat = Enum.valueOf(EntryFormat.class, format.toUpperCase());
@@ -43,7 +44,7 @@ public static EntryFormatter create(final KafkaServiceConfiguration kafkaConfig,

             switch (entryFormat) {
                 case PULSAR:
-                    return new PulsarEntryFormatter(applyAvroSchemaOnDecode, entryfilters);
+                    return new PulsarEntryFormatter(applyAvroSchemaOnDecode, registerAvroSchemaOnEncode, entryfilters);
                 case KAFKA:
                     return new KafkaV1EntryFormatter(applyAvroSchemaOnDecode, entryfilters);
                 case MIXED_KAFKA:
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/PulsarAdminSchemaManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/PulsarAdminSchemaManager.java
index 1d9d6d32d..6f6b3aaf5 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/PulsarAdminSchemaManager.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/PulsarAdminSchemaManager.java
@@ -13,10 +13,12 @@
  */
 package io.streamnative.pulsar.handlers.kop.format;

+import com.google.common.collect.ImmutableMap;
 import io.streamnative.pulsar.handlers.kop.schemaregistry.model.Schema;
 import io.streamnative.pulsar.handlers.kop.schemaregistry.model.SchemaStorage;
 import io.streamnative.pulsar.handlers.kop.schemaregistry.model.SchemaStorageAccessor;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import lombok.AllArgsConstructor;
@@ -26,6 +28,7 @@
 import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.LongSchemaVersion;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;

@@ -40,6 +43,33 @@ public class PulsarAdminSchemaManager implements SchemaManager {
     private final SchemaStorageAccessor kafkaSchemaRegistry;
     private final ConcurrentHashMap cache = new ConcurrentHashMap<>();

+    @Override
+    public CompletableFuture<BytesSchemaVersion> getSchema(String topic, int id) {
+        SchemaStorage schemaStorageForTenant = kafkaSchemaRegistry.getSchemaStorageForTenant(tenant);
+        CompletableFuture<Schema> schemaById = schemaStorageForTenant.findSchemaById(id);
+        return schemaById.thenCompose(schema -> {
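+            // Look up the Avro definition registered under this Confluent-style schema id,
+            // register it as an AVRO schema on the Pulsar topic, then resolve the version
+            // Pulsar assigned to it so the encoder can stamp it on the message metadata.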
log.info("Kafka Schema {}", schema); + SchemaInfo schemaInfo = SchemaInfo + .builder() + .schema(schema.getSchemaDefinition().getBytes(StandardCharsets.UTF_8)) + .name("kafka-schema") + .type(SchemaType.AVRO) + .properties(ImmutableMap.of()) + .build(); + return pulsarAdmin.schemas().createSchemaAsync(topic, schemaInfo) + .thenCompose(v -> { + return pulsarAdmin.schemas() + .getVersionBySchemaAsync(topic, schemaInfo) + .thenApply(schemaId -> { + return BytesSchemaVersion.of(new LongSchemaVersion(schemaId).bytes()); + }); + }); + }); + + } + @Override public CompletableFuture getSchemaIds(String topic, BytesSchemaVersion schemaVersion) { if (schemaVersion == null) { diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/PulsarEntryFormatter.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/PulsarEntryFormatter.java index c7edc2d32..75c714a01 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/PulsarEntryFormatter.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/PulsarEntryFormatter.java @@ -19,8 +19,12 @@ import com.google.common.collect.Lists; import io.netty.buffer.ByteBuf; import io.streamnative.pulsar.handlers.kop.utils.PulsarMessageBuilder; + +import java.nio.ByteBuffer; import java.util.List; import java.util.stream.StreamSupport; + +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.common.util.MathUtils; import org.apache.kafka.common.header.Header; @@ -34,6 +38,7 @@ import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.Commands.ChecksumType; +import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion; /** @@ -45,13 +50,26 @@ public class PulsarEntryFormatter extends AbstractEntryFormatter { private static final int INITIAL_BATCH_BUFFER_SIZE = 1024; private static final int MAX_MESSAGE_BATCH_SIZE_BYTES = 128 * 1024; + private static final byte[] EMPTY_ARRAY = new byte[0]; + + private static final byte MAGIC_ZERO = 0; + + private final boolean registerAvroSchemaOnEncode; + public PulsarEntryFormatter(boolean applyAvroSchemaOnDecode, + boolean registerAvroSchemaOnEncode, ImmutableList entryfilters) { super(applyAvroSchemaOnDecode, entryfilters); + this.registerAvroSchemaOnEncode = registerAvroSchemaOnEncode; } @Override public EncodeResult encode(final EncodeRequest encodeRequest) { + return encode(encodeRequest, null, null); + } + + @Override + public EncodeResult encode(final EncodeRequest encodeRequest, String pulsarTopicName, SchemaManager schemaManager) { final MemoryRecords records = encodeRequest.getRecords(); final int numMessages = encodeRequest.getAppendInfo().numMessages(); long currentBatchSizeBytes = 0; @@ -68,7 +86,7 @@ public EncodeResult encode(final EncodeRequest encodeRequest) { records.batches().forEach(recordBatch -> { boolean controlBatch = recordBatch.isControlBatch(); StreamSupport.stream(recordBatch.spliterator(), true).forEachOrdered(record -> { - MessageImpl message = recordToEntry(record); + MessageImpl message = recordToEntry(record, pulsarTopicName, schemaManager); messages.add(message); if (recordBatch.isTransactional()) { msgMetadata.setTxnidMostBits(recordBatch.producerId()); @@ -122,7 +140,7 @@ public EncodeResult encode(final EncodeRequest encodeRequest) { // convert kafka Record to Pulsar Message. // convert kafka Record to Pulsar Message. // called when publish received Kafka Record into Pulsar. 
-    private static MessageImpl<byte[]> recordToEntry(Record record) {
+    private MessageImpl<byte[]> recordToEntry(Record record, String pulsarTopicName, SchemaManager schemaManager) {

         PulsarMessageBuilder builder = PulsarMessageBuilder.newBuilder();

@@ -137,9 +155,41 @@ private static MessageImpl<byte[]> recordToEntry(Record record) {

         // value
         if (record.hasValue()) {
-            byte[] value = new byte[record.valueSize()];
-            record.value().get(value);
-            builder.value(value);
+            ByteBuffer recordValue = record.value();
+            int size = recordValue.remaining();
+            if (size == 0) {
+                builder.value(EMPTY_ARRAY);
+            } else {
+                final int base = recordValue.position();
+                if (registerAvroSchemaOnEncode
+                        && schemaManager != null
+                        && size >= 5 // MAGIC + 4 bytes schema id
+                        && recordValue.get(base) == MAGIC_ZERO) {
+                    int schemaId = recordValue.getInt(base + 1);
+                    log.debug("Schema ID: {}", schemaId);
+
+                    BytesSchemaVersion schemaVersion;
+                    try {
+                        schemaVersion = schemaManager.getSchema(pulsarTopicName, schemaId)
+                                .get();
+                    } catch (Exception err) {
+                        throw new RuntimeException(err);
+                    }
+                    log.debug("Schema version {}", schemaVersion);
+                    if (schemaVersion != null) {
+                        builder.getMetadataBuilder().setSchemaVersion(schemaVersion.get());
+                    }
+                    byte[] value = new byte[size - 5];
+                    // skip magic + schema id
+                    recordValue.position(base + 5);
+                    recordValue.get(value);
+                    builder.value(value);
+                } else {
+                    byte[] value = new byte[size];
+                    recordValue.get(value);
+                    builder.value(value);
+                }
+            }
         } else {
             builder.value(null);
         }
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/SchemaManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/SchemaManager.java
index 830de6752..cc445c8b0 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/SchemaManager.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/SchemaManager.java
@@ -30,4 +30,6 @@ interface KeyValueSchemaIds {
      * @return
      */
     CompletableFuture<KeyValueSchemaIds> getSchemaIds(String topic, BytesSchemaVersion schemaVersion);
+
+    CompletableFuture<BytesSchemaVersion> getSchema(String topic, int id);
 }
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/AppendRecordsContext.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/AppendRecordsContext.java
index 960dc5d48..14b9ee28a 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/AppendRecordsContext.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/AppendRecordsContext.java
@@ -18,6 +18,7 @@
 import io.netty.util.Recycler;
 import io.streamnative.pulsar.handlers.kop.PendingTopicFutures;
+import io.streamnative.pulsar.handlers.kop.format.SchemaManager;
 import java.util.Map;
 import java.util.function.Consumer;
 import lombok.Getter;
@@ -34,6 +35,7 @@ protected AppendRecordsContext newObject(Handle<AppendRecordsContext> handle) {
     private final Recycler.Handle<AppendRecordsContext> recyclerHandle;
     private KafkaTopicManager topicManager;
+    private SchemaManager schemaManager;
     private Consumer<Integer> startSendOperationForThrottling;
     private Consumer<Integer> completeSendOperationForThrottling;
     private Map<TopicPartition, PendingTopicFutures> pendingTopicFuturesMap;

@@ -44,10 +46,13 @@ private AppendRecordsContext(Recycler.Handle<AppendRecordsContext> recyclerHandle) {

     // recycler and get for this object
     public static AppendRecordsContext get(final KafkaTopicManager topicManager,
+                                           final SchemaManager schemaManager,
                                            final Consumer<Integer> startSendOperationForThrottling,
                                            final Consumer<Integer> completeSendOperationForThrottling,
                                            final Map<TopicPartition, PendingTopicFutures> pendingTopicFuturesMap) {
         AppendRecordsContext context = RECYCLER.get();
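+        // carry the schema manager along so PartitionLog can hand it to the entry formatter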
+        context.schemaManager = schemaManager;
         context.topicManager = topicManager;
         context.startSendOperationForThrottling = startSendOperationForThrottling;
         context.completeSendOperationForThrottling = completeSendOperationForThrottling;
@@ -58,6 +63,7 @@ public static AppendRecordsContext get(final KafkaTopicManager topicManager,

     public void recycle() {
         topicManager = null;
+        schemaManager = null;
         startSendOperationForThrottling = null;
         completeSendOperationForThrottling = null;
         pendingTopicFuturesMap = null;
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java
index 2af57102d..e2f4c25b9 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java
@@ -353,7 +353,8 @@ public CompletableFuture<Long> appendRecords(final MemoryRecords records,
                     time.nanoseconds() - beforeRecordsProcess, TimeUnit.NANOSECONDS);

             long beforeEncodingStarts = time.nanoseconds();
-            final EncodeResult encodeResult = entryFormatter.encode(encodeRequest);
+            final EncodeResult encodeResult = entryFormatter.encode(encodeRequest,
+                    fullPartitionName, appendRecordsContext.getSchemaManager());
             encodeRequest.recycle();

             requestStats.getProduceEncodeStats().registerSuccessfulEvent(
diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/SchemaRegistryProxyTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/SchemaRegistryProxyTest.java
index 017c456ea..1a76777fc 100644
--- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/SchemaRegistryProxyTest.java
+++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/SchemaRegistryProxyTest.java
@@ -35,7 +35,7 @@ public static Object[] instances() {
     }

     public SchemaRegistryProxyTest(String entryFormat, boolean applyAvroSchemaOnDecode) {
-        super(entryFormat, applyAvroSchemaOnDecode);
+        super(entryFormat, applyAvroSchemaOnDecode, false);
     }

     @BeforeMethod
diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/SchemaRegistryTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/SchemaRegistryTest.java
index 5c761e496..da801ede7 100644
--- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/SchemaRegistryTest.java
+++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/SchemaRegistryTest.java
@@ -44,6 +44,7 @@
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -62,19 +63,23 @@ public class SchemaRegistryTest extends KopProtocolHandlerTestBase {
     protected String bootstrapServers;
     protected boolean applyAvroSchemaOnDecode;
+    protected boolean kafkaRegisterAvroSchemaOnEncode;

     @Factory
     public static Object[] instances() {
         return new Object[] {
-                new SchemaRegistryTest("pulsar", false),
-                new SchemaRegistryTest("pulsar", true),
-                new SchemaRegistryTest("kafka", false),
-                new SchemaRegistryTest("kafka", true)
+                new SchemaRegistryTest("pulsar", true, true),
+                //new SchemaRegistryTest("pulsar", false, false),
+                //new SchemaRegistryTest("pulsar", true, false),
+                //new SchemaRegistryTest("kafka", false, false),
+                //new SchemaRegistryTest("kafka", true, false)
SchemaRegistryTest("kafka", true, false) }; } - public SchemaRegistryTest(String entryFormat, boolean applyAvroSchemaOnDecode) { + public SchemaRegistryTest(String entryFormat, boolean applyAvroSchemaOnDecode, boolean kafkaRegisterAvroSchemaOnEncode) { super(entryFormat); this.applyAvroSchemaOnDecode = applyAvroSchemaOnDecode; + this.kafkaRegisterAvroSchemaOnEncode = kafkaRegisterAvroSchemaOnEncode; } @BeforeClass @@ -82,6 +87,7 @@ public SchemaRegistryTest(String entryFormat, boolean applyAvroSchemaOnDecode) { protected void setup() throws Exception { super.enableSchemaRegistry = true; this.conf.setKafkaApplyAvroSchemaOnDecode(applyAvroSchemaOnDecode); + this.conf.setKafkaRegisterAvroSchemaOnEncode(kafkaRegisterAvroSchemaOnEncode); this.internalSetup(); bootstrapServers = "localhost:" + getKafkaBrokerPort(); } @@ -131,6 +137,9 @@ public void testAvroProduceAndConsume() throws Throwable { String topic = "SchemaRegistryTest-testAvroProduceAndConsume_" + entryFormat + "_" + applyAvroSchemaOnDecode; IndexedRecord avroRecord = createAvroRecord(); Object[] objects = new Object[]{avroRecord, true, 130, 345L, 1.23f, 2.34d, "abc", "def".getBytes()}; + if (kafkaRegisterAvroSchemaOnEncode) { + objects = new Object[]{avroRecord}; + } @Cleanup KafkaProducer producer = createAvroProducer(); for (int i = 0; i < objects.length; i++) {