[feature] Integration with the Pulsar Schema Registry while writing AVRO using a Kafka Producer #26

Draft · wants to merge 1 commit into base 2.10_ds
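
For context, the sketch below shows how this path is meant to be exercised from the client side: a plain Kafka producer using Confluent's KafkaAvroSerializer, pointed at KoP's Schema Registry endpoint, with kafkaRegisterAvroSchemaOnEncode=true on the broker. The bootstrap address, registry URL, topic name, and schema are placeholders for illustration, not values taken from this PR.

import java.util.Properties;

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;

public class AvroProducerSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");          // KoP listener (placeholder)
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
        props.put("schema.registry.url", "http://localhost:8001");                     // KoP schema registry (placeholder)

        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}");
        GenericRecord user = new GenericData.Record(schema);
        user.put("name", "alice");

        try (KafkaProducer<Integer, Object> producer = new KafkaProducer<>(props)) {
            // KafkaAvroSerializer registers the schema with the registry and prefixes the payload
            // with a magic byte and a 4-byte schema id; with this PR the broker maps that id to a
            // Pulsar schema version when it writes the entry.
            producer.send(new ProducerRecord<>("avro-topic", 1, user)).get();
        }
    }
}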
@@ -850,6 +850,7 @@ protected void handleProduceRequest(KafkaHeaderAndRequest produceHar,
int timeoutMs = produceRequest.timeout();
String namespacePrefix = currentNamespacePrefix();
final AtomicInteger unfinishedAuthorizationCount = new AtomicInteger(numPartitions);
SchemaManager schemaManager = schemaManagerForTenant.apply(getCurrentTenant());
Runnable completeOne = () -> {
// When complete one authorization or failed, will do the action first.
if (unfinishedAuthorizationCount.decrementAndGet() == 0) {
@@ -859,6 +860,7 @@
}
AppendRecordsContext appendRecordsContext = AppendRecordsContext.get(
topicManager,
schemaManager,
this::startSendOperationForThrottling,
this::completeSendOperationForThrottling,
pendingTopicFuturesMap);
@@ -2255,12 +2257,14 @@ protected void handleWriteTxnMarkers(KafkaHeaderAndRequest kafkaHeaderAndRequest
}
};

SchemaManager schemaManager = schemaManagerForTenant.apply(getCurrentTenant());
for (WriteTxnMarkersRequest.TxnMarkerEntry marker : markers) {
long producerId = marker.producerId();
TransactionResult transactionResult = marker.transactionResult();
Map<TopicPartition, MemoryRecords> controlRecords = generateTxnMarkerRecords(marker);
AppendRecordsContext appendRecordsContext = AppendRecordsContext.get(
topicManager,
schemaManager,
this::startSendOperationForThrottling,
this::completeSendOperationForThrottling,
this.pendingTopicFuturesMap);
@@ -418,6 +418,12 @@ public class KafkaServiceConfiguration extends ServiceConfiguration {
)
private boolean kafkaApplyAvroSchemaOnDecode = false;

@FieldContext(
category = CATEGORY_KOP,
doc = "Register the AVRO schema into the Pulsar Schema Registry while encoding Kafka messages encoded with KafkaAvroSerializer"
)
private boolean kafkaRegisterAvroSchemaOnEncode = false;

@FieldContext(
category = CATEGORY_KOP,
doc = "The broker id, default is 1"
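The new kafkaRegisterAvroSchemaOnEncode flag above only takes effect with the pulsar entry format, since this change wires it into PulsarEntryFormatter only. A configuration sketch (the file location and the first two settings are shown for orientation; only the last line comes from this PR):

# KoP section of broker.conf / standalone.conf (illustrative)
messagingProtocols=kafka
entryFormat=pulsar
kafkaRegisterAvroSchemaOnEncode=true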
@@ -24,6 +24,17 @@
*/
public interface EntryFormatter {

/**
* Encode Kafka records to a ByteBuf, optionally resolving AVRO schemas against the Pulsar Schema Registry.
*
* @param encodeRequest contains messages in Kafka's format
* @param pulsarTopicName the Pulsar topic the entry will be written to
* @param schemaManager resolves Kafka schema registry ids for the current tenant
* @return the EncodeResult containing the ByteBuf of an entry that is to be written to Bookie
*/
default EncodeResult encode(EncodeRequest encodeRequest,
String pulsarTopicName, SchemaManager schemaManager) {
return encode(encodeRequest);
}

/**
* Encode Kafka records to a ByteBuf.
*
@@ -35,6 +35,7 @@ public static EntryFormatter create(final KafkaServiceConfiguration kafkaConfig,
final ImmutableMap<String, EntryFilterWithClassLoader> entryfilterMap) {
final String format = kafkaConfig.getEntryFormat();
final boolean applyAvroSchemaOnDecode = kafkaConfig.isKafkaApplyAvroSchemaOnDecode();
final boolean registerAvroSchemaOnEncode = kafkaConfig.isKafkaRegisterAvroSchemaOnEncode();
try {
EntryFormat entryFormat = Enum.valueOf(EntryFormat.class, format.toUpperCase());

@@ -43,7 +44,7 @@ public static EntryFormatter create(final KafkaServiceConfiguration kafkaConfig,

switch (entryFormat) {
case PULSAR:
return new PulsarEntryFormatter(applyAvroSchemaOnDecode, entryfilters);
return new PulsarEntryFormatter(applyAvroSchemaOnDecode, registerAvroSchemaOnEncode, entryfilters);
case KAFKA:
return new KafkaV1EntryFormatter(applyAvroSchemaOnDecode, entryfilters);
case MIXED_KAFKA:
@@ -13,10 +13,12 @@
*/
package io.streamnative.pulsar.handlers.kop.format;

import com.google.common.collect.ImmutableMap;
import io.streamnative.pulsar.handlers.kop.schemaregistry.model.Schema;
import io.streamnative.pulsar.handlers.kop.schemaregistry.model.SchemaStorage;
import io.streamnative.pulsar.handlers.kop.schemaregistry.model.SchemaStorageAccessor;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import lombok.AllArgsConstructor;
@@ -26,6 +28,7 @@
import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.LongSchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;

@@ -40,6 +43,31 @@ public class PulsarAdminSchemaManager implements SchemaManager{
private final SchemaStorageAccessor kafkaSchemaRegistry;
private final ConcurrentHashMap<String, KeyValueSchemaIds> cache = new ConcurrentHashMap<>();

@Override
public CompletableFuture<BytesSchemaVersion> getSchema(String topic, int id) {
SchemaStorage schemaStorageForTenant = kafkaSchemaRegistry.getSchemaStorageForTenant(tenant);
CompletableFuture<Schema> schemaById = schemaStorageForTenant.findSchemaById(id);
return schemaById.thenCompose(schema -> {
log.info("Kafka Schema {}", schema);
SchemaInfo schemaInfo = SchemaInfo
.builder()
.schema(schema.getSchemaDefinition().getBytes(StandardCharsets.UTF_8))
.name("kafka-schema")
.type(SchemaType.AVRO)
.properties(ImmutableMap.of())
.build();
return pulsarAdmin.schemas().createSchemaAsync(topic, schemaInfo)
.thenCompose(v -> {
return pulsarAdmin.schemas()
.getVersionBySchemaAsync(topic, schemaInfo)
.thenApply(schemaId -> {
return BytesSchemaVersion.of(new LongSchemaVersion(schemaId).bytes());
});
});
});

}

@Override
public CompletableFuture<KeyValueSchemaIds> getSchemaIds(String topic, BytesSchemaVersion schemaVersion) {
if (schemaVersion == null) {
@@ -19,8 +19,12 @@
import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
import io.streamnative.pulsar.handlers.kop.utils.PulsarMessageBuilder;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.stream.StreamSupport;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.util.MathUtils;
import org.apache.kafka.common.header.Header;
@@ -34,6 +38,7 @@
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Commands.ChecksumType;
import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;


/**
@@ -45,13 +50,26 @@ public class PulsarEntryFormatter extends AbstractEntryFormatter {
private static final int INITIAL_BATCH_BUFFER_SIZE = 1024;
private static final int MAX_MESSAGE_BATCH_SIZE_BYTES = 128 * 1024;

private static final byte[] EMPTY_ARRAY = new byte[0];

private static final byte MAGIC_ZERO = 0;

private final boolean registerAvroSchemaOnEncode;

public PulsarEntryFormatter(boolean applyAvroSchemaOnDecode,
boolean registerAvroSchemaOnEncode,
ImmutableList<EntryFilterWithClassLoader> entryfilters) {
super(applyAvroSchemaOnDecode, entryfilters);
this.registerAvroSchemaOnEncode = registerAvroSchemaOnEncode;
}

@Override
public EncodeResult encode(final EncodeRequest encodeRequest) {
return encode(encodeRequest, null, null);
}

@Override
public EncodeResult encode(final EncodeRequest encodeRequest, String pulsarTopicName, SchemaManager schemaManager) {
final MemoryRecords records = encodeRequest.getRecords();
final int numMessages = encodeRequest.getAppendInfo().numMessages();
long currentBatchSizeBytes = 0;
@@ -68,7 +86,7 @@ public EncodeResult encode(final EncodeRequest encodeRequest) {
records.batches().forEach(recordBatch -> {
boolean controlBatch = recordBatch.isControlBatch();
StreamSupport.stream(recordBatch.spliterator(), true).forEachOrdered(record -> {
MessageImpl<byte[]> message = recordToEntry(record);
MessageImpl<byte[]> message = recordToEntry(record, pulsarTopicName, schemaManager);
messages.add(message);
if (recordBatch.isTransactional()) {
msgMetadata.setTxnidMostBits(recordBatch.producerId());
@@ -122,7 +140,7 @@ public EncodeResult encode(final EncodeRequest encodeRequest) {
// convert kafka Record to Pulsar Message.
// called when publish received Kafka Record into Pulsar.
private static MessageImpl<byte[]> recordToEntry(Record record) {
private MessageImpl<byte[]> recordToEntry(Record record, String pulsarTopicName, SchemaManager schemaManager) {

PulsarMessageBuilder builder = PulsarMessageBuilder.newBuilder();

@@ -137,9 +155,39 @@ private static MessageImpl<byte[]> recordToEntry(Record record) {

// value
if (record.hasValue()) {
byte[] value = new byte[record.valueSize()];
record.value().get(value);
builder.value(value);
ByteBuffer recordValue = record.value();
int size = recordValue.remaining();
if (size == 0) {
builder.value(EMPTY_ARRAY);
} else {
if (registerAvroSchemaOnEncode
&& size >= 5 // MAGIC + 4 bytes schema id
&& recordValue.get(0) == MAGIC_ZERO) {
int schemaId = recordValue.getInt(1);
log.info("Schema ID: {}", schemaId);

BytesSchemaVersion schemaVersion;
try {
schemaVersion = schemaManager.getSchema(pulsarTopicName, schemaId)
.get();
} catch (Exception err) {
throw new RuntimeException(err);
}
log.info("Schema version {}", schemaVersion);
if (schemaVersion != null) {
builder.getMetadataBuilder().setSchemaVersion(schemaVersion.get());
}
byte[] value = new byte[record.valueSize() - 5];
// skip magic + schema id
recordValue.position(recordValue.position() + 5);
recordValue.get(value);
builder.value(value);
} else {
byte[] value = new byte[record.valueSize()];
recordValue.get(value);
builder.value(value);
}
}
} else {
builder.value(null);
}
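The value handling in recordToEntry above keys off the Confluent wire format that KafkaAvroSerializer produces: a single magic byte 0x0 followed by the 4-byte big-endian schema id, then the Avro payload. A self-contained sketch of the same parsing, using relative buffer positions; the class and method names are mine, not part of this PR:

import java.nio.ByteBuffer;

final class ConfluentWireFormat {
    private static final byte MAGIC = 0x0;
    private static final int HEADER_SIZE = 5; // magic byte + 4-byte schema id

    /** Returns the schema id, or -1 if the value does not carry the Confluent header. */
    static int schemaId(ByteBuffer value) {
        if (value.remaining() < HEADER_SIZE || value.get(value.position()) != MAGIC) {
            return -1;
        }
        return value.getInt(value.position() + 1);
    }

    /** Copies the Avro payload that follows the 5-byte header, leaving the input buffer untouched. */
    static byte[] payload(ByteBuffer value) {
        ByteBuffer dup = value.duplicate();
        dup.position(dup.position() + HEADER_SIZE);
        byte[] out = new byte[dup.remaining()];
        dup.get(out);
        return out;
    }
}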
@@ -30,4 +30,6 @@ interface KeyValueSchemaIds {
* @return
*/
CompletableFuture<KeyValueSchemaIds> getSchemaIds(String topic, BytesSchemaVersion schemaVersion);

/**
* Resolve the given Kafka schema registry id for a topic and return the matching Pulsar schema version.
*/
CompletableFuture<BytesSchemaVersion> getSchema(String topic, int id);
}
@@ -18,6 +18,8 @@
import io.streamnative.pulsar.handlers.kop.PendingTopicFutures;
import java.util.Map;
import java.util.function.Consumer;

import io.streamnative.pulsar.handlers.kop.format.SchemaManager;
import lombok.Getter;
import org.apache.kafka.common.TopicPartition;

@@ -34,6 +36,8 @@ protected AppendRecordsContext newObject(Handle<AppendRecordsContext> handle) {

private final Recycler.Handle<AppendRecordsContext> recyclerHandle;
private KafkaTopicManager topicManager;

private SchemaManager schemaManager;
private Consumer<Integer> startSendOperationForThrottling;
private Consumer<Integer> completeSendOperationForThrottling;
private Map<TopicPartition, PendingTopicFutures> pendingTopicFuturesMap;
@@ -44,10 +48,12 @@ private AppendRecordsContext(Recycler.Handle<AppendRecordsContext> recyclerHandle)

// recycler and get for this object
public static AppendRecordsContext get(final KafkaTopicManager topicManager,
final SchemaManager schemaManager,
final Consumer<Integer> startSendOperationForThrottling,
final Consumer<Integer> completeSendOperationForThrottling,
final Map<TopicPartition, PendingTopicFutures> pendingTopicFuturesMap) {
AppendRecordsContext context = RECYCLER.get();
context.schemaManager = schemaManager;
context.topicManager = topicManager;
context.startSendOperationForThrottling = startSendOperationForThrottling;
context.completeSendOperationForThrottling = completeSendOperationForThrottling;
@@ -58,6 +64,7 @@ public static AppendRecordsContext get(final KafkaTopicManager topicManager,

public void recycle() {
topicManager = null;
schemaManager = null;
startSendOperationForThrottling = null;
completeSendOperationForThrottling = null;
pendingTopicFuturesMap = null;
@@ -353,7 +353,8 @@ public CompletableFuture<Long> appendRecords(final MemoryRecords records,
time.nanoseconds() - beforeRecordsProcess, TimeUnit.NANOSECONDS);

long beforeEncodingStarts = time.nanoseconds();
final EncodeResult encodeResult = entryFormatter.encode(encodeRequest);
final EncodeResult encodeResult = entryFormatter.encode(encodeRequest,
fullPartitionName, appendRecordsContext.getSchemaManager());
encodeRequest.recycle();

requestStats.getProduceEncodeStats().registerSuccessfulEvent(
@@ -35,7 +35,7 @@ public static Object[] instances() {
}

public SchemaRegistryProxyTest(String entryFormat, boolean applyAvroSchemaOnDecode) {
super(entryFormat, applyAvroSchemaOnDecode);
super(entryFormat, applyAvroSchemaOnDecode, false);
}

@BeforeMethod
@@ -44,6 +44,7 @@
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.util.FutureUtil;
@@ -62,26 +63,31 @@ public class SchemaRegistryTest extends KopProtocolHandlerTestBase {
protected String bootstrapServers;
protected boolean applyAvroSchemaOnDecode;

protected boolean kafkaRegisterAvroSchemaOnEncode;

@Factory
public static Object[] instances() {
return new Object[] {
new SchemaRegistryTest("pulsar", false),
new SchemaRegistryTest("pulsar", true),
new SchemaRegistryTest("kafka", false),
new SchemaRegistryTest("kafka", true)
new SchemaRegistryTest("pulsar", true, true),
//new SchemaRegistryTest("pulsar", false, false),
//new SchemaRegistryTest("pulsar", true, false),
//new SchemaRegistryTest("kafka", false, false),
//new SchemaRegistryTest("kafka", true, false)
};
}

public SchemaRegistryTest(String entryFormat, boolean applyAvroSchemaOnDecode) {
public SchemaRegistryTest(String entryFormat, boolean applyAvroSchemaOnDecode, boolean kafkaRegisterAvroSchemaOnEncode) {
super(entryFormat);
this.applyAvroSchemaOnDecode = applyAvroSchemaOnDecode;
this.kafkaRegisterAvroSchemaOnEncode = kafkaRegisterAvroSchemaOnEncode;
}

@BeforeClass
@Override
protected void setup() throws Exception {
super.enableSchemaRegistry = true;
this.conf.setKafkaApplyAvroSchemaOnDecode(applyAvroSchemaOnDecode);
this.conf.setKafkaRegisterAvroSchemaOnEncode(kafkaRegisterAvroSchemaOnEncode);
this.internalSetup();
bootstrapServers = "localhost:" + getKafkaBrokerPort();
}
@@ -131,6 +137,9 @@ public void testAvroProduceAndConsume() throws Throwable {
String topic = "SchemaRegistryTest-testAvroProduceAndConsume_" + entryFormat + "_" + applyAvroSchemaOnDecode;
IndexedRecord avroRecord = createAvroRecord();
Object[] objects = new Object[]{avroRecord, true, 130, 345L, 1.23f, 2.34d, "abc", "def".getBytes()};
if (kafkaRegisterAvroSchemaOnEncode) {
objects = new Object[]{avroRecord};
}
@Cleanup
KafkaProducer<Integer, Object> producer = createAvroProducer();
for (int i = 0; i < objects.length; i++) {
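On the consuming side, the intended outcome is that a plain Pulsar consumer can resolve the writer schema from the Pulsar Schema Registry through the schema version stamped on each entry. A sketch of such a consumer (service URL, topic, and subscription name are placeholders):

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericRecord;

public class AutoConsumeSketch {
    public static void main(String[] args) throws Exception {
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")                // placeholder
                .build();
             Consumer<GenericRecord> consumer = client.newConsumer(Schema.AUTO_CONSUME())
                     .topic("persistent://public/default/avro-topic") // placeholder
                     .subscriptionName("sub")
                     .subscribe()) {
            Message<GenericRecord> msg = consumer.receive();
            // AUTO_CONSUME fetches the schema for the version set by PulsarEntryFormatter.
            System.out.println(msg.getValue().getField("name"));
            consumer.acknowledge(msg);
        }
    }
}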