2222import io .netty .buffer .ByteBuf ;
2323import java .util .List ;
2424import java .util .concurrent .CompletableFuture ;
25+ import java .util .concurrent .CompletionException ;
2526import lombok .extern .slf4j .Slf4j ;
2627import org .apache .bookkeeper .mledger .Entry ;
2728import org .apache .bookkeeper .mledger .ManagedCursor ;
2829import org .apache .pulsar .broker .PulsarServerException ;
2930import org .apache .pulsar .broker .service .BrokerService ;
31+ import org .apache .pulsar .client .admin .PulsarAdmin ;
32+ import org .apache .pulsar .client .admin .PulsarAdminException .ConflictException ;
33+ import org .apache .pulsar .client .admin .PulsarAdminException .NotFoundException ;
3034import org .apache .pulsar .client .api .PulsarClientException ;
3135import org .apache .pulsar .client .api .transaction .TxnID ;
3236import org .apache .pulsar .client .impl .MessageImpl ;
3337import org .apache .pulsar .client .impl .PulsarClientImpl ;
38+ import org .apache .pulsar .common .naming .TopicName ;
39+ import org .apache .pulsar .common .partition .PartitionedTopicMetadata ;
3440import org .apache .pulsar .common .protocol .Markers ;
3541import org .apache .pulsar .common .schema .SchemaInfo ;
3642import org .apache .pulsar .common .util .FutureUtil ;
@@ -40,9 +46,10 @@ public class GeoPersistentReplicator extends PersistentReplicator {
4046
4147 public GeoPersistentReplicator (PersistentTopic topic , ManagedCursor cursor , String localCluster ,
4248 String remoteCluster , BrokerService brokerService ,
43- PulsarClientImpl replicationClient )
49+ PulsarClientImpl replicationClient , PulsarAdmin replicationAdmin )
4450 throws PulsarServerException {
45- super (localCluster , topic , cursor , remoteCluster , topic .getName (), brokerService , replicationClient );
51+ super (localCluster , topic , cursor , remoteCluster , topic .getName (), brokerService , replicationClient ,
52+ replicationAdmin );
4653 }
4754
4855 /**
@@ -56,7 +63,115 @@ protected String getProducerName() {
5663 @ Override
5764 protected CompletableFuture <Void > prepareCreateProducer () {
5865 if (brokerService .getPulsar ().getConfig ().isCreateTopicToRemoteClusterForReplication ()) {
59- return CompletableFuture .completedFuture (null );
66+ TopicName completeTopicName = TopicName .get (localTopicName );
67+ TopicName baseTopicName ;
68+ if (completeTopicName .isPartitioned ()) {
69+ baseTopicName = TopicName .get (completeTopicName .getPartitionedTopicName ());
70+ } else {
71+ baseTopicName = completeTopicName ;
72+ }
73+ // Set useFallbackForNonPIP344Brokers to true when mix of PIP-344 and non-PIP-344 brokers are used, it
74+ // can still work.
75+ return client .getLookup ().getPartitionedTopicMetadata (baseTopicName , false , true )
76+ .thenCompose ((localMetadata ) -> replicationAdmin .topics ()
77+ // https://github.com/apache/pulsar/pull/4963
78+ // Use the admin API instead of the client to fetch partitioned metadata
79+ // to prevent automatic topic creation on the remote cluster.
80+ // PIP-344 introduced an option to disable auto-creation when fetching partitioned
81+ // topic metadata via the client, but this requires Pulsar 3.0.x.
82+ // This change is a workaround to support Pulsar 2.4.2.
83+ .getPartitionedTopicMetadataAsync (baseTopicName .toString ())
84+ .exceptionally (ex -> {
85+ Throwable throwable = FutureUtil .unwrapCompletionException (ex );
86+ if (throwable instanceof NotFoundException ) {
87+ // Topic does not exist on the remote cluster.
88+ return new PartitionedTopicMetadata (0 );
89+ }
90+ throw new CompletionException ("Failed to get partitioned topic metadata" , throwable );
91+ }).thenCompose (remoteMetadata -> {
92+ if (log .isDebugEnabled ()) {
93+ log .debug ("[{}] Local metadata partitions: {} Remote metadata partitions: {}" ,
94+ replicatorId , localMetadata .partitions , remoteMetadata .partitions );
95+ }
96+
97+ // Non-partitioned topic
98+ if (localMetadata .partitions == 0 ) {
99+ if (localMetadata .partitions == remoteMetadata .partitions ) {
100+ return replicationAdmin .topics ().createNonPartitionedTopicAsync (localTopicName )
101+ .exceptionally (ex -> {
102+ Throwable throwable = FutureUtil .unwrapCompletionException (ex );
103+ if (throwable instanceof ConflictException ) {
104+ // Topic already exists on the remote cluster.
105+ return null ;
106+ } else {
107+ throw new CompletionException (
108+ "Failed to create non-partitioned topic" , throwable );
109+ }
110+ });
111+ } else {
112+ return FutureUtil .failedFuture (new PulsarClientException .NotAllowedException (
113+ "Topic type is not matched between local and remote cluster: local "
114+ + "partitions: " + localMetadata .partitions
115+ + ", remote partitions: " + remoteMetadata .partitions ));
116+ }
117+ } else {
118+ if (remoteMetadata .partitions == 0 ) {
119+ if (log .isDebugEnabled ()) {
120+ log .debug ("[{}] Creating partitioned topic {} with {} partitions" ,
121+ replicatorId , baseTopicName , localMetadata .partitions );
122+ }
123+ // We maybe need to create a partitioned topic on remote cluster.
124+ return replicationAdmin .topics ()
125+ .createPartitionedTopicAsync (baseTopicName .toString (),
126+ localMetadata .partitions )
127+ .exceptionally (ex -> {
128+ Throwable throwable = FutureUtil .unwrapCompletionException (ex );
129+ if (throwable instanceof ConflictException ) {
130+ // Topic already exists on the remote cluster.
131+ // This can happen if the topic was created, or the topic is
132+ // non-partitioned.
133+ return null ;
134+ } else {
135+ throw new CompletionException (
136+ "Failed to create partitioned topic" , throwable );
137+ }
138+ })
139+ .thenCompose ((__ ) -> replicationAdmin .topics ()
140+ .getPartitionedTopicMetadataAsync (baseTopicName .toString ()))
141+ .thenCompose (metadata -> {
142+ // Double check if the partitioned topic is created
143+ // successfully.
144+ // When partitions is equals to 0, it means this topic is
145+ // non-partitioned, we should throw an exception.
146+ if (completeTopicName .getPartitionIndex () >= metadata .partitions ) {
147+ return FutureUtil .failedFuture (
148+ new PulsarClientException .NotAllowedException (
149+ "Topic type is not matched between "
150+ + "local and "
151+ + "remote cluster: local "
152+ + "partitions: "
153+ + localMetadata .partitions
154+ + ", remote partitions: "
155+ + remoteMetadata .partitions ));
156+ }
157+ return CompletableFuture .completedFuture (null );
158+ });
159+ } else {
160+ if (localMetadata .partitions != remoteMetadata .partitions ) {
161+ return FutureUtil .failedFuture (
162+ new PulsarClientException .NotAllowedException (
163+ "The number of topic partitions is inconsistent between "
164+ + "local and"
165+ + " remote "
166+ + "clusters: local partitions: "
167+ + localMetadata .partitions
168+ + ", remote partitions: "
169+ + remoteMetadata .partitions ));
170+ }
171+ }
172+ }
173+ return CompletableFuture .completedFuture (null );
174+ }));
60175 } else {
61176 CompletableFuture <Void > topicCheckFuture = new CompletableFuture <>();
62177 replicationClient .getPartitionedTopicMetadata (localTopic .getName (), false , false )
0 commit comments