Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
Expand All @@ -57,6 +58,8 @@ public abstract class AbstractReplicator implements Replicator {
protected final String remoteCluster;
protected final PulsarClientImpl replicationClient;
protected final PulsarClientImpl client;
protected final PulsarAdmin replicationAdmin;
protected final PulsarAdmin admin;
protected String replicatorId;
@Getter
protected final Topic localTopic;
Expand Down Expand Up @@ -107,7 +110,8 @@ public enum State {
}

public AbstractReplicator(String localCluster, Topic localTopic, String remoteCluster, String remoteTopicName,
String replicatorPrefix, BrokerService brokerService, PulsarClientImpl replicationClient)
String replicatorPrefix, BrokerService brokerService, PulsarClientImpl replicationClient,
PulsarAdmin replicationAdmin)
throws PulsarServerException {
this.brokerService = brokerService;
this.localTopic = localTopic;
Expand All @@ -117,7 +121,9 @@ public AbstractReplicator(String localCluster, Topic localTopic, String remoteCl
this.remoteTopicName = remoteTopicName;
this.remoteCluster = StringInterner.intern(remoteCluster);
this.replicationClient = replicationClient;
this.replicationAdmin = replicationAdmin;
this.client = (PulsarClientImpl) brokerService.pulsar().getClient();
this.admin = brokerService.pulsar().getAdminClient();
this.producer = null;
this.producerQueueSize = brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize();
this.replicatorId = String.format("%s | %s",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.pulsar.broker.service.AbstractReplicator;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
Expand All @@ -51,9 +52,10 @@ public class NonPersistentReplicator extends AbstractReplicator implements Repli
private final NonPersistentReplicatorStatsImpl stats = new NonPersistentReplicatorStatsImpl();

public NonPersistentReplicator(NonPersistentTopic topic, String localCluster, String remoteCluster,
BrokerService brokerService, PulsarClientImpl replicationClient) throws PulsarServerException {
BrokerService brokerService, PulsarClientImpl replicationClient,
PulsarAdmin replicationAdmin) throws PulsarServerException {
super(localCluster, topic, remoteCluster, topic.getName(), topic.getReplicatorPrefix(), brokerService,
replicationClient);
replicationClient, replicationAdmin);
// NonPersistentReplicator does not support limitation so far, so reset pending queue size to the default value.
producerBuilder.maxPendingMessages(1000);
producerBuilder.blockIfQueueFull(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@
import org.apache.pulsar.broker.service.schema.exceptions.NotExistSchemaException;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.NamespaceStats;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
Expand Down Expand Up @@ -628,14 +630,15 @@ protected CompletableFuture<Void> addReplicationCluster(String remoteCluster, No
String localCluster) {
return AbstractReplicator.validatePartitionedTopicAsync(nonPersistentTopic.getName(), brokerService)
.thenCompose(__ -> brokerService.pulsar().getPulsarResources().getClusterResources()
.getClusterAsync(remoteCluster)
.thenApply(clusterData ->
brokerService.getReplicationClient(remoteCluster, clusterData)))
.thenAccept(replicationClient -> {
.getClusterAsync(remoteCluster))
.thenAccept((clusterData) -> {
PulsarClient replicationClient = brokerService.getReplicationClient(remoteCluster, clusterData);
PulsarAdmin replicationAdmin = brokerService.getClusterPulsarAdmin(remoteCluster, clusterData);
replicators.computeIfAbsent(remoteCluster, r -> {
try {
return new NonPersistentReplicator(NonPersistentTopic.this, localCluster,
remoteCluster, brokerService, (PulsarClientImpl) replicationClient);
remoteCluster, brokerService, (PulsarClientImpl) replicationClient,
replicationAdmin);
} catch (PulsarServerException e) {
log.error("[{}] Replicator startup failed {}", topic, remoteCluster, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.Markers;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.util.FutureUtil;
Expand All @@ -40,9 +42,10 @@ public class GeoPersistentReplicator extends PersistentReplicator {

public GeoPersistentReplicator(PersistentTopic topic, ManagedCursor cursor, String localCluster,
String remoteCluster, BrokerService brokerService,
PulsarClientImpl replicationClient)
PulsarClientImpl replicationClient, PulsarAdmin replicationAdmin)
throws PulsarServerException {
super(localCluster, topic, cursor, remoteCluster, topic.getName(), brokerService, replicationClient);
super(localCluster, topic, cursor, remoteCluster, topic.getName(), brokerService, replicationClient,
replicationAdmin);
}

/**
Expand All @@ -55,29 +58,85 @@ protected String getProducerName() {

@Override
protected CompletableFuture<Void> prepareCreateProducer() {
if (brokerService.getPulsar().getConfig().isCreateTopicToRemoteClusterForReplication()) {
return CompletableFuture.completedFuture(null);
} else {
CompletableFuture<Void> topicCheckFuture = new CompletableFuture<>();
replicationClient.getPartitionedTopicMetadata(localTopic.getName(), false, false)
.whenComplete((metadata, ex) -> {
if (ex == null) {
if (metadata.partitions == 0) {
topicCheckFuture.complete(null);
} else {
String errorMsg = String.format("%s Can not create the replicator due to the partitions in the"
+ " remote cluster is not 0, but is %s",
replicatorId, metadata.partitions);
log.error(errorMsg);
topicCheckFuture.completeExceptionally(
new PulsarClientException.NotAllowedException(errorMsg));
return createRemoteTopicIfDoesNotExist(TopicName.get(localTopicName).getPartitionedTopicName());
}

private CompletableFuture<Integer> getLocalPartitionMetadata(String topic) {
return admin.topics().getPartitionedTopicMetadataAsync(topic)
.thenApply(metadata -> metadata.partitions)
.exceptionallyCompose(t -> {
Throwable actEx = FutureUtil.unwrapCompletionException(t);
if (actEx instanceof PulsarAdminException.NotFoundException) {
// Legacy edge case: Local topic is non-partitioned but name ends with "-partition-{num}".
// This should never happen in practice because PIP-414 disables this naming pattern.
return createRemoteTopicIfDoesNotExist(localTopicName)
.thenApply(__ -> -1); // Special marker
}
} else {
topicCheckFuture.completeExceptionally(FutureUtil.unwrapCompletionException(ex));
return CompletableFuture.failedFuture(actEx);
});
}

private CompletableFuture<Integer> getRemotePartitionMetadata(String topic) {
return replicationAdmin.topics().getPartitionedTopicMetadataAsync(topic)
.thenApply(metadata -> metadata.partitions)
.exceptionallyCompose(t -> {
Throwable actEx = FutureUtil.unwrapCompletionException(t);
if (actEx instanceof PulsarAdminException.NotFoundException) {
return CompletableFuture.completedFuture(-1); // Topic doesn't exist
}
return CompletableFuture.failedFuture(actEx);
});
}

private CompletableFuture<Void> handlePartitionComparison(String topic, int localPartitions, int remotePartitions) {
// Skip if already handled by recursion
if (localPartitions == -1) {
return CompletableFuture.completedFuture(null);
}

// Remote topic doesn't exist - create it
if (remotePartitions == -1) {
if (!brokerService.getPulsar().getConfig().isCreateTopicToRemoteClusterForReplication()) {
String errorMsg = String.format("[%s] Can not start replicator because there is no topic on"
+ " the remote cluster. Please create a %s on the remote cluster",
replicatorId, localPartitions == 0 ? "non-partitioned topic"
: "partitioned topic with " + localPartitions + " partitions");
log.error(errorMsg);
return CompletableFuture.failedFuture(new PulsarServerException(errorMsg));
}

CompletableFuture<Void> createFuture = localPartitions == 0
? replicationAdmin.topics().createNonPartitionedTopicAsync(topic)
: replicationAdmin.topics().createPartitionedTopicAsync(topic, localPartitions);

return createFuture.whenComplete((__, t) -> {
if (t != null) {
log.error("[{}] Failed to create topic on remote cluster. Local has {} partitions",
replicatorId, localPartitions, t);
}
});
return topicCheckFuture;
}

// Both exist - verify compatibility
if (localPartitions == remotePartitions) {
return CompletableFuture.completedFuture(null);
}

// Incompatible partitions
String errorMsg = String.format("[%s] Can not start replicator because the partitions between"
+ " local and remote cluster are different. local: %s, remote: %s",
replicatorId, localPartitions, remotePartitions);
log.error(errorMsg);
return CompletableFuture.failedFuture(new PulsarServerException(errorMsg));
}

private CompletableFuture<Void> createRemoteTopicIfDoesNotExist(String partitionedTopic) {
return getLocalPartitionMetadata(partitionedTopic)
.thenCompose(localPartitions ->
getRemotePartitionMetadata(partitionedTopic).thenCompose(remotePartitions ->
handlePartitionComparison(partitionedTopic, localPartitions, remotePartitions)
)
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.MessageExpirer;
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
Expand Down Expand Up @@ -119,11 +120,12 @@ protected enum ReasonOfWaitForCursorRewinding {
protected final LinkedList<InFlightTask> inFlightTasks = new LinkedList<>();

public PersistentReplicator(String localCluster, PersistentTopic localTopic, ManagedCursor cursor,
String remoteCluster, String remoteTopic,
BrokerService brokerService, PulsarClientImpl replicationClient)
String remoteCluster, String remoteTopic,
BrokerService brokerService, PulsarClientImpl replicationClient,
PulsarAdmin replicationAdmin)
throws PulsarServerException {
super(localCluster, localTopic, remoteCluster, remoteTopic, localTopic.getReplicatorPrefix(),
brokerService, replicationClient);
brokerService, replicationClient, replicationAdmin);
this.topic = localTopic;
this.localSchemaTopicName = TopicName.getPartitionedTopicName(localTopicName).toString();
this.cursor = Objects.requireNonNull(cursor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
Expand Down Expand Up @@ -2345,11 +2346,11 @@ protected CompletableFuture<Void> addReplicationCluster(String remoteCluster, Ma
String localCluster) {
return AbstractReplicator.validatePartitionedTopicAsync(PersistentTopic.this.getName(), brokerService)
.thenCompose(__ -> brokerService.pulsar().getPulsarResources().getClusterResources()
.getClusterAsync(remoteCluster)
.thenApply(clusterData ->
brokerService.getReplicationClient(remoteCluster, clusterData)))
.thenAccept(replicationClient -> {
if (replicationClient == null) {
.getClusterAsync(remoteCluster))
.thenAccept((clusterData) -> {
PulsarClient replicationClient = brokerService.getReplicationClient(remoteCluster, clusterData);
PulsarAdmin replicationAdmin = brokerService.getClusterPulsarAdmin(remoteCluster, clusterData);
if (replicationClient == null || replicationAdmin == null) {
log.error("[{}] Can not create replicator because the remote client can not be created."
+ " remote cluster: {}. State of transferring : {}",
topic, remoteCluster, transferring);
Expand All @@ -2367,7 +2368,8 @@ protected CompletableFuture<Void> addReplicationCluster(String remoteCluster, Ma
Replicator replicator = replicators.computeIfAbsent(remoteCluster, r -> {
try {
return new GeoPersistentReplicator(PersistentTopic.this, cursor, localCluster,
remoteCluster, brokerService, (PulsarClientImpl) replicationClient);
remoteCluster, brokerService, (PulsarClientImpl) replicationClient,
replicationAdmin);
} catch (PulsarServerException e) {
log.error("[{}] Replicator startup failed {}", topic, remoteCluster, e);
}
Expand Down Expand Up @@ -2433,9 +2435,10 @@ protected CompletableFuture<Void> addShadowReplicationCluster(String shadowTopic
String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
return AbstractReplicator.validatePartitionedTopicAsync(PersistentTopic.this.getName(), brokerService)
.thenCompose(__ -> brokerService.pulsar().getPulsarResources().getClusterResources()
.getClusterAsync(localCluster)
.thenApply(clusterData -> brokerService.getReplicationClient(localCluster, clusterData)))
.thenAccept(replicationClient -> {
.getClusterAsync(localCluster))
.thenAccept((clusterData) -> {
PulsarClient replicationClient = brokerService.getReplicationClient(localCluster, clusterData);
PulsarAdmin replicationAdmin = brokerService.getClusterPulsarAdmin(localCluster, clusterData);
Replicator replicator = shadowReplicators.computeIfAbsent(shadowTopic, r -> {
try {
TopicName sourceTopicName = TopicName.get(getName());
Expand All @@ -2444,7 +2447,7 @@ protected CompletableFuture<Void> addShadowReplicationCluster(String shadowTopic
shadowPartitionTopic += "-partition-" + sourceTopicName.getPartitionIndex();
}
return new ShadowReplicator(shadowPartitionTopic, PersistentTopic.this, cursor,
brokerService, (PulsarClientImpl) replicationClient);
brokerService, (PulsarClientImpl) replicationClient, replicationAdmin);
} catch (PulsarServerException e) {
log.error("[{}] ShadowReplicator startup failed {}", topic, shadowTopic, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
Expand All @@ -39,11 +40,12 @@
public class ShadowReplicator extends PersistentReplicator {

public ShadowReplicator(String shadowTopic, PersistentTopic sourceTopic, ManagedCursor cursor,
BrokerService brokerService, PulsarClientImpl replicationClient)
BrokerService brokerService, PulsarClientImpl replicationClient,
PulsarAdmin replicationAdmin)
throws PulsarServerException {
super(brokerService.pulsar().getConfiguration().getClusterName(), sourceTopic, cursor,
brokerService.pulsar().getConfiguration().getClusterName(), shadowTopic, brokerService,
replicationClient);
replicationClient, replicationAdmin);
}

/**
Expand Down
Loading
Loading