Skip to content

Commit 90de7bd

Browse files
committed
[improve][broker] Refactor topic creation logic
1 parent ec882a1 commit 90de7bd

File tree

1 file changed

+72
-102
lines changed

1 file changed

+72
-102
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java

Lines changed: 72 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -61,112 +61,82 @@ protected CompletableFuture<Void> prepareCreateProducer() {
6161
return createRemoteTopicIfDoesNotExist(TopicName.get(localTopicName).getPartitionedTopicName());
6262
}
6363

64-
private CompletableFuture<Void> createRemoteTopicIfDoesNotExist(String partitionedTopic) {
65-
CompletableFuture<Void> res = new CompletableFuture<>();
66-
admin.topics().getPartitionedTopicMetadataAsync(partitionedTopic).whenComplete((local, t1) -> {
67-
if (t1 != null) {
68-
Throwable actEx = FutureUtil.unwrapCompletionException(t1);
69-
// Local topic is a non-partitioned topic, but end with "-partition-{num}".
70-
if (actEx instanceof PulsarAdminException.NotFoundException) {
71-
replicationAdmin.topics().getPartitionedTopicMetadataAsync(partitionedTopic)
72-
.whenComplete((remote, t2) -> {
73-
if (t2 != null) {
74-
Throwable actEx2 = FutureUtil.unwrapCompletionException(t2);
75-
if (actEx2 instanceof PulsarAdminException.NotFoundException) {
76-
// Both clusters have a non-partitioned topic, but the topic name is end with
77-
// "-partition-{num}".
78-
// Check partition metadata with the special name.
79-
FutureUtil.completeAfter(res, createRemoteTopicIfDoesNotExist(localTopicName));
80-
} else {
81-
// Failed to get remote partitions.
82-
String errorMsg = String.format("[%s] Can not start replicator because of failed to"
83-
+ " get topic partitions of remote cluster. The topic on the local cluster is"
84-
+ " a non-partitioned topic, but end with -partition-x",
85-
replicatorId);
86-
log.error(errorMsg, actEx);
87-
res.completeExceptionally(new PulsarServerException(errorMsg));
88-
return;
89-
}
90-
return;
91-
}
92-
// Local topic is a non-partitioned topic, but end with "-partition-{num}".
93-
// Remote side: it has a partitioned topic.
94-
String errorMsg = String.format("[%s] Can not start replicator because the"
95-
+ " partitions between local and remote cluster are different."
96-
+ " The topic on the local cluster is a non-partitioned topic, but end with"
97-
+ " -partition-x, and remote side it has %s partitions",
98-
replicatorId, remote.partitions);
99-
log.error(errorMsg);
100-
res.completeExceptionally(new PulsarServerException(errorMsg));
101-
});
102-
return;
103-
}
104-
// Failed to get local partitions.
105-
log.error("[{}] Failed to start replicator because of failed to get partitions of local cluster. The"
106-
+ " topic on the local cluster has {} partitions",
107-
replicatorId, local.partitions, actEx);
108-
res.completeExceptionally(actEx);
109-
return;
110-
}
111-
replicationAdmin.topics().getPartitionedTopicMetadataAsync(partitionedTopic).whenComplete((remote, t2) -> {
112-
if (t2 != null) {
113-
Throwable actEx = FutureUtil.unwrapCompletionException(t2);
114-
// Create the topic on the remote side.
64+
private CompletableFuture<Integer> getLocalPartitionMetadata(String topic) {
65+
return admin.topics().getPartitionedTopicMetadataAsync(topic)
66+
.thenApply(metadata -> metadata.partitions)
67+
.exceptionallyCompose(t -> {
68+
Throwable actEx = FutureUtil.unwrapCompletionException(t);
11569
if (actEx instanceof PulsarAdminException.NotFoundException) {
116-
// Not allowed replicator to create topics on the remote side.
117-
if (!brokerService.getPulsar().getConfig().isCreateTopicToRemoteClusterForReplication()) {
118-
String errorMsg = String.format("[%s] Can not start replicator because there is no topic on"
119-
+ " the remote cluster. Please create a %s on the remote cluster",
120-
replicatorId, local.partitions == 0 ? "non-partitioned topic"
121-
: "partitioned topic with " + local.partitions + " partitions");
122-
log.error(errorMsg);
123-
res.completeExceptionally(new PulsarServerException(errorMsg));
124-
return;
125-
}
126-
// Create non-partitioned topics.
127-
// Print errors if failed to create topocs.
128-
res.whenComplete((__, t) -> {
129-
if (t == null) {
130-
return;
131-
}
132-
// Failed to get remote partitions.
133-
log.error("[{}] Failed to start replicator because of failed to"
134-
+ " create topic on the remote cluster. The topic on the local cluster has"
135-
+ " {} partitions", replicatorId, local.partitions, t);
136-
});
137-
if (local.partitions == 0) {
138-
FutureUtil.completeAfter(res, replicationAdmin.topics()
139-
.createNonPartitionedTopicAsync(partitionedTopic));
140-
return;
141-
}
142-
// Create partitioned topics.
143-
FutureUtil.completeAfter(res, replicationAdmin.topics()
144-
.createPartitionedTopicAsync(partitionedTopic, local.partitions));
145-
return;
70+
// Legacy edge case: Local topic is non-partitioned but name ends with "-partition-{num}".
71+
// This should never happen in practice because PIP-414 disables this naming pattern.
72+
return createRemoteTopicIfDoesNotExist(localTopicName)
73+
.thenApply(__ -> -1); // Special marker
14674
}
147-
// Failed to get remote partitions.
148-
String errorMsg = String.format("[%s] Can not start replicator because of failed to get"
149-
+ " topic partitions of remote cluster. The topic on the local cluster has"
150-
+ " %s partitions",
151-
replicatorId, local.partitions);
152-
log.error(errorMsg, actEx);
153-
res.completeExceptionally(new PulsarServerException(errorMsg));
154-
return;
155-
}
156-
// Compacted partitions.
157-
if (local.partitions == remote.partitions) {
158-
res.complete(null);
159-
return;
160-
}
161-
// Incompatible partitions between clusters.
162-
String errorMsg = String.format("[%s] Can not start replicator because the partitions between"
163-
+ " local and remote cluster are different. local: %s, remote: %s", replicatorId,
164-
local.partitions, remote.partitions);
75+
return CompletableFuture.failedFuture(actEx);
76+
});
77+
}
78+
79+
private CompletableFuture<Integer> getRemotePartitionMetadata(String topic) {
80+
return replicationAdmin.topics().getPartitionedTopicMetadataAsync(topic)
81+
.thenApply(metadata -> metadata.partitions)
82+
.exceptionallyCompose(t -> {
83+
Throwable actEx = FutureUtil.unwrapCompletionException(t);
84+
if (actEx instanceof PulsarAdminException.NotFoundException) {
85+
return CompletableFuture.completedFuture(-1); // Topic doesn't exist
86+
}
87+
return CompletableFuture.failedFuture(actEx);
88+
});
89+
}
90+
91+
private CompletableFuture<Void> handlePartitionComparison(String topic, int localPartitions, int remotePartitions) {
92+
// Skip if already handled by recursion
93+
if (localPartitions == -1) {
94+
return CompletableFuture.completedFuture(null);
95+
}
96+
97+
// Remote topic doesn't exist - create it
98+
if (remotePartitions == -1) {
99+
if (!brokerService.getPulsar().getConfig().isCreateTopicToRemoteClusterForReplication()) {
100+
String errorMsg = String.format("[%s] Can not start replicator because there is no topic on"
101+
+ " the remote cluster. Please create a %s on the remote cluster",
102+
replicatorId, localPartitions == 0 ? "non-partitioned topic"
103+
: "partitioned topic with " + localPartitions + " partitions");
165104
log.error(errorMsg);
166-
res.completeExceptionally(new PulsarServerException(errorMsg));
105+
return CompletableFuture.failedFuture(new PulsarServerException(errorMsg));
106+
}
107+
108+
CompletableFuture<Void> createFuture = localPartitions == 0
109+
? replicationAdmin.topics().createNonPartitionedTopicAsync(topic)
110+
: replicationAdmin.topics().createPartitionedTopicAsync(topic, localPartitions);
111+
112+
return createFuture.whenComplete((__, t) -> {
113+
if (t != null) {
114+
log.error("[{}] Failed to create topic on remote cluster. Local has {} partitions",
115+
replicatorId, localPartitions, t);
116+
}
167117
});
168-
});
169-
return res;
118+
}
119+
120+
// Both exist - verify compatibility
121+
if (localPartitions == remotePartitions) {
122+
return CompletableFuture.completedFuture(null);
123+
}
124+
125+
// Incompatible partitions
126+
String errorMsg = String.format("[%s] Can not start replicator because the partitions between"
127+
+ " local and remote cluster are different. local: %s, remote: %s",
128+
replicatorId, localPartitions, remotePartitions);
129+
log.error(errorMsg);
130+
return CompletableFuture.failedFuture(new PulsarServerException(errorMsg));
131+
}
132+
133+
private CompletableFuture<Void> createRemoteTopicIfDoesNotExist(String partitionedTopic) {
134+
return getLocalPartitionMetadata(partitionedTopic)
135+
.thenCompose(localPartitions ->
136+
getRemotePartitionMetadata(partitionedTopic).thenCompose(remotePartitions ->
137+
handlePartitionComparison(partitionedTopic, localPartitions, remotePartitions)
138+
)
139+
);
170140
}
171141

172142
@Override

0 commit comments

Comments
 (0)