diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index aa70034b5..df5736c51 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -67,7 +67,6 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListSet; @@ -102,7 +101,9 @@ import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.errors.AuthenticationException; +import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.errors.LeaderNotAvailableException; +import org.apache.kafka.common.internals.Topic; import org.apache.kafka.common.message.AddOffsetsToTxnRequestData; import org.apache.kafka.common.message.AddOffsetsToTxnResponseData; import org.apache.kafka.common.message.AddPartitionsToTxnRequestData; @@ -539,70 +540,69 @@ protected void handleInactive(KafkaHeaderAndRequest kafkaHeaderAndRequest, // Leverage pulsar admin to get partitioned topic metadata // NOTE: the returned future never completes exceptionally - private CompletableFuture getPartitionedTopicMetadataAsync(String topicName, - boolean allowAutoTopicCreation) { - final CompletableFuture future = new CompletableFuture<>(); - admin.topics().getPartitionedTopicMetadataAsync(topicName).whenComplete((metadata, e) -> { + private CompletableFuture getTopicMetadataAsync(String topic, + boolean allowAutoTopicCreation) { + final CompletableFuture future = new CompletableFuture<>(); + final TopicName topicName = TopicName.get(topic); + admin.topics().getPartitionedTopicMetadataAsync(topic).whenComplete((metadata, e) -> { if (e == null) { - if (metadata.partitions > 0) { - if (log.isDebugEnabled()) { - log.debug("Topic {} has {} partitions", topicName, metadata.partitions); - } - future.complete(metadata.partitions); - } else { - future.complete(TopicAndMetadata.NON_PARTITIONED_NUMBER); + if (log.isDebugEnabled()) { + log.debug("Topic {} has {} partitions", topic, metadata.partitions); } + future.complete(TopicAndMetadata.success(topic, metadata.partitions)); } else if (e instanceof PulsarAdminException.NotFoundException) { - if (allowAutoTopicCreation) { - String namespace = TopicName.get(topicName).getNamespace(); - admin.namespaces().getPoliciesAsync(namespace).whenComplete((policies, err) -> { - if (err != null || policies == null) { - log.error("[{}] Cannot get policies for namespace {}", ctx.channel(), namespace, err); - future.complete(TopicAndMetadata.INVALID_PARTITIONS); - } else { - boolean allowed = kafkaConfig.isAllowAutoTopicCreation(); - if (policies.autoTopicCreationOverride != null) { - allowed = policies.autoTopicCreationOverride.isAllowAutoTopicCreation(); - } - if (!allowed) { - log.error("[{}] Topic {} doesn't exist and it's not allowed " - + "to auto create partitioned topic", ctx.channel(), topicName); - future.complete(TopicAndMetadata.INVALID_PARTITIONS); - } else { - log.info("[{}] Topic {} doesn't exist, auto create it with {} partitions", - ctx.channel(), topicName, defaultNumPartitions); - admin.topics().createPartitionedTopicAsync(topicName, defaultNumPartitions, - Map.of("kafkaTopicUUID", UUID.randomUUID().toString())) - .whenComplete((__, createException) -> { - if (createException == null) { - future.complete(defaultNumPartitions); - } else { - log.warn("[{}] Failed to create partitioned topic {}: {}", - ctx.channel(), topicName, createException.getMessage()); - future.complete(TopicAndMetadata.INVALID_PARTITIONS); - } - }); - } + (allowAutoTopicCreation ? checkAllowAutoTopicCreation(topicName.getNamespace()) + : CompletableFuture.completedFuture(false)).whenComplete((allowed, err) -> { + if (err != null) { + log.error("[{}] Cannot get policies for namespace {}", + ctx.channel(), topicName.getNamespace(), err); + future.complete(TopicAndMetadata.failure(topic, Errors.UNKNOWN_SERVER_ERROR)); + return; + } + if (allowed) { + admin.topics().createPartitionedTopicAsync(topic, defaultNumPartitions) + .whenComplete((__, createException) -> { + if (createException == null) { + future.complete(TopicAndMetadata.success(topic, defaultNumPartitions)); + } else { + log.warn("[{}] Failed to create partitioned topic {}: {}", + ctx.channel(), topicName, createException.getMessage()); + future.complete(TopicAndMetadata.failure(topic, Errors.UNKNOWN_SERVER_ERROR)); + } + }); + } else { + try { + Topic.validate(topicName.getLocalName()); + future.complete(TopicAndMetadata.failure(topic, Errors.UNKNOWN_TOPIC_OR_PARTITION)); + } catch (InvalidTopicException ignored) { + future.complete(TopicAndMetadata.failure(topic, Errors.INVALID_TOPIC_EXCEPTION)); } - }); - } else { - log.error("[{}] Topic {} doesn't exist and it's not allowed to auto create partitioned topic", - ctx.channel(), topicName, e); - future.complete(TopicAndMetadata.INVALID_PARTITIONS); - } + } + }); } else { - log.error("[{}] Failed to get partitioned topic {}", ctx.channel(), topicName, e); - future.complete(TopicAndMetadata.INVALID_PARTITIONS); + log.error("[{}] Failed to get partitioned topic {}", ctx.channel(), topic, e); + future.complete(TopicAndMetadata.failure(topic, Errors.UNKNOWN_SERVER_ERROR)); } }); return future; } - private static String path(String... parts) { - StringBuilder sb = new StringBuilder(); - sb.append(POLICY_ROOT); - Joiner.on('/').appendTo(sb, parts); - return sb.toString(); + + private static String path(String... parts){ + StringBuilder sb = new StringBuilder(); + sb.append(POLICY_ROOT); + Joiner.on('/').appendTo(sb, parts); + return sb.toString(); + } + + private CompletableFuture checkAllowAutoTopicCreation(String namespace) { + return admin.namespaces().getPoliciesAsync(namespace).thenApply(policies -> { + if (policies != null && policies.autoTopicCreationOverride != null) { + return policies.autoTopicCreationOverride.isAllowAutoTopicCreation(); + } else { + return kafkaConfig.isAllowAutoTopicCreation(); + } + }); } private CompletableFuture> expandAllowedNamespaces(Set allowedNamespaces) { @@ -664,10 +664,9 @@ private List analyzeFullTopicNames(final Stream fullTo Collections.sort(partitionIndexes); final int lastIndex = partitionIndexes.get(partitionIndexes.size() - 1); if (lastIndex < 0) { - topicAndMetadataList.add( - new TopicAndMetadata(topic, TopicAndMetadata.NON_PARTITIONED_NUMBER)); + topicAndMetadataList.add(TopicAndMetadata.success(topic, 0)); // non-partitioned topic } else if (lastIndex == partitionIndexes.size() - 1) { - topicAndMetadataList.add(new TopicAndMetadata(topic, partitionIndexes.size())); + topicAndMetadataList.add(TopicAndMetadata.success(topic, partitionIndexes.size())); } else { // The partitions should be [0, 1, ..., n-1], `n` is the number of partitions. If the last index is not // `n-1`, there must be some missed partitions. @@ -721,17 +720,16 @@ private CompletableFuture> authorizeTopicsAsync(final Collectio private CompletableFuture> findTopicMetadata(final ListPair listPair, final boolean allowTopicAutoCreation) { - final Map> futureMap = CoreUtils.listToMap( + final Map> futureMap = CoreUtils.listToMap( listPair.getSuccessfulList(), - topic -> getPartitionedTopicMetadataAsync(topic, allowTopicAutoCreation) + topic -> getTopicMetadataAsync(topic, allowTopicAutoCreation) ); return CoreUtils.waitForAll(futureMap.values()).thenApply(__ -> - CoreUtils.mapToList(futureMap, (key, value) -> new TopicAndMetadata(key, value.join())) + CoreUtils.mapToList(futureMap, (___, value) -> value.join()) ).thenApply(authorizedTopicAndMetadataList -> ListUtils.union(authorizedTopicAndMetadataList, CoreUtils.listToList(listPair.getFailedList(), - topic -> new TopicAndMetadata(topic, TopicAndMetadata.AUTHORIZATION_FAILURE)) - ) + topic -> TopicAndMetadata.failure(topic, Errors.TOPIC_AUTHORIZATION_FAILED))) ); } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/TopicAndMetadata.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/TopicAndMetadata.java index 6b79077d0..2b3217918 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/TopicAndMetadata.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/TopicAndMetadata.java @@ -36,29 +36,20 @@ @Getter public class TopicAndMetadata { - public static final int INVALID_PARTITIONS = -2; - public static final int AUTHORIZATION_FAILURE = -1; - public static final int NON_PARTITIONED_NUMBER = 0; - private final String topic; private final int numPartitions; + private final Errors error; - public boolean isPartitionedTopic() { - return numPartitions > 0; + public static TopicAndMetadata success(String topic, int numPartitions) { + return new TopicAndMetadata(topic, numPartitions, Errors.NONE); } - public boolean hasNoError() { - return numPartitions >= 0; + public static TopicAndMetadata failure(String topic, Errors error) { + return new TopicAndMetadata(topic, -1, error); } - public Errors error() { - if (hasNoError()) { - return Errors.NONE; - } else if (numPartitions == AUTHORIZATION_FAILURE) { - return Errors.TOPIC_AUTHORIZATION_FAILED; - } else { - return Errors.UNKNOWN_TOPIC_OR_PARTITION; - } + public boolean hasNoError() { + return error == Errors.NONE; } public CompletableFuture lookupAsync( @@ -70,14 +61,14 @@ public CompletableFuture lookupAsync( .map(lookupFunction) .collect(Collectors.toList()), partitionMetadataList -> new TopicMetadata( - error(), + error, getOriginalTopic.apply(topic), KopTopic.isInternalTopic(topic, metadataNamespace), partitionMetadataList )); } - public Stream stream() { + private Stream stream() { if (numPartitions > 0) { return IntStream.range(0, numPartitions) .mapToObj(i -> topic + "-partition-" + i); @@ -89,7 +80,7 @@ public Stream stream() { public TopicMetadata toTopicMetadata(final Function getOriginalTopic, final String metadataNamespace) { return new TopicMetadata( - error(), + error, getOriginalTopic.apply(topic), KopTopic.isInternalTopic(topic, metadataNamespace), Collections.emptyList() diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/ByteBufUtils.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/ByteBufUtils.java index e4aecf83b..76bb5178c 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/ByteBufUtils.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/ByteBufUtils.java @@ -208,8 +208,8 @@ private static DecodeResult encodeKafkaResponse(MessageMetadata metadata, ByteBu final ByteBuf singleMessagePayload = Commands.deSerializeSingleMessageInBatch( uncompressedPayload, singleMessageMetadata, i, numMessages); - final long timestamp = (metadata.getEventTime() > 0) - ? metadata.getEventTime() + final long timestamp = (singleMessageMetadata.getEventTime() > 0) + ? singleMessageMetadata.getEventTime() : metadata.getPublishTime(); ByteBuffer value = singleMessageMetadata.isNullValue() ? null diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndPulsarTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndPulsarTest.java index 52e6f4814..d612865b6 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndPulsarTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndPulsarTest.java @@ -168,4 +168,47 @@ public void testPublishZeroTimestampRecord() { assertEquals(consumerRecords.get(0).value(), "v1"); assertEquals(consumerRecords.get(0).timestamp(), 0L); } + + @Test(timeOut = 30000) + public void testPublishTimestampInBatch() { + String topic = "test-publish-timestamp-in-batch"; + String subscription = "test-group"; + Properties properties = newKafkaProducerProperties(); + int numRecords = 100; + @Cleanup + final KafkaProducer producer = new KafkaProducer<>(properties); + for (int i = 0; i < numRecords; i++) { + producer.send(new ProducerRecord<>(topic, null, (long) i, "k1", "v1", null)); + } + + producer.flush(); + + @Cleanup + final KafkaConsumer consumer = newKafkaConsumer(topic, subscription); + consumer.subscribe(Collections.singleton(topic)); + List> consumerRecords = receiveRecords(consumer, numRecords); + assertEquals(consumerRecords.size(), numRecords); + for (int i = 0; i < numRecords; i++) { + assertEquals(consumerRecords.get(i).key(), "k1"); + assertEquals(consumerRecords.get(i).value(), "v1"); + assertEquals(consumerRecords.get(i).timestamp(), i); + } + + // Test first record has specified timestamp + producer.send(new ProducerRecord<>(topic, null, 1L, "k1", "v1", null)); + for (int i = 0; i < numRecords; i++) { + producer.send(new ProducerRecord<>(topic, null, "k1", "v1", null)); + } + + consumerRecords = receiveRecords(consumer, numRecords); + assertEquals(consumerRecords.size(), numRecords + 1); + assertEquals(consumerRecords.get(0).key(), "k1"); + assertEquals(consumerRecords.get(0).value(), "v1"); + assertEquals(consumerRecords.get(0).timestamp(), 1L); + for (int i = 1; i < numRecords + 1; i++) { + assertEquals(consumerRecords.get(i).key(), "k1"); + assertEquals(consumerRecords.get(i).value(), "v1"); + assertTrue(consumerRecords.get(i).timestamp() > 0); + } + } } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaApisTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaApisTest.java index b13831992..bd2f39dbe 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaApisTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaApisTest.java @@ -58,6 +58,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; import java.util.stream.Collectors; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; @@ -110,8 +111,8 @@ import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.jetbrains.annotations.NotNull; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; import org.testng.annotations.Ignore; import org.testng.annotations.Test; @@ -132,7 +133,7 @@ protected void resetConfig() { + SSL_PREFIX + "127.0.0.1:" + kafkaBrokerPortTls); } - @BeforeMethod + @BeforeClass @Override protected void setup() throws Exception { super.internalSetup(); @@ -165,7 +166,7 @@ protected void setup() throws Exception { serviceAddress = new InetSocketAddress(pulsar.getBindAddress(), kafkaBrokerPort); } - @AfterMethod + @AfterClass @Override protected void cleanup() throws Exception { super.internalCleanup(); @@ -702,11 +703,11 @@ private List getCreatedTopics(String topicName, int numTopics) { } private KafkaHeaderAndRequest createTopicMetadataRequest(List topics) { - return createTopicMetadataRequest(topics, true); + return createTopicMetadataRequest(topics, false); } private KafkaHeaderAndRequest createTopicMetadataRequest(List topics, boolean allowAutoTopicCreation) { - AbstractRequest.Builder builder = new MetadataRequest.Builder(topics, allowAutoTopicCreation); + MetadataRequest.Builder builder = new MetadataRequest.Builder(topics, allowAutoTopicCreation); return buildRequest(builder); } @@ -1187,7 +1188,7 @@ public void testIllegalManagedLedger() throws Exception { */ @Test(timeOut = 60000) public void testFetchMinBytesSingleConsumer() throws Exception { - final String topic = "testMinBytesTopic"; + final String topic = "testMinBytesTopicSingleConsumer"; final TopicPartition topicPartition = new TopicPartition(topic, 0); admin.topics().createPartitionedTopic(topic, 1); triggerTopicLookup(topic, 1); @@ -1223,4 +1224,18 @@ public void testFetchMinBytesSingleConsumer() throws Exception { Long waitingFetchesTriggered = kafkaRequestHandler.getRequestStats().getWaitingFetchesTriggered().get(); assertEquals((long) waitingFetchesTriggered, 1); } + + @Test(timeOut = 30000) + public void testTopicMetadataNotFound() { + final Function getMetadataResponseError = topic -> { + final CompletableFuture future = new CompletableFuture<>(); + kafkaRequestHandler.handleTopicMetadataRequest( + createTopicMetadataRequest(Collections.singletonList(topic)), future); + final MetadataResponse response = (MetadataResponse) future.join(); + assertTrue(response.errors().containsKey(topic)); + return response.errors().get(topic); + }; + assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, getMetadataResponseError.apply("test-topic-not-found._-")); + assertEquals(Errors.INVALID_TOPIC_EXCEPTION, getMetadataResponseError.apply("???")); + } }