Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.apache.kafka.common.errors.RetriableException;

Expand Down Expand Up @@ -83,6 +84,10 @@ public ConsumerGroupSummary describeConsumerGroup(final String group) {
return new ConsumerGroupSummary(results);
} catch (final GroupAuthorizationException e) {
throw new KsqlGroupAuthorizationException(AclOperation.DESCRIBE, group);
} catch (final GroupIdNotFoundException e) {
// KIP-1043: GroupIdNotFoundException signals group doesn't exist (a state, not a failure).
// Re-throw the exception, rather than generalizing it as a KafkaResponseGetFailedException.
throw e;
} catch (final Exception e) {
throw new KafkaResponseGetFailedException(
"Failed to describe Kafka consumer groups: " + group, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
import static io.confluent.ksql.serde.FormatFactory.JSON;
import static io.confluent.ksql.serde.FormatFactory.KAFKA;
import static io.confluent.ksql.test.util.AssertEventually.assertThatEventually;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.*;

import com.google.common.collect.ImmutableList;

Expand All @@ -46,9 +44,12 @@
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.raft.errors.RaftException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
Expand All @@ -67,6 +68,7 @@ public class KafkaConsumerGroupClientTest {
private static final int PARTITION_COUNT = 3;

private static final IntegrationTestHarness TEST_HARNESS = IntegrationTestHarness.build();
private static final Logger log = LogManager.getLogger(KafkaConsumerGroupClientTest.class);

@ClassRule
public static final RuleChain clusterWithRetry = RuleChain
Expand Down Expand Up @@ -104,16 +106,16 @@ public void shouldListConsumerGroupsWhenTheyExist() {
verifyListsGroups(group1, ImmutableList.of(group0, group1));
}

// @Test
// public void shouldDescribeConsumerGroup() {
// givenTopicExistsWithData();
// try (KafkaConsumer<String, byte[]> c1 = createConsumer(group0)) {
// verifyDescribeConsumerGroup(1, group0, ImmutableList.of(c1));
// try (KafkaConsumer<String, byte[]> c2 = createConsumer(group0)) {
// verifyDescribeConsumerGroup(2, group0, ImmutableList.of(c1, c2));
// }
// }
// }
@Test
public void shouldDescribeConsumerGroup() {
  // Integration test: verifies that describeConsumerGroup reports the expected
  // consumer and partition counts as members join the same group (group0).
  givenTopicExistsWithData();
  // First consumer joins group0; expect the group to describe with exactly 1 member.
  try (KafkaConsumer<String, byte[]> c1 = createConsumer(group0)) {
    verifyDescribeConsumerGroup(1, group0, ImmutableList.of(c1));
    // Second consumer joins the SAME group while c1 is still open; after the
    // rebalance the group should describe with 2 members. The nested
    // try-with-resources ordering matters: c2 must be created only after the
    // single-member state has been verified.
    // NOTE(review): verifyDescribeConsumerGroup appears to poll and retry until
    // the expected counts are observed (assertThatEventually) — confirm it
    // tolerates transient GroupIdNotFoundException during group formation.
    try (KafkaConsumer<String, byte[]> c2 = createConsumer(group0)) {
      verifyDescribeConsumerGroup(2, group0, ImmutableList.of(c1, c2));
    }
  }
}

@Test
public void shouldListConsumerGroupOffsetsWhenTheyExist() {
Expand All @@ -127,6 +129,7 @@ private void verifyDescribeConsumerGroup(
final List<KafkaConsumer<?, ?>> consumers
) {
final Supplier<ConsumerAndPartitionCount> pollAndGetCounts = () -> {
try{
consumers.forEach(consumer -> consumer.poll(Duration.ofMillis(1)));

final Collection<ConsumerSummary> summaries = consumerGroupClient
Expand All @@ -137,6 +140,12 @@ private void verifyDescribeConsumerGroup(
.sum();

return new ConsumerAndPartitionCount(consumers.size(), (int) partitionCount);
}catch(final GroupIdNotFoundException exception){
// Treat this scenario as a group with zero consumers.
log.warn("Consumer group '{}' not found. Returning 0 consumers and 0 partitions. Reason: {}",
group, exception.getMessage());
return new ConsumerAndPartitionCount(0, 0);
}
};

assertThatEventually(pollAndGetCounts,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.confluent.ksql.rest.server.execution;

import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.exception.KafkaResponseGetFailedException;
import io.confluent.ksql.parser.tree.ListTopics;
import io.confluent.ksql.rest.SessionProperties;
import io.confluent.ksql.rest.entity.KafkaTopicInfo;
Expand All @@ -28,10 +29,12 @@
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.ExecutorUtil;
import io.confluent.ksql.util.ReservedInternalTopics;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand All @@ -43,9 +46,14 @@
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class ListTopicsExecutor {

private static final Logger log = LoggerFactory.getLogger(ListTopicsExecutor.class);

private ListTopicsExecutor() {

}
Expand Down Expand Up @@ -133,11 +141,20 @@ private static Map<String, List<Integer>> getTopicConsumerAndGroupCounts(
final Map<String, Set<String>> topicConsumerGroupCount = new HashMap<>();

for (final String group : consumerGroups) {
final Collection<ConsumerSummary> consumerSummaryList =
consumerGroupClient.describeConsumerGroup(group).consumers();
Collection<ConsumerSummary> consumerSummaryList = Collections.emptyList();
try {
consumerSummaryList = ExecutorUtil.executeWithRetries(
() -> consumerGroupClient.describeConsumerGroup(group).consumers(),
e -> e instanceof GroupIdNotFoundException
);
} catch (GroupIdNotFoundException e) {
log.warn("Failed to describe consumer group {} after retries, treating as empty group", group);
}catch(Exception e){
throw new KafkaResponseGetFailedException(
"Failed to describe Kafka consumer groups: " + group, e);
}

for (final KafkaConsumerGroupClientImpl.ConsumerSummary summary : consumerSummaryList) {

for (final TopicPartition topicPartition : summary.partitions()) {
topicConsumerCount
.computeIfAbsent(topicPartition.topic(), k -> new AtomicInteger())
Expand Down