diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/constants/MessageConstants.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/constants/MessageConstants.java
new file mode 100644
index 0000000..21f39c1
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/constants/MessageConstants.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.kafka.clients.constants;
+
+public final class MessageConstants {
+    private MessageConstants() {
+
+    }
+
+    public static final String KAFKA_MESSAGE_HEADER_PREFIX = "kafka.header.";
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
index 83fb86a..114755c 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
@@ -43,10 +43,15 @@ import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.DecoderException;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.kafka.clients.constants.MessageConstants;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
@@ -209,11 +214,11 @@ private PulsarKafkaConsumer(ConsumerConfig consumerConfig, Schema<K> keySchema,
 
     private SubscriptionInitialPosition getStrategy(final String strategy) {
         switch(strategy) {
-        case "earliest":
-            return SubscriptionInitialPosition.Earliest;
-        default:
+            case "earliest":
+                return SubscriptionInitialPosition.Earliest;
+            default:
                 return SubscriptionInitialPosition.Latest;
-        }
+        }
     }
 
     @Override
@@ -386,8 +391,8 @@ public ConsumerRecords<K, V> poll(long timeoutMillis) {
 
                 TopicPartition tp = new TopicPartition(topic, partition);
                 if (lastReceivedOffset.get(tp) == null && !unpolledPartitions.contains(tp)) {
-                    log.info("When polling offsets, invalid offsets were detected. Resetting topic partition {}", tp);
-                    resetOffsets(tp);
+                    log.info("When polling offsets, invalid offsets were detected. Resetting topic partition {}", tp);
+                    resetOffsets(tp);
                 }
 
                 K key = getKey(topic, msg);
@@ -404,10 +409,33 @@ public ConsumerRecords<K, V> poll(long timeoutMillis) {
                     timestamp = msg.getEventTime();
                     timestampType = TimestampType.CREATE_TIME;
                 }
-
-                ConsumerRecord<K, V> consumerRecord = new ConsumerRecord<>(topic, partition, offset, timestamp,
-                        timestampType, -1, msg.hasKey() ? msg.getKey().length() : 0, msg.getData().length, key, value);
-
+                ConsumerRecord<K, V> consumerRecord;
+                if (msg.getProperties() != null) {
+                    Headers headers = new RecordHeaders();
+                    msg.getProperties().forEach((k, v) -> {
+                        // Kafka-specific headers from the record
+                        if (k.startsWith(MessageConstants.KAFKA_MESSAGE_HEADER_PREFIX)) {
+                            String originalKey = k.replace(MessageConstants.KAFKA_MESSAGE_HEADER_PREFIX, "");
+                            try {
+                                headers.add(originalKey, Hex.decodeHex(v));
+                            } catch (DecoderException e) {
+                                log.warn("Corrupted header key: {}", originalKey);
+                                throw new RuntimeException(e);
+                            }
+                        } else {
+                            // Default headers injected by the PulsarKafkaProducer client,
+                            // for example KafkaMessageRouter.PARTITION_ID
+                            headers.add(k, v.getBytes());
+                        }
+                    });
+                    consumerRecord = new ConsumerRecord<>(topic, partition, offset, timestamp, timestampType, -1L,
+                            msg.hasKey() ? msg.getKey().length() : 0, msg.getData() != null ? msg.getData().length : 0,
+                            key, value, headers);
+                } else {
+                    consumerRecord = new ConsumerRecord<>(topic, partition, offset, timestamp, timestampType, -1L,
+                            msg.hasKey() ? msg.getKey().length() : 0, msg.getData() != null ? msg.getData().length : 0,
+                            key, value);
+                }
                 records.computeIfAbsent(tp, k -> new ArrayList<>()).add(consumerRecord);
 
                 // Update last offset seen by application
@@ -520,10 +548,12 @@ private CompletableFuture doCommitOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) {
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
 public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
 
     private static final Logger log = LoggerFactory.getLogger(PulsarKafkaProducer.class);
 
@@ -387,6 +386,17 @@ private int buildMessage(TypedMessageBuilder<byte[]> builder, ProducerRecord<K, V> record) {
+        if (record.headers() != null) {
+            record.headers().forEach(header -> {
+                String key = MessageConstants.KAFKA_MESSAGE_HEADER_PREFIX + header.key();
+                builder.property(key, Hex.encodeHexString(header.value()));
+                log.debug("Formatted Kafka-specific header, before: {}, after: {}", header.key(), key);
+            });
+        }
+
         return value.length;
     }
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumerTest.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumerTest.java
new file mode 100644
index 0000000..7e4ada5
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumerTest.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import static org.mockito.ArgumentMatchers.anyCollection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.kafka.clients.constants.MessageConstants;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.kafka.compat.KafkaMessageRouter;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.naming.TopicName;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class PulsarKafkaConsumerTest {
+
+    @Test
+    public void testPulsarKafkaConsumerWithHeaders_noAck() throws Exception {
+        Consumer<byte[]> consumer = Mockito.mock(Consumer.class);
+        String topic = "topic";
+
+        Mockito.when(Mockito.mock(TopicName.class).getPartitionedTopicName()).thenReturn(topic);
+        Mockito.doReturn("topic").when(consumer).getTopic();
+
+        MessageMetadata messageMetadata = new MessageMetadata();
+        messageMetadata.setPublishTime(System.currentTimeMillis());
+
+        Map<String, String> headerMap = new HashMap<>();
+        String kafkaHeaderKey = MessageConstants.KAFKA_MESSAGE_HEADER_PREFIX + "header1";
+        String kafkaHeaderValue = Hex.encodeHexString(kafkaHeaderKey.getBytes());
+        headerMap.put(kafkaHeaderKey, kafkaHeaderValue);
+        headerMap.put(KafkaMessageRouter.PARTITION_ID, "0");
+        Message<byte[]> msg = new MessageImpl<>(
+                topic,
+                "1:1",
+                headerMap,
+                "string".getBytes(),
+                Schema.BYTES,
+                messageMetadata
+        );
+
+        PulsarClient mockClient = Mockito.mock(PulsarClient.class);
+        PulsarClientImpl mockClientImpl = Mockito.mock(PulsarClientImpl.class);
+
+        CompletableFuture<Integer> mockNoOfPartitionFuture = CompletableFuture.completedFuture(1);
+
+        ClientBuilder mockClientBuilder = Mockito.mock(ClientBuilder.class);
+        Mockito.doReturn(mockClientBuilder).when(mockClientBuilder).serviceUrl(Mockito.anyString());
+        Mockito.doReturn(mockClient).when(mockClientBuilder).build();
+
+        Mockito.when(mockClientImpl.getNumberOfPartitions(Mockito.anyString())).thenReturn(mockNoOfPartitionFuture);
+
+        Properties properties = new Properties();
+
+        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
+        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Collections.singletonList("pulsar://localhost:6650"));
+        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-subscription-name");
+
+        PulsarKafkaConsumer<Integer, String> pulsarKafkaConsumer =
+                new PulsarKafkaConsumer<>(properties, new IntegerDeserializer(), new StringDeserializer());
+
+        PulsarKafkaConsumer<Integer, String> pulsarKafkaConsumerSpy = Mockito.spy(pulsarKafkaConsumer);
+
+        Mockito.doNothing().when(pulsarKafkaConsumerSpy).seekToEnd(anyCollection());
+
+        pulsarKafkaConsumerSpy.received(consumer, msg);
+        pulsarKafkaConsumerSpy.poll(100);
+        pulsarKafkaConsumerSpy.close();
+
+        Assert.assertEquals(kafkaHeaderValue, msg.getProperty(kafkaHeaderKey));
+        Mockito.verify(pulsarKafkaConsumerSpy).seekToEnd(anyCollection());
+        Mockito.verify(consumer, Mockito.times(0)).acknowledgeCumulativeAsync(Mockito.any(MessageId.class));
+    }
+
+
+}
+
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
index b4a7e79..e77eaa4 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
@@ -30,7 +30,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
-
+import com.google.api.client.util.Maps;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -38,11 +38,17 @@ import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
 import org.apache.avro.reflect.Nullable;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.kafka.clients.constants.MessageConstants;
 import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.ProducerBuilder;
@@ -55,8 +61,6 @@ import org.apache.pulsar.client.impl.schema.AvroSchema;
 import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
 import org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -65,12 +69,6 @@ import org.testng.annotations.ObjectFactory;
 import org.testng.annotations.Test;
 
-
-import com.google.api.client.util.Maps;
-
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-import lombok.ToString;
-
 @PrepareForTest({PulsarClientKafkaConfig.class, PulsarProducerKafkaConfig.class})
 @PowerMockIgnore({"org.apache.logging.log4j.*", "org.apache.kafka.clients.producer.ProducerInterceptor"})
 public class PulsarKafkaProducerTest {
@@ -104,12 +102,12 @@ public void testPulsarKafkaProducer() {
         ClientBuilder mockClientBuilder = mock(ClientBuilder.class);
         ProducerBuilder mockProducerBuilder = mock(ProducerBuilder.class);
         doAnswer(invocation -> {
-            Assert.assertEquals((int)invocation.getArguments()[0], 1000000, "Send time out is suppose to be 1000.");
+            Assert.assertEquals((int) invocation.getArguments()[0], 1000000, "Send timeout is supposed to be 1000.");
             return mockProducerBuilder;
         }).when(mockProducerBuilder).sendTimeout(anyInt(), any(TimeUnit.class));
         doReturn(mockClientBuilder).when(mockClientBuilder).serviceUrl(anyString());
         doAnswer(invocation -> {
-            Assert.assertEquals((int)invocation.getArguments()[0], 1000, "Keep alive interval is suppose to be 1000.");
+            Assert.assertEquals((int) invocation.getArguments()[0], 1000, "Keep alive interval is supposed to be 1000.");
             return mockClientBuilder;
         }).when(mockClientBuilder).keepAliveInterval(anyInt(), any(TimeUnit.class));
 
@@ -161,18 +159,19 @@ public void testPulsarKafkaInterceptor() throws PulsarClientException {
         doReturn(mockPartitionFuture).when(mockClient).getPartitionsForTopic(anyString());
         doReturn(mockProducerBuilder).when(mockProducerBuilder).topic(anyString());
         doReturn(mockProducerBuilder).when(mockProducerBuilder).clone();
-        doReturn(mockProducerBuilder).when(mockProducerBuilder).intercept(
-                (org.apache.pulsar.client.api.ProducerInterceptor) any());
+        doReturn(mockProducerBuilder).when(mockProducerBuilder)
+                .intercept((org.apache.pulsar.client.api.ProducerInterceptor) any());
         doReturn(mockProducer).when(mockProducerBuilder).create();
         doReturn(mockTypedMessageBuilder).when(mockProducer).newMessage();
         doReturn(mockSendAsyncFuture).when(mockTypedMessageBuilder).sendAsync();
         PowerMockito.mockStatic(PulsarClientKafkaConfig.class);
         PowerMockito.mockStatic(PulsarProducerKafkaConfig.class);
         when(PulsarClientKafkaConfig.getClientBuilder(any(Properties.class))).thenReturn(mockClientBuilder);
-        when(PulsarProducerKafkaConfig.getProducerBuilder(any(PulsarClient.class), any(Properties.class))).thenReturn(mockProducerBuilder);
+        when(PulsarProducerKafkaConfig.getProducerBuilder(any(PulsarClient.class), any(Properties.class))).thenReturn(
+                mockProducerBuilder);
 
         Properties properties = new Properties();
-        List<String> interceptors = new ArrayList<String>();
+        List<String> interceptors = new ArrayList<>();
         interceptors.add("org.apache.kafka.clients.producer.PulsarKafkaProducerTest$PulsarKafkaProducerInterceptor");
         properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
@@ -185,11 +184,10 @@ public void testPulsarKafkaInterceptor() throws PulsarClientException {
 
         // Act
         PulsarKafkaProducer<String, String> pulsarKafkaProducer = new PulsarKafkaProducer<>(properties);
-        pulsarKafkaProducer.send(new ProducerRecord<>("topic", 1,"key", "value"));
+        pulsarKafkaProducer.send(new ProducerRecord<>("topic", 1, "key", "value"));
 
         // Verify
-        verify(mockProducerBuilder, atLeastOnce()).intercept(
-                (org.apache.pulsar.client.api.ProducerInterceptor)any());
+        verify(mockProducerBuilder, atLeastOnce()).intercept((org.apache.pulsar.client.api.ProducerInterceptor) any());
     }
 
     @Test
@@ -212,18 +210,19 @@ public void testPulsarKafkaSendAvro() throws PulsarClientException {
         doReturn(mockProducerBuilder).when(mockProducerBuilder).topic(anyString());
         doReturn(mockProducerBuilder).when(mockProducerBuilder).autoUpdatePartitionsInterval(anyInt(), any());
         doReturn(mockProducerBuilder).when(mockProducerBuilder).clone();
-        doReturn(mockProducerBuilder).when(mockProducerBuilder).intercept(
-                (org.apache.pulsar.client.api.ProducerInterceptor) any());
+        doReturn(mockProducerBuilder).when(mockProducerBuilder)
+                .intercept((org.apache.pulsar.client.api.ProducerInterceptor) any());
         doReturn(mockProducer).when(mockProducerBuilder).create();
         doReturn(mockTypedMessageBuilder).when(mockProducer).newMessage();
         doReturn(mockSendAsyncFuture).when(mockTypedMessageBuilder).sendAsync();
         PowerMockito.mockStatic(PulsarClientKafkaConfig.class);
         PowerMockito.mockStatic(PulsarProducerKafkaConfig.class);
         when(PulsarClientKafkaConfig.getClientBuilder(any(Properties.class))).thenReturn(mockClientBuilder);
-        when(PulsarProducerKafkaConfig.getProducerBuilder(any(PulsarClient.class), any(Properties.class))).thenReturn(mockProducerBuilder);
+        when(PulsarProducerKafkaConfig.getProducerBuilder(any(PulsarClient.class), any(Properties.class))).thenReturn(
+                mockProducerBuilder);
 
         Properties properties = new Properties();
-        List<String> interceptors = new ArrayList<String>();
+        List<String> interceptors = new ArrayList<>();
         interceptors.add("org.apache.kafka.clients.producer.PulsarKafkaProducerTest$PulsarKafkaProducerInterceptor");
         properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
@@ -246,15 +245,29 @@ public void testPulsarKafkaSendAvro() throws PulsarClientException {
         foo.setField2("field2");
         foo.setField3(3);
 
-        pulsarKafkaProducer.send(new ProducerRecord<>("topic", 1,foo, bar));
+        Headers headers = new RecordHeaders();
+        String header1 = "header1";
+        String header2 = "header2";
+        headers.add(header1, header1.getBytes());
+        headers.add(header2, header2.getBytes());
+        String keyHeader1 = MessageConstants.KAFKA_MESSAGE_HEADER_PREFIX + header1;
+        String keyHeader2 = MessageConstants.KAFKA_MESSAGE_HEADER_PREFIX + header2;
+
+        pulsarKafkaProducer.send(new ProducerRecord<>("topic", 1, foo, bar, headers));
+
         // Verify
         verify(mockTypedMessageBuilder, times(1)).sendAsync();
-        verify(mockProducerBuilder, times(1)).intercept(
-                (org.apache.pulsar.client.api.ProducerInterceptor) any());
+        verify(mockProducerBuilder, times(1)).intercept((org.apache.pulsar.client.api.ProducerInterceptor) any());
+
+        verify(mockTypedMessageBuilder, times(1)).property(keyHeader1, Hex.encodeHexString(header1.getBytes()));
+        verify(mockTypedMessageBuilder, times(1)).property(keyHeader2, Hex.encodeHexString(header2.getBytes()));
     }
 
-    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Invalid value 2147483648000 for 'connections.max.idle.ms'. Please use a value smaller than 2147483647000 milliseconds.")
+    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Invalid value "
+            + "2147483648000 for 'connections.max.idle.ms'. Please use a value smaller than 2147483647000 "
+            + "milliseconds.")
     public void testPulsarKafkaProducerKeepAliveIntervalIllegalArgumentException() {
         Properties properties = new Properties();
         properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
@@ -272,12 +285,12 @@ public void testAutoRefreshPartitions() throws Exception {
         ClientBuilder mockClientBuilder = mock(ClientBuilder.class);
         ProducerBuilder mockProducerBuilder = mock(ProducerBuilder.class);
         doAnswer(invocation -> {
-            Assert.assertEquals((int)invocation.getArguments()[0], 1000000, "Send time out is suppose to be 1000.");
+            Assert.assertEquals((int) invocation.getArguments()[0], 1000000, "Send timeout is supposed to be 1000.");
             return mockProducerBuilder;
         }).when(mockProducerBuilder).sendTimeout(anyInt(), any(TimeUnit.class));
         doReturn(mockClientBuilder).when(mockClientBuilder).serviceUrl(anyString());
         doAnswer(invocation -> {
-            Assert.assertEquals((int)invocation.getArguments()[0], 1000, "Keep alive interval is suppose to be 1000.");
+            Assert.assertEquals((int) invocation.getArguments()[0], 1000, "Keep alive interval is supposed to be 1000.");
             return mockClientBuilder;
         }).when(mockClientBuilder).keepAliveInterval(anyInt(), any(TimeUnit.class));
 
@@ -313,7 +326,8 @@ public void testAutoRefreshPartitions() throws Exception {
         producer.close();
     }
 
-    public static class PulsarKafkaProducerInterceptor implements org.apache.kafka.clients.producer.ProducerInterceptor<String, String> {
+    public static class PulsarKafkaProducerInterceptor
+            implements org.apache.kafka.clients.producer.ProducerInterceptor<String, String> {
 
         @Override
         public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
diff --git a/tests/pulsar-kafka-compat-client-test/src/test/java/org/apache/pulsar/tests/integration/compat/kafka/KafkaApiTest.java b/tests/pulsar-kafka-compat-client-test/src/test/java/org/apache/pulsar/tests/integration/compat/kafka/KafkaApiTest.java
index e9f865b..37cca88 100644
--- a/tests/pulsar-kafka-compat-client-test/src/test/java/org/apache/pulsar/tests/integration/compat/kafka/KafkaApiTest.java
+++ b/tests/pulsar-kafka-compat-client-test/src/test/java/org/apache/pulsar/tests/integration/compat/kafka/KafkaApiTest.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -20,7 +20,6 @@
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNull;
-
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -31,25 +30,23 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import lombok.Cleanup;
 import lombok.Data;
-import lombok.EqualsAndHashCode;
-import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
-
 import org.apache.avro.reflect.Nullable;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.consumer.PulsarKafkaConsumer;
 import org.apache.kafka.clients.producer.Partitioner;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.PulsarKafkaProducer;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
@@ -62,6 +59,7 @@ import org.apache.pulsar.client.impl.schema.AvroSchema;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
 import org.apache.pulsar.client.impl.schema.StringSchema;
+import org.apache.pulsar.client.kafka.compat.KafkaMessageRouter;
 import org.apache.pulsar.client.kafka.compat.PulsarKafkaSchema;
 import org.apache.pulsar.tests.integration.suites.PulsarStandaloneTestSuite;
 import org.testng.Assert;
@@ -100,7 +98,7 @@ public void testSimpleProducerConsumer() throws Exception {
         producerProperties.put("bootstrap.servers", getPlainTextServiceUrl());
         producerProperties.put("key.serializer", IntegerSerializer.class.getName());
         producerProperties.put("value.serializer", StringSerializer.class.getName());
-        Producer<Integer, String> producer = new KafkaProducer<>(producerProperties);
+        Producer<Integer, String> producer = new PulsarKafkaProducer<>(producerProperties);
 
         Properties consumerProperties = new Properties();
@@ -108,7 +106,7 @@ public void testSimpleProducerConsumer() throws Exception {
         consumerProperties.put("key.deserializer", IntegerDeserializer.class.getName());
         consumerProperties.put("value.deserializer", StringDeserializer.class.getName());
         consumerProperties.put("enable.auto.commit", "true");
-        Consumer<Integer, String> consumer = new KafkaConsumer<>(consumerProperties);
+        Consumer<Integer, String> consumer = new PulsarKafkaConsumer<>(consumerProperties);
         consumer.subscribe(Arrays.asList(topic));
 
         List<Long> offsets = new ArrayList<>();
@@ -151,7 +149,7 @@ public void testSimpleConsumer() throws Exception {
         props.put("value.deserializer", StringDeserializer.class.getName());
 
         @Cleanup
-        Consumer<String, String> consumer = new KafkaConsumer<>(props);
+        Consumer<String, String> consumer = new PulsarKafkaConsumer<>(props);
         consumer.subscribe(Arrays.asList(topic));
 
         @Cleanup
@@ -172,7 +170,7 @@ public void testSimpleConsumer() throws Exception {
                 String key = Integer.toString(received.get());
                 String value = "hello-" + received.get();
                 log.info("Receive record : key = {}, value = {}, topic = {}, ptn = {}",
-                    key, value, record.topic(), record.partition());
+                        key, value, record.topic(), record.partition());
                 assertEquals(record.key(), key);
                 assertEquals(record.value(), value);
@@ -195,7 +193,7 @@ public void testConsumerAutoCommit() throws Exception {
         props.put("key.deserializer", StringDeserializer.class.getName());
         props.put("value.deserializer", StringDeserializer.class.getName());
 
-        Consumer<String, String> consumer = new KafkaConsumer<>(props);
+        Consumer<String, String> consumer = new PulsarKafkaConsumer<>(props);
         consumer.subscribe(Arrays.asList(topic));
 
         @Cleanup
@@ -219,7 +217,7 @@ public void testConsumerAutoCommit() throws Exception {
         consumer.close();
 
         // Re-open consumer and verify every message was acknowledged
-        Consumer<String, String> consumer2 = new KafkaConsumer<>(props);
+        Consumer<String, String> consumer2 = new PulsarKafkaConsumer<>(props);
         consumer2.subscribe(Arrays.asList(topic));
 
         ConsumerRecords<String, String> records = consumer2.poll(100);
@@ -238,7 +236,7 @@ public void testConsumerManualOffsetCommit() throws Exception {
         props.put("key.deserializer", StringDeserializer.class.getName());
         props.put("value.deserializer", StringDeserializer.class.getName());
 
-        Consumer<String, String> consumer = new KafkaConsumer<>(props);
+        Consumer<String, String> consumer = new PulsarKafkaConsumer<>(props);
         consumer.subscribe(Arrays.asList(topic));
 
         @Cleanup
@@ -268,7 +266,7 @@ public void testConsumerManualOffsetCommit() throws Exception {
         consumer.close();
 
         // Re-open consumer and verify every message was acknowledged
-        Consumer<String, String> consumer2 = new KafkaConsumer<>(props);
+        Consumer<String, String> consumer2 = new PulsarKafkaConsumer<>(props);
         consumer2.subscribe(Arrays.asList(topic));
 
         ConsumerRecords<String, String> records = consumer2.poll(100);
@@ -300,7 +298,7 @@ public void testPartitions() throws Exception {
         // Create 2 Kakfa consumer and verify each gets half of the messages
         List<Consumer<String, String>> consumers = new ArrayList<>();
         for (int c = 0; c < 2; c++) {
-            Consumer<String, String> consumer = new KafkaConsumer<>(props);
+            Consumer<String, String> consumer = new PulsarKafkaConsumer<>(props);
             consumer.subscribe(Arrays.asList(topic));
             consumers.add(consumer);
         }
@@ -314,7 +312,7 @@ public void testPartitions() throws Exception {
         consumers.forEach(consumer -> {
             int expectedMessaged = N / consumers.size();
-            for (int i = 0; i < expectedMessaged;) {
+            for (int i = 0; i < expectedMessaged; ) {
                 ConsumerRecords<String, String> records = consumer.poll(100);
                 i += records.count();
             }
@@ -342,7 +340,7 @@ public void testExplicitPartitions() throws Exception {
         producerProperties.put("value.serializer", StringSerializer.class.getName());
 
         @Cleanup
-        Producer<String, String> producer = new KafkaProducer<>(producerProperties);
+        Producer<String, String> producer = new PulsarKafkaProducer<>(producerProperties);
 
         Properties props = new Properties();
         props.put("bootstrap.servers", getPlainTextServiceUrl());
@@ -353,7 +351,7 @@ public void testExplicitPartitions() throws Exception {
         // Create Kakfa consumer and verify all messages came from intended partition
         @Cleanup
-        Consumer<String, String> consumer = new KafkaConsumer<>(props);
+        Consumer<String, String> consumer = new PulsarKafkaConsumer<>(props);
         consumer.subscribe(Arrays.asList(topic));
 
         int N = 8 * 3;
@@ -366,7 +364,7 @@ public void testExplicitPartitions() throws Exception {
 
         producer.flush();
 
-        for (int i = 0; i < N;) {
+        for (int i = 0; i < N; ) {
             ConsumerRecords<String, String> records = consumer.poll(100);
             i += records.count();
 
@@ -395,7 +393,8 @@ public void close() {
     }
 
     @Override
-    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
+    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes,
+                         Cluster cluster) {
         // Dummy implementation that always return same partition
         return USED_PARTITION;
     }
@@ -417,7 +416,7 @@ public void testCustomRouter() throws Exception {
         producerProperties.put("partitioner.class", MyCustomPartitioner.class.getName());
 
         @Cleanup
-        Producer<String, String> producer = new KafkaProducer<>(producerProperties);
+        Producer<String, String> producer = new PulsarKafkaProducer<>(producerProperties);
 
         Properties props = new Properties();
         props.put("bootstrap.servers", getPlainTextServiceUrl());
@@ -428,7 +427,7 @@ public void testCustomRouter() throws Exception {
         // Create Kakfa consumer and verify all messages came from intended partition
         @Cleanup
-        Consumer<String, String> consumer = new KafkaConsumer<>(props);
+        Consumer<String, String> consumer = new PulsarKafkaConsumer<>(props);
         consumer.subscribe(Arrays.asList(topic));
 
         int N = 8 * 3;
@@ -439,7 +438,7 @@ public void testCustomRouter() throws Exception {
 
         producer.flush();
 
-        for (int i = 0; i < N;) {
+        for (int i = 0; i < N; ) {
             ConsumerRecords<String, String> records = consumer.poll(100);
             i += records.count();
 
@@ -465,7 +464,7 @@ public void testConsumerSeek() throws Exception {
         props.put("value.deserializer", StringDeserializer.class.getName());
         props.put("pulsar.consumer.acknowledgments.group.time.millis", "0");
 
-        Consumer<String, String> consumer = new KafkaConsumer<>(props);
+        Consumer<String, String> consumer = new PulsarKafkaConsumer<>(props);
         consumer.subscribe(Arrays.asList(topic));
 
         @Cleanup
@@ -522,7 +521,7 @@ public void testConsumerSeekToEnd() throws Exception {
         props.put("value.deserializer", StringDeserializer.class.getName());
         props.put("pulsar.consumer.acknowledgments.group.time.millis", "0");
 
-        Consumer<String, String> consumer = new KafkaConsumer<>(props);
+        Consumer<String, String> consumer = new PulsarKafkaConsumer<>(props);
         consumer.subscribe(Arrays.asList(topic));
 
         @Cleanup
@@ -552,7 +551,7 @@ public void testConsumerSeekToEnd() throws Exception {
         consumer.close();
 
         // Recreate the consumer
-        consumer = new KafkaConsumer<>(props);
+        consumer = new PulsarKafkaConsumer<>(props);
         consumer.subscribe(Arrays.asList(topic));
 
         ConsumerRecords<String, String> records = consumer.poll(100);
@@ -578,7 +577,7 @@ public void testSimpleProducer() throws Exception {
         props.put("key.serializer", IntegerSerializer.class.getName());
         props.put("value.serializer", StringSerializer.class.getName());
 
-        Producer<Integer, String> producer = new KafkaProducer<>(props);
+        Producer<Integer, String> producer = new PulsarKafkaProducer<>(props);
 
         for (int i = 0; i < 10; i++) {
             producer.send(new ProducerRecord<Integer, String>(topic, i, "hello-" + i));
@@ -611,7 +610,7 @@ public void testProducerCallback() throws Exception {
         props.put("key.serializer", IntegerSerializer.class.getName());
         props.put("value.serializer", StringSerializer.class.getName());
 
-        Producer<Integer, String> producer = new KafkaProducer<>(props);
+        Producer<Integer, String> producer = new PulsarKafkaProducer<>(props);
 
         CountDownLatch counter = new CountDownLatch(10);
@@ -644,15 +643,15 @@ public void testProducerAvroSchemaWithPulsarKafkaClient() throws Exception {
 
         PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPlainTextServiceUrl()).build();
         org.apache.pulsar.client.api.Consumer<byte[]> pulsarConsumer = pulsarClient.newConsumer()
-            .topic(topic)
-            .subscriptionName("my-subscription")
-            .subscribe();
+                .topic(topic)
+                .subscriptionName("my-subscription")
+                .subscribe();
 
         Properties props = new Properties();
         props.put("bootstrap.servers", getPlainTextServiceUrl());
         props.put("key.serializer", IntegerSerializer.class.getName());
         props.put("value.serializer", StringSerializer.class.getName());
 
-        Producer<Bar, Foo> producer = new KafkaProducer<>(props, barSchema, fooSchema);
+        Producer<Bar, Foo> producer = new PulsarKafkaProducer<>(props, barSchema, fooSchema);
         for (int i = 0; i < 10; i++) {
             Bar bar = new Bar();
             bar.setField1(true);
@@ -691,12 +690,13 @@ public void testConsumerAvroSchemaWithPulsarKafkaClient() throws Exception {
         props.put("value.deserializer", StringDeserializer.class.getName());
 
         @Cleanup
-        Consumer<String, Foo> consumer = new KafkaConsumer(props, new StringSchema(), fooSchema);
+        Consumer<String, Foo> consumer = new PulsarKafkaConsumer(props, new StringSchema(), fooSchema);
         consumer.subscribe(Arrays.asList(topic));
 
         @Cleanup
         PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPlainTextServiceUrl()).build();
-        org.apache.pulsar.client.api.Producer<Foo> pulsarProducer = pulsarClient.newProducer(fooSchema).topic(topic).create();
+        org.apache.pulsar.client.api.Producer<Foo> pulsarProducer =
+                pulsarClient.newProducer(fooSchema).topic(topic).create();
 
         for (int i = 0; i < 10; i++) {
             Foo foo = new Foo();
@@ -741,10 +741,10 @@ public void testProducerConsumerAvroSchemaWithPulsarKafkaClient() throws Exception {
         props.put("value.deserializer", StringDeserializer.class.getName());
 
         @Cleanup
-        Consumer<Bar, Foo> consumer = new KafkaConsumer<>(props, barSchema, fooSchema);
+        Consumer<Bar, Foo> consumer = new PulsarKafkaConsumer<>(props, barSchema, fooSchema);
         consumer.subscribe(Arrays.asList(topic));
 
-        Producer<Bar, Foo> producer = new KafkaProducer<>(props, barSchema, fooSchema);
+        Producer<Bar, Foo> producer = new PulsarKafkaProducer<>(props, barSchema, fooSchema);
 
         for (int i = 0; i < 10; i++) {
             Bar bar = new Bar();
@@ -795,10 +795,10 @@ public void testProducerConsumerJsonSchemaWithPulsarKafkaClient() throws Exception {
         props.put("value.deserializer", StringDeserializer.class.getName());
 
         @Cleanup
-        Consumer<Bar, Foo> consumer = new KafkaConsumer<>(props, barSchema, fooSchema);
+        Consumer<Bar, Foo> consumer = new PulsarKafkaConsumer<>(props, barSchema, fooSchema);
         consumer.subscribe(Arrays.asList(topic));
 
-        Producer<Bar, Foo> producer = new KafkaProducer<>(props, barSchema, fooSchema);
+        Producer<Bar, Foo> producer = new PulsarKafkaProducer<>(props, barSchema, fooSchema);
 
         for (int i = 0; i < 10; i++) {
             Bar bar = new Bar();
@@ -849,10 +849,10 @@ public void testProducerConsumerMixedSchemaWithPulsarKafkaClient() throws Exception {
         props.put("value.deserializer", StringDeserializer.class.getName());
 
         @Cleanup
-        Consumer consumer = new KafkaConsumer<>(props, keySchema, valueSchema);
+        Consumer consumer = new PulsarKafkaConsumer<>(props, keySchema, valueSchema);
         consumer.subscribe(Arrays.asList(topic));
 
-        Producer producer = new KafkaProducer<>(props, keySchema, valueSchema);
+        Producer producer = new PulsarKafkaProducer<>(props, keySchema, valueSchema);
 
         for (int i = 0; i < 10; i++) {
             Foo foo = new Foo();
@@ -882,4 +882,59 @@ public void testProducerConsumerMixedSchemaWithPulsarKafkaClient() throws Exception {
             }
         }
     }
+
+    @Test
+    public void testProducerConsumerHeadersWithPulsarKafkaClient() throws Exception {
+        String topic = "persistent://public/default/testProduceAndConsumeWithHeaders";
+
+        Properties producerProperties = new Properties();
+        producerProperties.put("bootstrap.servers", getPlainTextServiceUrl());
+        producerProperties.put("key.serializer", IntegerSerializer.class.getName());
+        producerProperties.put("value.serializer", StringSerializer.class.getName());
+        Producer<Integer, String> producer = new PulsarKafkaProducer<>(producerProperties);
+
+        Properties consumerProperties = new Properties();
+        consumerProperties.put("bootstrap.servers", getPlainTextServiceUrl());
+        consumerProperties.put("group.id", "my-subscription-name");
+        consumerProperties.put("key.deserializer", IntegerDeserializer.class.getName());
+        consumerProperties.put("value.deserializer", StringDeserializer.class.getName());
+        consumerProperties.put("enable.auto.commit", "true");
+        Consumer<Integer, String> consumer = new PulsarKafkaConsumer<>(consumerProperties);
+        consumer.subscribe(Arrays.asList(topic));
+
+        List<Long> offsets = new ArrayList<>();
+
+        for (int i = 0; i < 10; i++) {
+            Headers headers = new RecordHeaders();
+            String header = "header" + i;
+            headers.add(header, header.getBytes());
+            RecordMetadata md =
+                    producer.send(new ProducerRecord<>(topic, 1, i, "hello-" + i, headers)).get();
+            offsets.add(md.offset());
+            log.info("Published message at {}", Long.toHexString(md.offset()));
+        }
+
+        producer.flush();
+        producer.close();
+
+        AtomicInteger received = new AtomicInteger();
+        while (received.get() < 10) {
+            ConsumerRecords<Integer, String> records = consumer.poll(100);
+            records.forEach(record -> {
+                assertEquals(record.key().intValue(), received.get());
+                assertEquals(record.value(), "hello-" + received.get());
+                assertEquals(record.offset(), offsets.get(received.get()).longValue());
+                assertEquals(record.headers().lastHeader("header" + record.key()).key(), "header" + record.key());
+                assertEquals(new String(record.headers().lastHeader("header" + record.key()).value()),
+                        "header" + record.key());
+                assertEquals(new String(record.headers().lastHeader(KafkaMessageRouter.PARTITION_ID).value()), "1");
+
+                received.incrementAndGet();
+            });
+
+            consumer.commitSync();
+        }
+
+        consumer.close();
+    }
 }
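
A note on the wire format this patch introduces: the producer stores each Kafka record header as a Pulsar message property named "kafka.header.<key>" with a hex-encoded value, so arbitrary binary header bytes survive Pulsar's string-typed property map, and the consumer reverses the mapping; any unprefixed property, such as the partition id written by KafkaMessageRouter, surfaces as a header carrying its raw UTF-8 bytes. The sketch below shows the intended round trip through the compat client; the service URL, topic name, and header value are illustrative placeholders, a reachable Pulsar broker is assumed, and imports from the Kafka client API are omitted.

Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", "pulsar://localhost:6650");
producerProps.put("key.serializer", IntegerSerializer.class.getName());
producerProps.put("value.serializer", StringSerializer.class.getName());
Producer<Integer, String> producer = new PulsarKafkaProducer<>(producerProps);

// The header value may be arbitrary bytes; it is hex-encoded into the
// "kafka.header.trace-id" property of the outgoing Pulsar message.
Headers headers = new RecordHeaders();
headers.add("trace-id", new byte[]{0x01, 0x02, 0x03});
producer.send(new ProducerRecord<>("my-topic", null, 42, "hello", headers)).get();
producer.close();

Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "pulsar://localhost:6650");
consumerProps.put("group.id", "my-subscription");
consumerProps.put("key.deserializer", IntegerDeserializer.class.getName());
consumerProps.put("value.deserializer", StringDeserializer.class.getName());
Consumer<Integer, String> consumer = new PulsarKafkaConsumer<>(consumerProps);
consumer.subscribe(Collections.singletonList("my-topic"));

// The compat consumer strips the prefix and hex-decodes the property back
// into a Kafka header on each polled record.
ConsumerRecords<Integer, String> records = consumer.poll(3000);
records.forEach(record ->
        System.out.println(Arrays.toString(record.headers().lastHeader("trace-id").value())));
consumer.close();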