diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/ProducerRegister.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/ProducerRegister.java index b84d6d9c..90babda7 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/ProducerRegister.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/ProducerRegister.java @@ -47,6 +47,7 @@ import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; import org.apache.pulsar.client.impl.transaction.TransactionImpl; import org.apache.pulsar.common.api.proto.MessageMetadata; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.protocol.schema.SchemaHash; import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.shade.com.google.common.base.Strings; @@ -218,6 +219,14 @@ private Producer getOrCreateProducer(String topic, Schema schema) try { // Use this method for auto creating the non-exist topics. Otherwise, it will throw an // exception. + TopicName topicName = TopicName.get(topic); + // Step-1: create partitioned topic metadata. + if (topicName.isPartitioned()) { + pulsarClient + .getPartitionsForTopic(TopicName.get(topic).getPartitionedTopicName()) + .get(); + } + // Step-2: create partition. pulsarClient.getPartitionsForTopic(topic).get(); } catch (InterruptedException e) { Thread.currentThread().interrupt();