Skip to content

Commit d5cdce6

Browse files
poorbarcodenodece
authored andcommitted
address comments
1 parent e64cd5e commit d5cdce6

File tree

3 files changed

+208
-130
lines changed

3 files changed

+208
-130
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ public abstract class AbstractReplicator implements Replicator {
5959
protected final PulsarClientImpl replicationClient;
6060
protected final PulsarClientImpl client;
6161
protected final PulsarAdmin replicationAdmin;
62+
protected final PulsarAdmin admin;
6263
protected String replicatorId;
6364
@Getter
6465
protected final Topic localTopic;
@@ -122,6 +123,7 @@ public AbstractReplicator(String localCluster, Topic localTopic, String remoteCl
122123
this.replicationClient = replicationClient;
123124
this.replicationAdmin = replicationAdmin;
124125
this.client = (PulsarClientImpl) brokerService.pulsar().getClient();
126+
this.admin = brokerService.pulsar().getAdminClient();
125127
this.producer = null;
126128
this.producerQueueSize = brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize();
127129
this.replicatorId = String.format("%s | %s",

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java

Lines changed: 104 additions & 130 deletions
Original file line numberDiff line numberDiff line change
@@ -22,21 +22,17 @@
2222
import io.netty.buffer.ByteBuf;
2323
import java.util.List;
2424
import java.util.concurrent.CompletableFuture;
25-
import java.util.concurrent.CompletionException;
2625
import lombok.extern.slf4j.Slf4j;
2726
import org.apache.bookkeeper.mledger.Entry;
2827
import org.apache.bookkeeper.mledger.ManagedCursor;
2928
import org.apache.pulsar.broker.PulsarServerException;
3029
import org.apache.pulsar.broker.service.BrokerService;
3130
import org.apache.pulsar.client.admin.PulsarAdmin;
32-
import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
33-
import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
34-
import org.apache.pulsar.client.api.PulsarClientException;
31+
import org.apache.pulsar.client.admin.PulsarAdminException;
3532
import org.apache.pulsar.client.api.transaction.TxnID;
3633
import org.apache.pulsar.client.impl.MessageImpl;
3734
import org.apache.pulsar.client.impl.PulsarClientImpl;
3835
import org.apache.pulsar.common.naming.TopicName;
39-
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
4036
import org.apache.pulsar.common.protocol.Markers;
4137
import org.apache.pulsar.common.schema.SchemaInfo;
4238
import org.apache.pulsar.common.util.FutureUtil;
@@ -62,137 +58,115 @@ protected String getProducerName() {
6258

6359
@Override
6460
protected CompletableFuture<Void> prepareCreateProducer() {
65-
if (brokerService.getPulsar().getConfig().isCreateTopicToRemoteClusterForReplication()) {
66-
TopicName completeTopicName = TopicName.get(localTopicName);
67-
TopicName baseTopicName;
68-
if (completeTopicName.isPartitioned()) {
69-
baseTopicName = TopicName.get(completeTopicName.getPartitionedTopicName());
70-
} else {
71-
baseTopicName = completeTopicName;
72-
}
73-
// Set useFallbackForNonPIP344Brokers to true when mix of PIP-344 and non-PIP-344 brokers are used, it
74-
// can still work.
75-
return client.getLookup().getPartitionedTopicMetadata(baseTopicName, false, true)
76-
.thenCompose((localMetadata) -> replicationAdmin.topics()
77-
// https://github.com/apache/pulsar/pull/4963
78-
// Use the admin API instead of the client to fetch partitioned metadata
79-
// to prevent automatic topic creation on the remote cluster.
80-
// PIP-344 introduced an option to disable auto-creation when fetching partitioned
81-
// topic metadata via the client, but this requires Pulsar 3.0.x.
82-
// This change is a workaround to support Pulsar 2.4.2.
83-
.getPartitionedTopicMetadataAsync(baseTopicName.toString())
84-
.exceptionally(ex -> {
85-
Throwable throwable = FutureUtil.unwrapCompletionException(ex);
86-
if (throwable instanceof NotFoundException) {
87-
// Topic does not exist on the remote cluster.
88-
return new PartitionedTopicMetadata(0);
89-
}
90-
throw new CompletionException("Failed to get partitioned topic metadata", throwable);
91-
}).thenCompose(remoteMetadata -> {
92-
if (log.isDebugEnabled()) {
93-
log.debug("[{}] Local metadata partitions: {} Remote metadata partitions: {}",
94-
replicatorId, localMetadata.partitions, remoteMetadata.partitions);
95-
}
61+
return createRemoteTopicIfDoesNotExist(TopicName.get(localTopicName).getPartitionedTopicName());
62+
}
9663

97-
// Non-partitioned topic
98-
if (localMetadata.partitions == 0) {
99-
if (localMetadata.partitions == remoteMetadata.partitions) {
100-
return replicationAdmin.topics().createNonPartitionedTopicAsync(localTopicName)
101-
.exceptionally(ex -> {
102-
Throwable throwable = FutureUtil.unwrapCompletionException(ex);
103-
if (throwable instanceof ConflictException) {
104-
// Topic already exists on the remote cluster.
105-
return null;
106-
} else {
107-
throw new CompletionException(
108-
"Failed to create non-partitioned topic", throwable);
109-
}
110-
});
111-
} else {
112-
return FutureUtil.failedFuture(new PulsarClientException.NotAllowedException(
113-
"Topic type is not matched between local and remote cluster: local "
114-
+ "partitions: " + localMetadata.partitions
115-
+ ", remote partitions: " + remoteMetadata.partitions));
116-
}
64+
private CompletableFuture<Void> createRemoteTopicIfDoesNotExist(String partitionedTopic) {
65+
CompletableFuture<Void> res = new CompletableFuture<>();
66+
admin.topics().getPartitionedTopicMetadataAsync(partitionedTopic).whenComplete((local, t1) -> {
67+
if (t1 != null) {
68+
Throwable actEx = FutureUtil.unwrapCompletionException(t1);
69+
// Local topic is a non-partitioned topic, but end with "-partition-{num}".
70+
if (actEx instanceof PulsarAdminException.NotFoundException) {
71+
replicationAdmin.topics().getPartitionedTopicMetadataAsync(partitionedTopic)
72+
.whenComplete((remote, t2) -> {
73+
if (t2 != null) {
74+
Throwable actEx2 = FutureUtil.unwrapCompletionException(t2);
75+
if (actEx2 instanceof PulsarAdminException.NotFoundException) {
76+
// Both clusters have a non-partitioned topic, but the topic name is end with
77+
// "-partition-{num}".
78+
// Check partition metadata with the special name.
79+
FutureUtil.completeAfter(res, createRemoteTopicIfDoesNotExist(localTopicName));
11780
} else {
118-
if (remoteMetadata.partitions == 0) {
119-
if (log.isDebugEnabled()) {
120-
log.debug("[{}] Creating partitioned topic {} with {} partitions",
121-
replicatorId, baseTopicName, localMetadata.partitions);
122-
}
123-
// We maybe need to create a partitioned topic on remote cluster.
124-
return replicationAdmin.topics()
125-
.createPartitionedTopicAsync(baseTopicName.toString(),
126-
localMetadata.partitions)
127-
.exceptionally(ex -> {
128-
Throwable throwable = FutureUtil.unwrapCompletionException(ex);
129-
if (throwable instanceof ConflictException) {
130-
// Topic already exists on the remote cluster.
131-
// This can happen if the topic was created, or the topic is
132-
// non-partitioned.
133-
return null;
134-
} else {
135-
throw new CompletionException(
136-
"Failed to create partitioned topic", throwable);
137-
}
138-
})
139-
.thenCompose((__) -> replicationAdmin.topics()
140-
.getPartitionedTopicMetadataAsync(baseTopicName.toString()))
141-
.thenCompose(metadata -> {
142-
// Double check if the partitioned topic is created
143-
// successfully.
144-
// When partitions is equals to 0, it means this topic is
145-
// non-partitioned, we should throw an exception.
146-
if (completeTopicName.getPartitionIndex() >= metadata.partitions) {
147-
return FutureUtil.failedFuture(
148-
new PulsarClientException.NotAllowedException(
149-
"Topic type is not matched between "
150-
+ "local and "
151-
+ "remote cluster: local "
152-
+ "partitions: "
153-
+ localMetadata.partitions
154-
+ ", remote partitions: "
155-
+ remoteMetadata.partitions));
156-
}
157-
return CompletableFuture.completedFuture(null);
158-
});
159-
} else {
160-
if (localMetadata.partitions != remoteMetadata.partitions) {
161-
return FutureUtil.failedFuture(
162-
new PulsarClientException.NotAllowedException(
163-
"The number of topic partitions is inconsistent between "
164-
+ "local and"
165-
+ " remote "
166-
+ "clusters: local partitions: "
167-
+ localMetadata.partitions
168-
+ ", remote partitions: "
169-
+ remoteMetadata.partitions));
170-
}
171-
}
81+
// Failed to get remote partitions.
82+
String errorMsg = String.format("[%s] Can not start replicator because of failed to"
83+
+ " get topic partitions of remote cluster. The topic on the local cluster is"
84+
+ " a non-partitioned topic, but end with -partition-x",
85+
replicatorId);
86+
log.error(errorMsg, actEx);
87+
res.completeExceptionally(new PulsarServerException(errorMsg));
88+
return;
17289
}
173-
return CompletableFuture.completedFuture(null);
174-
}));
175-
} else {
176-
CompletableFuture<Void> topicCheckFuture = new CompletableFuture<>();
177-
replicationClient.getPartitionedTopicMetadata(localTopic.getName(), false, false)
178-
.whenComplete((metadata, ex) -> {
179-
if (ex == null) {
180-
if (metadata.partitions == 0) {
181-
topicCheckFuture.complete(null);
182-
} else {
183-
String errorMsg = String.format("%s Can not create the replicator due to the partitions in the"
184-
+ " remote cluster is not 0, but is %s",
185-
replicatorId, metadata.partitions);
186-
log.error(errorMsg);
187-
topicCheckFuture.completeExceptionally(
188-
new PulsarClientException.NotAllowedException(errorMsg));
90+
return;
91+
}
92+
// Local topic is a non-partitioned topic, but end with "-partition-{num}".
93+
// Remote side: it has a partitioned topic.
94+
String errorMsg = String.format("[%s] Can not start replicator because the"
95+
+ " partitions between local and remote cluster are different."
96+
+ " The topic on the local cluster is a non-partitioned topic, but end with"
97+
+ " -partition-x, and remote side it has %s partitions",
98+
replicatorId, remote.partitions);
99+
log.error(errorMsg);
100+
res.completeExceptionally(new PulsarServerException(errorMsg));
101+
});
102+
return;
103+
}
104+
// Failed to get local partitions.
105+
log.error("[{}] Failed to start replicator because of failed to get partitions of local cluster. The"
106+
+ " topic on the local cluster has {} partitions",
107+
replicatorId, local.partitions, actEx);
108+
res.completeExceptionally(actEx);
109+
return;
110+
}
111+
replicationAdmin.topics().getPartitionedTopicMetadataAsync(partitionedTopic).whenComplete((remote, t2) -> {
112+
if (t2 != null) {
113+
Throwable actEx = FutureUtil.unwrapCompletionException(t2);
114+
// Create the topic on the remote side.
115+
if (actEx instanceof PulsarAdminException.NotFoundException) {
116+
// Not allowed replicator to create topics on the remote side.
117+
if (!brokerService.getPulsar().getConfig().isCreateTopicToRemoteClusterForReplication()) {
118+
String errorMsg = String.format("[%s] Can not start replicator because there is no topic on"
119+
+ " the remote cluster. Please create a %s on the remote cluster",
120+
replicatorId, local.partitions == 0 ? "non-partitioned topic"
121+
: "partitioned topic with " + local.partitions + " partitions");
122+
log.error(errorMsg);
123+
res.completeExceptionally(new PulsarServerException(errorMsg));
124+
return;
125+
}
126+
// Create non-partitioned topics.
127+
// Print errors if failed to create topocs.
128+
res.whenComplete((__, t) -> {
129+
if (t == null) {
130+
return;
131+
}
132+
// Failed to get remote partitions.
133+
log.error("[{}] Failed to start replicator because of failed to"
134+
+ " create topic on the remote cluster. The topic on the local cluster has"
135+
+ " {} partitions", replicatorId, local.partitions, t);
136+
});
137+
if (local.partitions == 0) {
138+
FutureUtil.completeAfter(res, replicationAdmin.topics()
139+
.createNonPartitionedTopicAsync(partitionedTopic));
140+
return;
141+
}
142+
// Create partitioned topics.
143+
FutureUtil.completeAfter(res, replicationAdmin.topics()
144+
.createPartitionedTopicAsync(partitionedTopic, local.partitions));
145+
return;
189146
}
190-
} else {
191-
topicCheckFuture.completeExceptionally(FutureUtil.unwrapCompletionException(ex));
147+
// Failed to get remote partitions.
148+
String errorMsg = String.format("[%s] Can not start replicator because of failed to get"
149+
+ " topic partitions of remote cluster. The topic on the local cluster has"
150+
+ " %s partitions",
151+
replicatorId, local.partitions);
152+
log.error(errorMsg, actEx);
153+
res.completeExceptionally(new PulsarServerException(errorMsg));
154+
return;
155+
}
156+
// Compacted partitions.
157+
if (local.partitions == remote.partitions) {
158+
res.complete(null);
159+
return;
192160
}
161+
// Incompatible partitions between clusters.
162+
String errorMsg = String.format("[%s] Can not start replicator because the partitions between"
163+
+ " local and remote cluster are different. local: %s, remote: %s", replicatorId,
164+
local.partitions, remote.partitions);
165+
log.error(errorMsg);
166+
res.completeExceptionally(new PulsarServerException(errorMsg));
193167
});
194-
return topicCheckFuture;
195-
}
168+
});
169+
return res;
196170
}
197171

198172
@Override

0 commit comments

Comments
 (0)