Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.loadbalance.LeaderBroker;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
Expand All @@ -82,6 +83,7 @@
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
Expand Down Expand Up @@ -867,12 +869,331 @@ protected CompletableFuture<Void> internalSetNamespaceReplicationClusters(List<S
}).collect(Collectors.toList());
return FutureUtil.waitForAll(futures).thenApply(__ -> replicationClusterSet);
}))
.thenCompose(replicationClusterSet ->
getNamespacePoliciesAsync(namespaceName)
.thenCompose(policies ->
validateReplicationClusterCompatibility(replicationClusterSet,
policies.replication_clusters))
.thenApply(__ -> replicationClusterSet))
.thenCompose(replicationClusterSet -> updatePoliciesAsync(namespaceName, policies -> {
policies.replication_clusters = replicationClusterSet;
return policies;
}));
}

/**
* Validates compatibility between clusters when enabling namespace-level replication.
* This validation is only performed for newly added clusters.
* This includes:
* <ul>
* <li>All topics' partitions that have been created should be the same across clusters,
* including the __change_events system topic</li>
* <li>Auto-creation policies between clusters should be the same, including broker-level
* and namespace-level settings</li>
* </ul>
*
* @param replicationClusterSet the new set of clusters to be configured
* @param existingClusters the existing set of replication clusters
* @return a CompletableFuture that completes when validation passes, or fails with RestException
*/
private CompletableFuture<Void> validateReplicationClusterCompatibility(Set<String> replicationClusterSet,
Set<String> existingClusters) {
String localCluster = pulsar().getConfiguration().getClusterName();

// Skip validation if local cluster is not in the replication set
if (!replicationClusterSet.contains(localCluster)) {
return CompletableFuture.completedFuture(null);
}

// Find newly added clusters
Set<String> newlyAddedClusters = new HashSet<>(replicationClusterSet);
if (existingClusters != null) {
newlyAddedClusters.removeAll(existingClusters);
}

if (newlyAddedClusters.isEmpty()) {
// No new clusters added, skip validation
return CompletableFuture.completedFuture(null);
}

List<String> newRemoteClusters = newlyAddedClusters.stream()
.filter(cluster -> !cluster.equals(localCluster))
.collect(Collectors.toList());

if (newRemoteClusters.isEmpty()) {
return CompletableFuture.completedFuture(null);
}

// Validate compatibility with each newly added remote cluster
List<CompletableFuture<Void>> validationFutures = newRemoteClusters.stream()
.map(remoteCluster -> validateClusterPairCompatibility(localCluster, remoteCluster))
.collect(Collectors.toList());

return FutureUtil.waitForAll(validationFutures);
}

/**
* Validates compatibility between the local cluster and a remote cluster.
*/
private CompletableFuture<Void> validateClusterPairCompatibility(String localCluster, String remoteCluster) {
return clusterResources().getClusterAsync(remoteCluster)
.thenCompose(clusterDataOpt -> {
if (clusterDataOpt.isEmpty()) {
throw new RestException(Status.NOT_FOUND, "Cluster " + remoteCluster + " does not exist");
}
ClusterData clusterData = clusterDataOpt.get();
PulsarAdmin remoteAdmin;
try {
remoteAdmin = pulsar().getBrokerService()
.getClusterPulsarAdmin(remoteCluster, Optional.of(clusterData));
} catch (Exception e) {
throw new RestException(Status.INTERNAL_SERVER_ERROR,
"Failed to update clusters because failed to create admin client for cluster "
+ remoteCluster + ": " + e.getMessage());
}

// Validate both partition compatibility and auto-creation policy compatibility
return validatePartitionCompatibility(remoteAdmin, remoteCluster)
.thenCompose(__ -> validateAutoTopicCreationCompatibility(remoteAdmin, remoteCluster));
});
}

/**
* Validates partition compatibility between local and remote clusters.
* <ul>
* <li>All partitioned topics (including __change_events) that exist in the local cluster
* must have the same partition count in the remote cluster (if they exist there).</li>
* <li>Non-partitioned topics in the local cluster must not exist as partitioned topics
* in the remote cluster.</li>
* </ul>
*/
private CompletableFuture<Void> validatePartitionCompatibility(PulsarAdmin remoteAdmin, String remoteCluster) {
// Get local partitioned topics
CompletableFuture<List<String>> localPartitionedTopicsFuture =
pulsar().getNamespaceService().getFullListOfPartitionedTopic(namespaceName);

// Get persistent topics only (non-persistent topics don't have persistent state
// and getListOfNonPersistentTopics triggers global namespace ownership validation
// which fails when namespace has no clusters configured yet)
CompletableFuture<List<String>> localAllTopicsFuture =
pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName);

