Skip to content
This repository was archived by the owner on Jan 24, 2024. It is now read-only.

Commit 5193592

Browse files
[fix][transaction] Use pulsar format when write marker to __consumer_offsets topic (#1994)
### Motivation We should use pulsar format to encode the marker when writing into the `__consumer_offsets` topic, otherwise it will have the following errors: ``` 2023-08-09T09:17:18,398+0000 [pulsar-client-io-69-3] ERROR org.apache.pulsar.client.impl.ConsumerImpl - [public/__kafka/__consumer_offsets-partition-42][reader-ca0ccccd79] Discarding corrupted message at 6:5 2023-08-09T09:17:18,402+0000 [pulsar-client-io-69-3] WARN org.apache.pulsar.client.impl.ConsumerImpl - [reader-ca0ccccd79] [98942] unable to obtain message in batch java.lang.IllegalStateException: java.lang.IllegalStateException: Some required fields are missing ``` ### Modifications Use pulsar format when write marker to __consumer_offsets topic
1 parent 33d3c04 commit 5193592

File tree

4 files changed

+14
-7
lines changed

4 files changed

+14
-7
lines changed

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatterFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
*/
2525
public class EntryFormatterFactory {
2626

27-
enum EntryFormat {
27+
public enum EntryFormat {
2828
PULSAR,
2929
KAFKA,
3030
MIXED_KAFKA

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ class AnalyzeResult {
112112
public class PartitionLog {
113113

114114
public static final String KAFKA_TOPIC_UUID_PROPERTY_NAME = "kafkaTopicUUID";
115+
public static final String KAFKA_ENTRY_FORMATTER_PROPERTY_NAME = "kafkaEntryFormat";
115116
private static final String PID_PREFIX = "KOP-PID-PREFIX";
116117

117118
private static final KopLogValidator.CompressionCodec DEFAULT_COMPRESSION =
@@ -252,7 +253,8 @@ private CompletableFuture<Map<String, String>> fetchTopicProperties(Optional<Per
252253
private EntryFormatter buildEntryFormatter(Map<String, String> topicProperties) {
253254
final String entryFormat;
254255
if (topicProperties != null) {
255-
entryFormat = topicProperties.getOrDefault("kafkaEntryFormat", kafkaConfig.getEntryFormat());
256+
entryFormat = topicProperties
257+
.getOrDefault(KAFKA_ENTRY_FORMATTER_PROPERTY_NAME, kafkaConfig.getEntryFormat());
256258
} else {
257259
entryFormat = kafkaConfig.getEntryFormat();
258260
}

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/MetadataUtils.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,11 @@
1515

1616
import com.google.common.collect.Sets;
1717
import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration;
18+
import io.streamnative.pulsar.handlers.kop.format.EntryFormatterFactory;
19+
import io.streamnative.pulsar.handlers.kop.storage.PartitionLog;
1820
import java.util.Collections;
1921
import java.util.List;
22+
import java.util.Map;
2023
import java.util.Set;
2124
import lombok.extern.slf4j.Slf4j;
2225
import org.apache.kafka.common.internals.Topic;
@@ -332,10 +335,12 @@ private static void createTopicIfNotExist(final KafkaServiceConfiguration conf,
332335
final String topic,
333336
final int numPartitions,
334337
final boolean partitioned) throws PulsarAdminException {
338+
Map<String, String> properties = Map.of(
339+
PartitionLog.KAFKA_ENTRY_FORMATTER_PROPERTY_NAME, EntryFormatterFactory.EntryFormat.PULSAR.name());
335340
if (partitioned) {
336341
log.info("Creating partitioned topic {} (with {} partitions) if it does not exist", topic, numPartitions);
337342
try {
338-
admin.topics().createPartitionedTopic(topic, numPartitions);
343+
admin.topics().createPartitionedTopic(topic, numPartitions, properties);
339344
} catch (PulsarAdminException.ConflictException e) {
340345
log.info("Resources concurrent creating for topic : {}, caused by : {}", topic, e.getMessage());
341346
}
@@ -347,7 +352,7 @@ private static void createTopicIfNotExist(final KafkaServiceConfiguration conf,
347352
} else {
348353
log.info("Creating non-partitioned topic {}-{} if it does not exist", topic, numPartitions);
349354
try {
350-
admin.topics().createNonPartitionedTopic(topic);
355+
admin.topics().createNonPartitionedTopic(topic, properties);
351356
} catch (PulsarAdminException.ConflictException e) {
352357
log.info("Resources concurrent creating for topic : {}, caused by : {}", topic, e.getMessage());
353358
}

kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/utils/MetadataUtilsTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -126,11 +126,11 @@ public void testCreateKafkaMetadataIfMissing() throws Exception {
126126
verify(mockNamespaces, times(1)).setNamespaceMessageTTL(eq(conf.getKafkaMetadataTenant() + "/"
127127
+ conf.getKafkaMetadataNamespace()), any(Integer.class));
128128
verify(mockTopics, times(1)).createPartitionedTopic(
129-
eq(offsetsTopic.getFullName()), eq(conf.getOffsetsTopicNumPartitions()));
129+
eq(offsetsTopic.getFullName()), eq(conf.getOffsetsTopicNumPartitions()), any());
130130
verify(mockTopics, times(1)).createPartitionedTopic(
131-
eq(txnTopic.getFullName()), eq(conf.getKafkaTxnLogTopicNumPartitions()));
131+
eq(txnTopic.getFullName()), eq(conf.getKafkaTxnLogTopicNumPartitions()), any());
132132
verify(mockTopics, times(1)).createPartitionedTopic(
133-
eq(txnProducerStateTopic.getFullName()), eq(conf.getKafkaTxnProducerStateTopicNumPartitions()));
133+
eq(txnProducerStateTopic.getFullName()), eq(conf.getKafkaTxnProducerStateTopicNumPartitions()), any());
134134
// check user topics namespace doesn't set the policy
135135
verify(mockNamespaces, times(1)).createNamespace(eq(conf.getKafkaTenant() + "/"
136136
+ conf.getKafkaNamespace()), any(Set.class));

0 commit comments

Comments
 (0)