Impl/cherry picks #90

Open · wants to merge 2 commits into base: 2.10_ds
@@ -67,7 +67,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
@@ -102,7 +101,9 @@
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.message.AddOffsetsToTxnRequestData;
import org.apache.kafka.common.message.AddOffsetsToTxnResponseData;
import org.apache.kafka.common.message.AddPartitionsToTxnRequestData;
@@ -539,70 +540,69 @@ protected void handleInactive(KafkaHeaderAndRequest kafkaHeaderAndRequest,

// Leverage pulsar admin to get partitioned topic metadata
// NOTE: the returned future never completes exceptionally
private CompletableFuture<Integer> getPartitionedTopicMetadataAsync(String topicName,
boolean allowAutoTopicCreation) {
final CompletableFuture<Integer> future = new CompletableFuture<>();
admin.topics().getPartitionedTopicMetadataAsync(topicName).whenComplete((metadata, e) -> {
private CompletableFuture<TopicAndMetadata> getTopicMetadataAsync(String topic,
boolean allowAutoTopicCreation) {
final CompletableFuture<TopicAndMetadata> future = new CompletableFuture<>();
final TopicName topicName = TopicName.get(topic);
admin.topics().getPartitionedTopicMetadataAsync(topic).whenComplete((metadata, e) -> {
if (e == null) {
if (metadata.partitions > 0) {
if (log.isDebugEnabled()) {
log.debug("Topic {} has {} partitions", topicName, metadata.partitions);
}
future.complete(metadata.partitions);
} else {
future.complete(TopicAndMetadata.NON_PARTITIONED_NUMBER);
if (log.isDebugEnabled()) {
log.debug("Topic {} has {} partitions", topic, metadata.partitions);
}
future.complete(TopicAndMetadata.success(topic, metadata.partitions));
} else if (e instanceof PulsarAdminException.NotFoundException) {
if (allowAutoTopicCreation) {
String namespace = TopicName.get(topicName).getNamespace();
admin.namespaces().getPoliciesAsync(namespace).whenComplete((policies, err) -> {
if (err != null || policies == null) {
log.error("[{}] Cannot get policies for namespace {}", ctx.channel(), namespace, err);
future.complete(TopicAndMetadata.INVALID_PARTITIONS);
} else {
boolean allowed = kafkaConfig.isAllowAutoTopicCreation();
if (policies.autoTopicCreationOverride != null) {
allowed = policies.autoTopicCreationOverride.isAllowAutoTopicCreation();
}
if (!allowed) {
log.error("[{}] Topic {} doesn't exist and it's not allowed "
+ "to auto create partitioned topic", ctx.channel(), topicName);
future.complete(TopicAndMetadata.INVALID_PARTITIONS);
} else {
log.info("[{}] Topic {} doesn't exist, auto create it with {} partitions",
ctx.channel(), topicName, defaultNumPartitions);
admin.topics().createPartitionedTopicAsync(topicName, defaultNumPartitions,
Map.of("kafkaTopicUUID", UUID.randomUUID().toString()))
.whenComplete((__, createException) -> {
if (createException == null) {
future.complete(defaultNumPartitions);
} else {
log.warn("[{}] Failed to create partitioned topic {}: {}",
ctx.channel(), topicName, createException.getMessage());
future.complete(TopicAndMetadata.INVALID_PARTITIONS);
}
});
}
(allowAutoTopicCreation ? checkAllowAutoTopicCreation(topicName.getNamespace())
: CompletableFuture.completedFuture(false)).whenComplete((allowed, err) -> {
if (err != null) {
log.error("[{}] Cannot get policies for namespace {}",
ctx.channel(), topicName.getNamespace(), err);
future.complete(TopicAndMetadata.failure(topic, Errors.UNKNOWN_SERVER_ERROR));
return;
}
if (allowed) {
admin.topics().createPartitionedTopicAsync(topic, defaultNumPartitions)
.whenComplete((__, createException) -> {
if (createException == null) {
future.complete(TopicAndMetadata.success(topic, defaultNumPartitions));
} else {
log.warn("[{}] Failed to create partitioned topic {}: {}",
ctx.channel(), topicName, createException.getMessage());
future.complete(TopicAndMetadata.failure(topic, Errors.UNKNOWN_SERVER_ERROR));
}
});
} else {
try {
Topic.validate(topicName.getLocalName());
future.complete(TopicAndMetadata.failure(topic, Errors.UNKNOWN_TOPIC_OR_PARTITION));
} catch (InvalidTopicException ignored) {
future.complete(TopicAndMetadata.failure(topic, Errors.INVALID_TOPIC_EXCEPTION));
}
});
} else {
log.error("[{}] Topic {} doesn't exist and it's not allowed to auto create partitioned topic",
ctx.channel(), topicName, e);
future.complete(TopicAndMetadata.INVALID_PARTITIONS);
}
}
});
} else {
log.error("[{}] Failed to get partitioned topic {}", ctx.channel(), topicName, e);
future.complete(TopicAndMetadata.INVALID_PARTITIONS);
log.error("[{}] Failed to get partitioned topic {}", ctx.channel(), topic, e);
future.complete(TopicAndMetadata.failure(topic, Errors.UNKNOWN_SERVER_ERROR));
}
});
return future;
}
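
Note: after this refactoring the returned future always completes normally, with every outcome (success, unknown topic, invalid name, authorization failure, server error) encoded in the TopicAndMetadata value instead of sentinel partition counts. A minimal caller sketch, hypothetical and not part of this PR:

// Hypothetical usage sketch: getTopic()/getNumPartitions()/getError() are
// generated by Lombok's @Getter on TopicAndMetadata.
getTopicMetadataAsync("my-topic", true).thenAccept(result -> {
    if (result.hasNoError()) {
        log.info("Topic {} has {} partitions", result.getTopic(), result.getNumPartitions());
    } else {
        // One of UNKNOWN_TOPIC_OR_PARTITION, INVALID_TOPIC_EXCEPTION,
        // TOPIC_AUTHORIZATION_FAILED or UNKNOWN_SERVER_ERROR.
        log.warn("Topic {} failed: {}", result.getTopic(), result.getError());
    }
});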

private static String path(String... parts) {
    StringBuilder sb = new StringBuilder();
    sb.append(POLICY_ROOT);
    Joiner.on('/').appendTo(sb, parts);
    return sb.toString();
}

private CompletableFuture<Boolean> checkAllowAutoTopicCreation(String namespace) {
return admin.namespaces().getPoliciesAsync(namespace).thenApply(policies -> {
if (policies != null && policies.autoTopicCreationOverride != null) {
return policies.autoTopicCreationOverride.isAllowAutoTopicCreation();
} else {
return kafkaConfig.isAllowAutoTopicCreation();
}
});
}
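
For example, if the namespace policy sets autoTopicCreationOverride.isAllowAutoTopicCreation() to false, this helper returns false even when the broker-level kafkaConfig.isAllowAutoTopicCreation() is true; the broker default only applies when no namespace override is set.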

private CompletableFuture<Set<String>> expandAllowedNamespaces(Set<String> allowedNamespaces) {
@@ -664,10 +664,9 @@ private List<TopicAndMetadata> analyzeFullTopicNames(final Stream<String> fullTo
Collections.sort(partitionIndexes);
final int lastIndex = partitionIndexes.get(partitionIndexes.size() - 1);
if (lastIndex < 0) {
topicAndMetadataList.add(
new TopicAndMetadata(topic, TopicAndMetadata.NON_PARTITIONED_NUMBER));
topicAndMetadataList.add(TopicAndMetadata.success(topic, 0)); // non-partitioned topic
} else if (lastIndex == partitionIndexes.size() - 1) {
topicAndMetadataList.add(new TopicAndMetadata(topic, partitionIndexes.size()));
topicAndMetadataList.add(TopicAndMetadata.success(topic, partitionIndexes.size()));
} else {
// The partitions should be [0, 1, ..., n-1], `n` is the number of partitions. If the last index is not
// `n-1`, there must be some missed partitions.
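// For example, sorted partition indexes [0, 1, 3] have size 3 but last index 3, so
// lastIndex != partitionIndexes.size() - 1 and partition 2 is detected as missing.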
@@ -721,17 +720,16 @@ private CompletableFuture<ListPair<String>> authorizeTopicsAsync(final Collectio

private CompletableFuture<List<TopicAndMetadata>> findTopicMetadata(final ListPair<String> listPair,
final boolean allowTopicAutoCreation) {
final Map<String, CompletableFuture<Integer>> futureMap = CoreUtils.listToMap(
final Map<String, CompletableFuture<TopicAndMetadata>> futureMap = CoreUtils.listToMap(
listPair.getSuccessfulList(),
topic -> getPartitionedTopicMetadataAsync(topic, allowTopicAutoCreation)
topic -> getTopicMetadataAsync(topic, allowTopicAutoCreation)
);
return CoreUtils.waitForAll(futureMap.values()).thenApply(__ ->
CoreUtils.mapToList(futureMap, (key, value) -> new TopicAndMetadata(key, value.join()))
CoreUtils.mapToList(futureMap, (___, value) -> value.join())
).thenApply(authorizedTopicAndMetadataList ->
ListUtils.union(authorizedTopicAndMetadataList,
CoreUtils.listToList(listPair.getFailedList(),
topic -> new TopicAndMetadata(topic, TopicAndMetadata.AUTHORIZATION_FAILURE))
)
topic -> TopicAndMetadata.failure(topic, Errors.TOPIC_AUTHORIZATION_FAILED)))
);
}

@@ -36,29 +36,20 @@
@Getter
public class TopicAndMetadata {

public static final int INVALID_PARTITIONS = -2;
public static final int AUTHORIZATION_FAILURE = -1;
public static final int NON_PARTITIONED_NUMBER = 0;

private final String topic;
private final int numPartitions;
private final Errors error;

public boolean isPartitionedTopic() {
return numPartitions > 0;
public static TopicAndMetadata success(String topic, int numPartitions) {
return new TopicAndMetadata(topic, numPartitions, Errors.NONE);
}

public boolean hasNoError() {
return numPartitions >= 0;
public static TopicAndMetadata failure(String topic, Errors error) {
return new TopicAndMetadata(topic, -1, error);
}

public Errors error() {
if (hasNoError()) {
return Errors.NONE;
} else if (numPartitions == AUTHORIZATION_FAILURE) {
return Errors.TOPIC_AUTHORIZATION_FAILED;
} else {
return Errors.UNKNOWN_TOPIC_OR_PARTITION;
}
public boolean hasNoError() {
return error == Errors.NONE;
}
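
The two factories replace the old numPartitions sentinels (INVALID_PARTITIONS, AUTHORIZATION_FAILURE, NON_PARTITIONED_NUMBER), so the error is now carried explicitly rather than decoded from a negative partition count. A brief illustration with hypothetical values, not part of this PR:

// Hypothetical illustration:
TopicAndMetadata ok = TopicAndMetadata.success("persistent://public/default/t", 3);
TopicAndMetadata denied = TopicAndMetadata.failure("secret-topic", Errors.TOPIC_AUTHORIZATION_FAILED);
// ok.hasNoError()     -> true,  ok.getNumPartitions() -> 3
// denied.hasNoError() -> false, denied.getError()     -> TOPIC_AUTHORIZATION_FAILED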

public CompletableFuture<TopicMetadata> lookupAsync(
@@ -70,14 +61,14 @@ public CompletableFuture<TopicMetadata> lookupAsync(
.map(lookupFunction)
.collect(Collectors.toList()), partitionMetadataList ->
new TopicMetadata(
error(),
error,
getOriginalTopic.apply(topic),
KopTopic.isInternalTopic(topic, metadataNamespace),
partitionMetadataList
));
}

public Stream<String> stream() {
private Stream<String> stream() {
if (numPartitions > 0) {
return IntStream.range(0, numPartitions)
.mapToObj(i -> topic + "-partition-" + i);
@@ -89,7 +80,7 @@ public Stream<String> stream() {
public TopicMetadata toTopicMetadata(final Function<String, String> getOriginalTopic,
final String metadataNamespace) {
return new TopicMetadata(
error(),
error,
getOriginalTopic.apply(topic),
KopTopic.isInternalTopic(topic, metadataNamespace),
Collections.emptyList()
@@ -208,8 +208,8 @@ private static DecodeResult encodeKafkaResponse(MessageMetadata metadata, ByteBu
final ByteBuf singleMessagePayload = Commands.deSerializeSingleMessageInBatch(
uncompressedPayload, singleMessageMetadata, i, numMessages);

final long timestamp = (metadata.getEventTime() > 0)
? metadata.getEventTime()
final long timestamp = (singleMessageMetadata.getEventTime() > 0)
? singleMessageMetadata.getEventTime()
: metadata.getPublishTime();
ByteBuffer value = singleMessageMetadata.isNullValue()
? null
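
This hunk makes batch decoding prefer each record's own event time over the batch-level metadata, so per-record timestamps set by the producer survive batching. The resulting rule, expressed as a hypothetical helper mirroring the lines above:

// Hypothetical helper (not in this PR): the per-record event time wins when set,
// otherwise fall back to the batch publish time.
private static long resolveTimestamp(MessageMetadata metadata,
                                     SingleMessageMetadata singleMessageMetadata) {
    return singleMessageMetadata.getEventTime() > 0
            ? singleMessageMetadata.getEventTime()
            : metadata.getPublishTime();
}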
@@ -168,4 +168,47 @@ public void testPublishZeroTimestampRecord() {
assertEquals(consumerRecords.get(0).value(), "v1");
assertEquals(consumerRecords.get(0).timestamp(), 0L);
}

@Test(timeOut = 30000)
public void testPublishTimestampInBatch() {
String topic = "test-publish-timestamp-in-batch";
String subscription = "test-group";
Properties properties = newKafkaProducerProperties();
int numRecords = 100;
@Cleanup
final KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
for (int i = 0; i < numRecords; i++) {
producer.send(new ProducerRecord<>(topic, null, (long) i, "k1", "v1", null));
}

producer.flush();

@Cleanup
final KafkaConsumer<String, String> consumer = newKafkaConsumer(topic, subscription);
consumer.subscribe(Collections.singleton(topic));
List<ConsumerRecord<String, String>> consumerRecords = receiveRecords(consumer, numRecords);
assertEquals(consumerRecords.size(), numRecords);
for (int i = 0; i < numRecords; i++) {
assertEquals(consumerRecords.get(i).key(), "k1");
assertEquals(consumerRecords.get(i).value(), "v1");
assertEquals(consumerRecords.get(i).timestamp(), i);
}

// Verify that only the first record of the next batch carries an explicitly specified timestamp
producer.send(new ProducerRecord<>(topic, null, 1L, "k1", "v1", null));
for (int i = 0; i < numRecords; i++) {
producer.send(new ProducerRecord<>(topic, null, "k1", "v1", null));
}

producer.flush();
consumerRecords = receiveRecords(consumer, numRecords + 1);
assertEquals(consumerRecords.size(), numRecords + 1);
assertEquals(consumerRecords.get(0).key(), "k1");
assertEquals(consumerRecords.get(0).value(), "v1");
assertEquals(consumerRecords.get(0).timestamp(), 1L);
for (int i = 1; i < numRecords + 1; i++) {
assertEquals(consumerRecords.get(i).key(), "k1");
assertEquals(consumerRecords.get(i).value(), "v1");
assertTrue(consumerRecords.get(i).timestamp() > 0);
}
}
}
@@ -58,6 +58,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
@@ -110,8 +111,8 @@
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.jetbrains.annotations.NotNull;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Ignore;
import org.testng.annotations.Test;
@@ -132,7 +133,7 @@ protected void resetConfig() {
+ SSL_PREFIX + "127.0.0.1:" + kafkaBrokerPortTls);
}

@BeforeMethod
@BeforeClass
@Override
protected void setup() throws Exception {
super.internalSetup();
@@ -165,7 +166,7 @@ protected void setup() throws Exception {
serviceAddress = new InetSocketAddress(pulsar.getBindAddress(), kafkaBrokerPort);
}

@AfterMethod
@AfterClass
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
@@ -702,11 +703,11 @@ private List<String> getCreatedTopics(String topicName, int numTopics) {
}

private KafkaHeaderAndRequest createTopicMetadataRequest(List<String> topics) {
return createTopicMetadataRequest(topics, true);
return createTopicMetadataRequest(topics, false);
}

private KafkaHeaderAndRequest createTopicMetadataRequest(List<String> topics, boolean allowAutoTopicCreation) {
AbstractRequest.Builder builder = new MetadataRequest.Builder(topics, allowAutoTopicCreation);
MetadataRequest.Builder builder = new MetadataRequest.Builder(topics, allowAutoTopicCreation);
return buildRequest(builder);
}

@@ -1187,7 +1188,7 @@ public void testIllegalManagedLedger() throws Exception {
*/
@Test(timeOut = 60000)
public void testFetchMinBytesSingleConsumer() throws Exception {
final String topic = "testMinBytesTopic";
final String topic = "testMinBytesTopicSingleConsumer";
final TopicPartition topicPartition = new TopicPartition(topic, 0);
admin.topics().createPartitionedTopic(topic, 1);
triggerTopicLookup(topic, 1);
@@ -1223,4 +1224,18 @@ public void testFetchMinBytesSingleConsumer() throws Exception {
Long waitingFetchesTriggered = kafkaRequestHandler.getRequestStats().getWaitingFetchesTriggered().get();
assertEquals((long) waitingFetchesTriggered, 1);
}

@Test(timeOut = 30000)
public void testTopicMetadataNotFound() {
final Function<String, Errors> getMetadataResponseError = topic -> {
final CompletableFuture<AbstractResponse> future = new CompletableFuture<>();
kafkaRequestHandler.handleTopicMetadataRequest(
createTopicMetadataRequest(Collections.singletonList(topic)), future);
final MetadataResponse response = (MetadataResponse) future.join();
assertTrue(response.errors().containsKey(topic));
return response.errors().get(topic);
};
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, getMetadataResponseError.apply("test-topic-not-found._-"));
assertEquals(Errors.INVALID_TOPIC_EXCEPTION, getMetadataResponseError.apply("???"));
}
}