return localPartitionedTopicsFuture.thenCombine(localAllTopicsFuture,
(localPartitionedTopics, localAllTopics) -> {
// Find non-partitioned topics (topics that are not in the partitioned list
// and are not partition suffixes of partitioned topics)
Set<String> partitionedTopicSet = new HashSet<>(localPartitionedTopics);
List<String> localNonPartitionedTopics = localAllTopics.stream()
.map(TopicName::get)
.filter(topicName -> !partitionedTopicSet.contains(
topicName.getPartitionedTopicName()))
.map(TopicName::toString)
.distinct()
.collect(Collectors.toList());

return new TopicLists(localPartitionedTopics, localNonPartitionedTopics);
})
.thenCompose(topicLists -> {
List<CompletableFuture<Void>> validations = new ArrayList<>();

// Validate partitioned topics have same partition count
for (String topic : topicLists.partitionedTopics) {
validations.add(compareTopicPartitions(topic, remoteAdmin, remoteCluster));
}

// Validate non-partitioned topics don't exist as partitioned on remote
for (String topic : topicLists.nonPartitionedTopics) {
validations.add(validateNonPartitionedTopicCompatibility(topic, remoteAdmin, remoteCluster));
}

return FutureUtil.waitForAll(validations);
});
}

/**
* Helper class to hold partitioned and non-partitioned topic lists.
*/
private static class TopicLists {
final List<String> partitionedTopics;
final List<String> nonPartitionedTopics;

TopicLists(List<String> partitionedTopics, List<String> nonPartitionedTopics) {
this.partitionedTopics = partitionedTopics;
this.nonPartitionedTopics = nonPartitionedTopics;
}
}

/**
* Validates that a non-partitioned topic on local does not exist as a partitioned topic on remote.
*/
private CompletableFuture<Void> validateNonPartitionedTopicCompatibility(String topic, PulsarAdmin remoteAdmin,
String remoteCluster) {
return remoteAdmin.topics().getPartitionedTopicMetadataAsync(topic)
.thenAccept(remoteMetadata -> {
// If remote has partitions > 0, it's a partitioned topic, which is incompatible
if (remoteMetadata.partitions > 0) {
throw new RestException(Status.CONFLICT,
String.format("Topic type mismatch for topic '%s': local cluster has a "
+ "non-partitioned topic, but remote cluster '%s' has a partitioned "
+ "topic with %d partitions. "
+ "Please ensure topic types are the same before enabling replication.",
topic, remoteCluster, remoteMetadata.partitions));
}
})
.exceptionally(ex -> {
// If topic doesn't exist on remote, that's fine
if (ex.getCause() instanceof PulsarAdminException.NotFoundException
|| ex instanceof PulsarAdminException.NotFoundException) {
return null;
}
throw new CompletionException(ex);
});
}

/**
* Compares the partition count of a topic between local and remote clusters.
* If the topic exists on local but not on remote, validation passes.
* If the topic exists on both clusters, partition counts must match.
*/
private CompletableFuture<Void> compareTopicPartitions(String topic, PulsarAdmin remoteAdmin,
String remoteCluster) {
TopicName topicName = TopicName.get(topic);

// Get local partition metadata
CompletableFuture<Optional<PartitionedTopicMetadata>> localMetadataFuture =
pulsar().getPulsarResources().getNamespaceResources()
.getPartitionedTopicResources().getPartitionedTopicMetadataAsync(topicName);

// Get remote partition metadata, return Optional.empty() if topic doesn't exist
CompletableFuture<Optional<PartitionedTopicMetadata>> remoteMetadataFuture =
remoteAdmin.topics().getPartitionedTopicMetadataAsync(topic)
.thenApply(Optional::of)
.exceptionally(ex -> {
// If topic doesn't exist on remote, return empty
if (ex.getCause() instanceof PulsarAdminException.NotFoundException
|| ex instanceof PulsarAdminException.NotFoundException) {
return Optional.empty();
}
throw new CompletionException(ex);
});

return localMetadataFuture.thenCombine(remoteMetadataFuture, (localMetadataOpt, remoteMetadataOpt) -> {
// If topic doesn't exist on remote, validation passes
if (remoteMetadataOpt.isEmpty()) {
return null;
}

int localPartitions = localMetadataOpt.map(m -> m.partitions).orElse(0);
int remotePartitions = remoteMetadataOpt.get().partitions;

if (localPartitions != remotePartitions) {
String topicType = SystemTopicNames.isTopicPoliciesSystemTopic(topic)
? "__change_events system topic" : "topic";
throw new RestException(Status.CONFLICT,
String.format("Partition count mismatch for %s '%s': local cluster has %d partitions, "
+ "remote cluster '%s' has %d partitions. "
+ "Please ensure partition counts are the same before enabling replication.",
topicType, topic, localPartitions, remoteCluster, remotePartitions));
}
return null;
});
}

