Skip to content

Commit 128d933

Browse files
Demogorgon314eolivelli
authored andcommitted
[fix] Unify fetch offset topic name (#1837)
The CLI needs to check the current committed topic’s name before resetting. But in KoP it will transfer to a short topic name. https://github.com/apache/kafka/blob/d46c3f259cce25c43f20fba3943d5cb34ed909ea/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala#L818-L820 ``` bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group kop/s1 --topic public/default/p3:0 --reset-offsets --shift-by -10 --execute Cannot shift offset for partition public/default/p3-0 since there is no current committed offset ``` <img width="1120" alt="image" src="https://user-images.githubusercontent.com/22697570/236761387-c06930c1-3072-43b2-84cb-7e16f2504999.png"> Return full name when list consumer group offsets. (cherry picked from commit e58b7b9)
1 parent b08731e commit 128d933

File tree

4 files changed

+20
-14
lines changed

4 files changed

+20
-14
lines changed

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1109,16 +1109,14 @@ protected CompletableFuture<Void> storeGroupId(String groupId, String groupIdPat
11091109
@VisibleForTesting
11101110
public <T> void replaceTopicPartition(Map<TopicPartition, T> replacedMap,
11111111
Map<TopicPartition, TopicPartition> replacingIndex) {
1112-
String namespacePrefix = currentNamespacePrefix();
11131112
Map<TopicPartition, T> newMap = new HashMap<>();
11141113
replacedMap.entrySet().removeIf(entry -> {
11151114
if (replacingIndex.containsKey(entry.getKey())) {
11161115
newMap.put(replacingIndex.get(entry.getKey()), entry.getValue());
11171116
return true;
11181117
} else if (KopTopic.isFullTopicName(entry.getKey().topic())) {
11191118
newMap.put(new TopicPartition(
1120-
KopTopic.removeDefaultNamespacePrefix(entry.getKey().topic(),
1121-
namespacePrefix),
1119+
KopTopic.removePersistentDomain(entry.getKey().topic()),
11221120
entry.getKey().partition()),
11231121
entry.getValue());
11241122
return true;

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/KopTopic.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,14 @@ public static String removeDefaultNamespacePrefix(String fullTopicName, String n
4242
}
4343
}
4444

45+
public static String removePersistentDomain(String fullTopicName) {
46+
if (fullTopicName.startsWith(persistentDomain)) {
47+
return fullTopicName.substring(persistentDomain.length());
48+
} else {
49+
return fullTopicName;
50+
}
51+
}
52+
4553
@Getter
4654
private final String originalName;
4755
@Getter

tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -833,7 +833,7 @@ public void testEmptyReplacingIndex() {
833833
final String topic = "test-topic";
834834

835835
// 1. original tp
836-
final TopicPartition tp0 = new TopicPartition(topic, 0);
836+
final TopicPartition tp0 = new TopicPartition(namespace + "/" + topic, 0);
837837

838838
// 2. full topic and tp
839839
final String fullNameTopic = "persistent://" + namespace + "/" + topic;
@@ -851,7 +851,7 @@ public void testEmptyReplacingIndex() {
851851
assertEquals(1, replacedMap.size());
852852

853853
// 5. after replace, replacedMap has a short topic name
854-
replacedMap.forEach(((topicPartition, s) -> assertEquals(tp0, topicPartition)));
854+
replacedMap.forEach(((topicPartition, s) -> assertEquals(topicPartition, tp0)));
855855
}
856856

857857
@Test
@@ -860,7 +860,7 @@ public void testNonEmptyReplacingIndex() {
860860
final String topic = "test-topic";
861861

862862
// 1. original tp
863-
final TopicPartition tp0 = new TopicPartition(topic, 0);
863+
final TopicPartition tp0 = new TopicPartition(namespace + "/" + topic, 0);
864864

865865
// 2. full topic and tp
866866
final String fullNameTopic = "persistent://" + namespace + "/" + topic;
@@ -879,7 +879,7 @@ public void testNonEmptyReplacingIndex() {
879879
assertEquals(1, replacedMap.size());
880880

881881
// 5. after replace, replacedMap has a short topic name
882-
replacedMap.forEach(((topicPartition, s) -> assertEquals(tp0, topicPartition)));
882+
replacedMap.forEach(((topicPartition, s) -> assertEquals(topicPartition, tp0)));
883883
}
884884

885885
@Test(timeOut = 20000000)
@@ -959,12 +959,10 @@ group2, new ListConsumerGroupOffsetsSpec().topicPartitions(null))
959959
// topic name from offset fetch response must be short topic name
960960
offsetAndMetadataMapGroup1.keySet().forEach(topicPartition -> assertEquals(topic, topicPartition.topic()));
961961

962-
963962
consumer.close();
964963
consumer2.close();
965964
consumer2b.close();
966965
kafkaAdmin.close();
967-
968966
}
969967

970968
@Test(timeOut = 20000)

tests/src/test/java/io/streamnative/pulsar/handlers/kop/OffsetResetTest.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.time.Duration;
2727
import java.util.Collections;
2828
import java.util.List;
29+
import java.util.Map;
2930
import java.util.Properties;
3031
import java.util.concurrent.TimeUnit;
3132
import java.util.stream.Collectors;
@@ -79,7 +80,7 @@ protected void cleanup() throws Exception {
7980

8081
@Test(timeOut = 30000, enabled = false)
8182
public void testGreaterThanEndOffset() throws Exception {
82-
final String topic = "test-reset-offset-topic";
83+
final String topic = "public/default/test-reset-offset-topic";
8384
final String group = "test-reset-offset-groupid";
8485
final int numPartitions = 1;
8586

@@ -313,8 +314,9 @@ private long describeGroups(String group, String topic) {
313314
.map(info -> new TopicPartition(info.topic(), info.partition())).collect(Collectors.toList())) {
314315
log.info("offset part: {}", adminClient.listConsumerGroupOffsets(group)
315316
.partitionsToOffsetAndMetadata().get());
316-
long offset = adminClient.listConsumerGroupOffsets(group).partitionsToOffsetAndMetadata().get()
317-
.get(topicPartition).offset();
317+
Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsetAndMetadataMap =
318+
adminClient.listConsumerGroupOffsets(group).partitionsToOffsetAndMetadata().get();
319+
long offset = topicPartitionOffsetAndMetadataMap.get(topicPartition).offset();
318320
long leo = consumer.endOffsets(Collections.singletonList(topicPartition))
319321
.get(topicPartition);
320322
lag += (leo - offset);
@@ -386,7 +388,7 @@ private void readFromOffsetMessagePulsar() throws Exception {
386388

387389
@Test(timeOut = 30000)
388390
public void testCliReset() throws Exception {
389-
String topic = "test-reset-offset-topic";
391+
String topic = "public/default/test-reset-offset-topic";
390392
final String group = "test-reset-offset-groupid";
391393
final int numPartitions = 10;
392394

@@ -420,7 +422,7 @@ public void testCliReset() throws Exception {
420422
assertEquals(msgs, totalMsgs);
421423
kConsumer.getConsumer().commitSync();
422424
readFromOffsetMessagePulsar();
423-
assertEquals(0, describeGroups(group, topic));
425+
assertEquals(describeGroups(group, topic), 0);
424426

425427
// simulate the consumer has closed
426428
kConsumer.close();

0 commit comments

Comments
 (0)