@@ -61,112 +61,82 @@ protected CompletableFuture<Void> prepareCreateProducer() {
6161 return createRemoteTopicIfDoesNotExist (TopicName .get (localTopicName ).getPartitionedTopicName ());
6262 }
6363
64- private CompletableFuture <Void > createRemoteTopicIfDoesNotExist (String partitionedTopic ) {
65- CompletableFuture <Void > res = new CompletableFuture <>();
66- admin .topics ().getPartitionedTopicMetadataAsync (partitionedTopic ).whenComplete ((local , t1 ) -> {
67- if (t1 != null ) {
68- Throwable actEx = FutureUtil .unwrapCompletionException (t1 );
69- // Local topic is a non-partitioned topic, but end with "-partition-{num}".
70- if (actEx instanceof PulsarAdminException .NotFoundException ) {
71- replicationAdmin .topics ().getPartitionedTopicMetadataAsync (partitionedTopic )
72- .whenComplete ((remote , t2 ) -> {
73- if (t2 != null ) {
74- Throwable actEx2 = FutureUtil .unwrapCompletionException (t2 );
75- if (actEx2 instanceof PulsarAdminException .NotFoundException ) {
76- // Both clusters have a non-partitioned topic, but the topic name is end with
77- // "-partition-{num}".
78- // Check partition metadata with the special name.
79- FutureUtil .completeAfter (res , createRemoteTopicIfDoesNotExist (localTopicName ));
80- } else {
81- // Failed to get remote partitions.
82- String errorMsg = String .format ("[%s] Can not start replicator because of failed to"
83- + " get topic partitions of remote cluster. The topic on the local cluster is"
84- + " a non-partitioned topic, but end with -partition-x" ,
85- replicatorId );
86- log .error (errorMsg , actEx );
87- res .completeExceptionally (new PulsarServerException (errorMsg ));
88- return ;
89- }
90- return ;
91- }
92- // Local topic is a non-partitioned topic, but end with "-partition-{num}".
93- // Remote side: it has a partitioned topic.
94- String errorMsg = String .format ("[%s] Can not start replicator because the"
95- + " partitions between local and remote cluster are different."
96- + " The topic on the local cluster is a non-partitioned topic, but end with"
97- + " -partition-x, and remote side it has %s partitions" ,
98- replicatorId , remote .partitions );
99- log .error (errorMsg );
100- res .completeExceptionally (new PulsarServerException (errorMsg ));
101- });
102- return ;
103- }
104- // Failed to get local partitions.
105- log .error ("[{}] Failed to start replicator because of failed to get partitions of local cluster. The"
106- + " topic on the local cluster has {} partitions" ,
107- replicatorId , local .partitions , actEx );
108- res .completeExceptionally (actEx );
109- return ;
110- }
111- replicationAdmin .topics ().getPartitionedTopicMetadataAsync (partitionedTopic ).whenComplete ((remote , t2 ) -> {
112- if (t2 != null ) {
113- Throwable actEx = FutureUtil .unwrapCompletionException (t2 );
114- // Create the topic on the remote side.
64+ private CompletableFuture <Integer > getLocalPartitionMetadata (String topic ) {
65+ return admin .topics ().getPartitionedTopicMetadataAsync (topic )
66+ .thenApply (metadata -> metadata .partitions )
67+ .exceptionallyCompose (t -> {
68+ Throwable actEx = FutureUtil .unwrapCompletionException (t );
11569 if (actEx instanceof PulsarAdminException .NotFoundException ) {
116- // Not allowed replicator to create topics on the remote side.
117- if (!brokerService .getPulsar ().getConfig ().isCreateTopicToRemoteClusterForReplication ()) {
118- String errorMsg = String .format ("[%s] Can not start replicator because there is no topic on"
119- + " the remote cluster. Please create a %s on the remote cluster" ,
120- replicatorId , local .partitions == 0 ? "non-partitioned topic"
121- : "partitioned topic with " + local .partitions + " partitions" );
122- log .error (errorMsg );
123- res .completeExceptionally (new PulsarServerException (errorMsg ));
124- return ;
125- }
126- // Create non-partitioned topics.
127- // Print errors if failed to create topocs.
128- res .whenComplete ((__ , t ) -> {
129- if (t == null ) {
130- return ;
131- }
132- // Failed to get remote partitions.
133- log .error ("[{}] Failed to start replicator because of failed to"
134- + " create topic on the remote cluster. The topic on the local cluster has"
135- + " {} partitions" , replicatorId , local .partitions , t );
136- });
137- if (local .partitions == 0 ) {
138- FutureUtil .completeAfter (res , replicationAdmin .topics ()
139- .createNonPartitionedTopicAsync (partitionedTopic ));
140- return ;
141- }
142- // Create partitioned topics.
143- FutureUtil .completeAfter (res , replicationAdmin .topics ()
144- .createPartitionedTopicAsync (partitionedTopic , local .partitions ));
145- return ;
70+ // Legacy edge case: Local topic is non-partitioned but name ends with "-partition-{num}".
71+ // This should never happen in practice because PIP-414 disables this naming pattern.
72+ return createRemoteTopicIfDoesNotExist (localTopicName )
73+ .thenApply (__ -> -1 ); // Special marker
14674 }
147- // Failed to get remote partitions.
148- String errorMsg = String .format ("[%s] Can not start replicator because of failed to get"
149- + " topic partitions of remote cluster. The topic on the local cluster has"
150- + " %s partitions" ,
151- replicatorId , local .partitions );
152- log .error (errorMsg , actEx );
153- res .completeExceptionally (new PulsarServerException (errorMsg ));
154- return ;
155- }
156- // Compacted partitions.
157- if (local .partitions == remote .partitions ) {
158- res .complete (null );
159- return ;
160- }
161- // Incompatible partitions between clusters.
162- String errorMsg = String .format ("[%s] Can not start replicator because the partitions between"
163- + " local and remote cluster are different. local: %s, remote: %s" , replicatorId ,
164- local .partitions , remote .partitions );
75+ return CompletableFuture .failedFuture (actEx );
76+ });
77+ }
78+
79+ private CompletableFuture <Integer > getRemotePartitionMetadata (String topic ) {
80+ return replicationAdmin .topics ().getPartitionedTopicMetadataAsync (topic )
81+ .thenApply (metadata -> metadata .partitions )
82+ .exceptionallyCompose (t -> {
83+ Throwable actEx = FutureUtil .unwrapCompletionException (t );
84+ if (actEx instanceof PulsarAdminException .NotFoundException ) {
85+ return CompletableFuture .completedFuture (-1 ); // Topic doesn't exist
86+ }
87+ return CompletableFuture .failedFuture (actEx );
88+ });
89+ }
90+
91+ private CompletableFuture <Void > handlePartitionComparison (String topic , int localPartitions , int remotePartitions ) {
92+ // Skip if already handled by recursion
93+ if (localPartitions == -1 ) {
94+ return CompletableFuture .completedFuture (null );
95+ }
96+
97+ // Remote topic doesn't exist - create it
98+ if (remotePartitions == -1 ) {
99+ if (!brokerService .getPulsar ().getConfig ().isCreateTopicToRemoteClusterForReplication ()) {
100+ String errorMsg = String .format ("[%s] Can not start replicator because there is no topic on"
101+ + " the remote cluster. Please create a %s on the remote cluster" ,
102+ replicatorId , localPartitions == 0 ? "non-partitioned topic"
103+ : "partitioned topic with " + localPartitions + " partitions" );
165104 log .error (errorMsg );
166- res .completeExceptionally (new PulsarServerException (errorMsg ));
105+ return CompletableFuture .failedFuture (new PulsarServerException (errorMsg ));
106+ }
107+
108+ CompletableFuture <Void > createFuture = localPartitions == 0
109+ ? replicationAdmin .topics ().createNonPartitionedTopicAsync (topic )
110+ : replicationAdmin .topics ().createPartitionedTopicAsync (topic , localPartitions );
111+
112+ return createFuture .whenComplete ((__ , t ) -> {
113+ if (t != null ) {
114+ log .error ("[{}] Failed to create topic on remote cluster. Local has {} partitions" ,
115+ replicatorId , localPartitions , t );
116+ }
167117 });
168- });
169- return res ;
118+ }
119+
120+ // Both exist - verify compatibility
121+ if (localPartitions == remotePartitions ) {
122+ return CompletableFuture .completedFuture (null );
123+ }
124+
125+ // Incompatible partitions
126+ String errorMsg = String .format ("[%s] Can not start replicator because the partitions between"
127+ + " local and remote cluster are different. local: %s, remote: %s" ,
128+ replicatorId , localPartitions , remotePartitions );
129+ log .error (errorMsg );
130+ return CompletableFuture .failedFuture (new PulsarServerException (errorMsg ));
131+ }
132+
133+ private CompletableFuture <Void > createRemoteTopicIfDoesNotExist (String partitionedTopic ) {
134+ return getLocalPartitionMetadata (partitionedTopic )
135+ .thenCompose (localPartitions ->
136+ getRemotePartitionMetadata (partitionedTopic ).thenCompose (remotePartitions ->
137+ handlePartitionComparison (partitionedTopic , localPartitions , remotePartitions )
138+ )
139+ );
170140 }
171141
172142 @ Override
0 commit comments