/**
* Validates that the effective auto-topic creation policies are the same between local and remote clusters.
* The effective policy is computed by: namespace-level policy overrides broker-level if it exists.
*/
private CompletableFuture<Void> validateAutoTopicCreationCompatibility(PulsarAdmin remoteAdmin,
String remoteCluster) {
String namespaceStr = namespaceName.toString();

// Get local broker config
ServiceConfiguration localConfig = pulsar().getConfiguration();
TopicType localBrokerAutoCreationType = localConfig.getAllowAutoTopicCreationType();
int localBrokerDefaultPartitions = localConfig.getDefaultNumPartitions();

// Get local namespace policy
CompletableFuture<AutoTopicCreationOverride> localNsPolicyFuture =
getNamespacePoliciesAsync(namespaceName)
.thenApply(policies -> policies.autoTopicCreationOverride);

// Get remote broker config
CompletableFuture<Map<String, String>> remoteBrokerConfigFuture =
remoteAdmin.brokers().getRuntimeConfigurationsAsync();

// Get remote namespace policy
CompletableFuture<AutoTopicCreationOverride> remoteNsPolicyFuture =
remoteAdmin.namespaces().getAutoTopicCreationAsync(namespaceStr)
.exceptionally(ex -> {
// If namespace doesn't have override, return null
if (ex.getCause() instanceof PulsarAdminException.NotFoundException
|| ex instanceof PulsarAdminException.NotFoundException) {
return null;
}
throw new CompletionException(ex);
});

return CompletableFuture.allOf(localNsPolicyFuture, remoteBrokerConfigFuture, remoteNsPolicyFuture)
.thenAccept(__ -> {
AutoTopicCreationOverride localNsPolicy = localNsPolicyFuture.join();
Map<String, String> remoteBrokerConfig = remoteBrokerConfigFuture.join();
AutoTopicCreationOverride remoteNsPolicy = remoteNsPolicyFuture.join();

// Parse remote broker config
String remoteAutoCreationTypeStr = remoteBrokerConfig.getOrDefault(
"allowAutoTopicCreationType", "non-partitioned");
// Convert to uppercase and replace hyphen with underscore for enum parsing
TopicType remoteBrokerAutoCreationType = TopicType.valueOf(
remoteAutoCreationTypeStr.toUpperCase().replace("-", "_"));
int remoteBrokerDefaultPartitions = Integer.parseInt(
remoteBrokerConfig.getOrDefault("defaultNumPartitions", "1"));

// Compute effective local policy (namespace-level overrides broker-level if exists)
String localEffectiveTopicType;
int localEffectiveDefaultPartitions;
if (localNsPolicy != null) {
localEffectiveTopicType = localNsPolicy.getTopicType();
localEffectiveDefaultPartitions = localNsPolicy.getDefaultNumPartitions() != null
? localNsPolicy.getDefaultNumPartitions() : localBrokerDefaultPartitions;
} else {
localEffectiveTopicType = localBrokerAutoCreationType.toString();
localEffectiveDefaultPartitions = localBrokerDefaultPartitions;
}

// Compute effective remote policy (namespace-level overrides broker-level if exists)
String remoteEffectiveTopicType;
int remoteEffectiveDefaultPartitions;
if (remoteNsPolicy != null) {
remoteEffectiveTopicType = remoteNsPolicy.getTopicType();
remoteEffectiveDefaultPartitions = remoteNsPolicy.getDefaultNumPartitions() != null
? remoteNsPolicy.getDefaultNumPartitions() : remoteBrokerDefaultPartitions;
} else {
remoteEffectiveTopicType = remoteBrokerAutoCreationType.toString();
remoteEffectiveDefaultPartitions = remoteBrokerDefaultPartitions;
}

// Compare effective policies (only topicType and defaultNumPartitions)
List<String> mismatches = new ArrayList<>();
if (!Objects.equals(localEffectiveTopicType, remoteEffectiveTopicType)) {
mismatches.add(String.format("topicType: local=%s, remote=%s",
localEffectiveTopicType, remoteEffectiveTopicType));
}
if (localEffectiveDefaultPartitions != remoteEffectiveDefaultPartitions) {
mismatches.add(String.format("defaultNumPartitions: local=%d, remote=%d",
localEffectiveDefaultPartitions, remoteEffectiveDefaultPartitions));
}

if (!mismatches.isEmpty()) {
throw new RestException(Status.CONFLICT,
String.format("Effective auto-topic creation policy mismatch for namespace '%s' "
+ "between local cluster and remote cluster '%s': %s. "
+ "Please ensure auto-topic creation policies are the same "
+ "before enabling replication.",
namespaceStr, remoteCluster, String.join("; ", mismatches)));
}
});
}

protected CompletableFuture<Void> internalSetNamespaceMessageTTLAsync(Integer messageTTL) {
return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.TTL, PolicyOperation.WRITE)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
Expand Down
Loading
Loading