Skip to content

Commit 1bc3789

Browse files
Demogorgon314nikhil-ctds
authored andcommitted
[feat][broker][branch-3.0] PIP-321 Introduce allowed-cluster at the namespace level (apache#22378) (apache#22960)
(cherry-picked from commit apache@36bae69) Co-authored-by: Xiangying Meng <55571188+liangyepianzhou@users.noreply.github.com> (cherry picked from commit 7b2e724)
1 parent 34f88c0 commit 1bc3789

File tree

13 files changed

+546
-69
lines changed

13 files changed

+546
-69
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java

Lines changed: 20 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -321,32 +321,28 @@ protected Policies getNamespacePolicies(NamespaceName namespaceName) {
321321
}
322322

323323
protected CompletableFuture<Policies> getNamespacePoliciesAsync(NamespaceName namespaceName) {
324-
return namespaceResources().getPoliciesAsync(namespaceName).thenCompose(policies -> {
325-
if (policies.isPresent()) {
326-
return pulsar()
327-
.getNamespaceService()
328-
.getNamespaceBundleFactory()
329-
.getBundlesAsync(namespaceName)
330-
.thenCompose(bundles -> {
331-
BundlesData bundleData = null;
332-
try {
333-
bundleData = bundles.getBundlesData();
334-
} catch (Exception e) {
335-
log.error("[{}] Failed to get namespace policies {}", clientAppId(), namespaceName, e);
336-
return FutureUtil.failedFuture(new RestException(e));
337-
}
338-
policies.get().bundles = bundleData != null ? bundleData : policies.get().bundles;
339-
if (policies.get().is_allow_auto_update_schema == null) {
340-
// the type changed from boolean to Boolean. return broker value here for keeping compatibility.
341-
policies.get().is_allow_auto_update_schema = pulsar().getConfig()
342-
.isAllowAutoUpdateSchemaEnabled();
324+
CompletableFuture<Policies> result = new CompletableFuture<>();
325+
namespaceResources().getPoliciesAsync(namespaceName)
326+
.thenCombine(getLocalPolicies().getLocalPoliciesAsync(namespaceName), (pl, localPolicies) -> {
327+
if (pl.isPresent()) {
328+
Policies policies = pl.get();
329+
localPolicies.ifPresent(value -> policies.bundles = value.bundles);
330+
if (policies.is_allow_auto_update_schema == null) {
331+
// the type changed from boolean to Boolean. return
332+
// broker value here for keeping compatibility.
333+
policies.is_allow_auto_update_schema = pulsar().getConfig()
334+
.isAllowAutoUpdateSchemaEnabled();
335+
}
336+
result.complete(policies);
337+
} else {
338+
result.completeExceptionally(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
343339
}
344-
return CompletableFuture.completedFuture(policies.get());
340+
return null;
341+
}).exceptionally(ex -> {
342+
result.completeExceptionally(ex.getCause());
343+
return null;
345344
});
346-
} else {
347-
return FutureUtil.failedFuture(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
348-
}
349-
});
345+
return result;
350346
}
351347

352348
protected BacklogQuota namespaceBacklogQuota(NamespaceName namespace,

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java

Lines changed: 76 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -702,9 +702,21 @@ protected CompletableFuture<Void> internalSetNamespaceReplicationClusters(List<S
702702
"Invalid cluster id: " + clusterId);
703703
}
704704
return validatePeerClusterConflictAsync(clusterId, replicationClusterSet)
705-
.thenCompose(__ ->
706-
validateClusterForTenantAsync(
707-
namespaceName.getTenant(), clusterId));
705+
.thenCompose(__ -> getNamespacePoliciesAsync(this.namespaceName)
706+
.thenCompose(nsPolicies -> {
707+
if (nsPolicies.allowed_clusters.isEmpty()) {
708+
return validateClusterForTenantAsync(
709+
namespaceName.getTenant(), clusterId);
710+
}
711+
if (!nsPolicies.allowed_clusters.contains(clusterId)) {
712+
String msg = String.format("Cluster [%s] is not in the "
713+
+ "list of allowed clusters list for namespace "
714+
+ "[%s]", clusterId, namespaceName.toString());
715+
log.info(msg);
716+
throw new RestException(Status.FORBIDDEN, msg);
717+
}
718+
return CompletableFuture.completedFuture(null);
719+
}));
708720
}).collect(Collectors.toList());
709721
return FutureUtil.waitForAll(futures).thenApply(__ -> replicationClusterSet);
710722
}))
@@ -2695,4 +2707,65 @@ protected void internalRemoveBacklogQuota(AsyncResponse asyncResponse, BacklogQu
26952707
return null;
26962708
});
26972709
}
2710+
2711+
protected CompletableFuture<Void> internalSetNamespaceAllowedClusters(List<String> clusterIds) {
2712+
return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.ALLOW_CLUSTERS, PolicyOperation.WRITE)
2713+
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
2714+
// Allowed clusters in the namespace policy should be included in the allowed clusters in the tenant
2715+
// policy.
2716+
.thenCompose(__ -> FutureUtil.waitForAll(clusterIds.stream().map(clusterId ->
2717+
validateClusterForTenantAsync(namespaceName.getTenant(), clusterId))
2718+
.collect(Collectors.toList())))
2719+
// Allowed clusters should include all the existed replication clusters and could not contain global
2720+
// cluster.
2721+
.thenCompose(__ -> {
2722+
checkNotNull(clusterIds, "ClusterIds should not be null");
2723+
if (clusterIds.contains("global")) {
2724+
throw new RestException(Status.PRECONDITION_FAILED,
2725+
"Cannot specify global in the list of allowed clusters");
2726+
}
2727+
return getNamespacePoliciesAsync(this.namespaceName).thenApply(namespacePolicies -> {
2728+
namespacePolicies.replication_clusters.forEach(replicationCluster -> {
2729+
if (!clusterIds.contains(replicationCluster)) {
2730+
throw new RestException(Status.BAD_REQUEST,
2731+
String.format("Allowed clusters do not contain the replication cluster %s. "
2732+
+ "Please remove the replication cluster if the cluster is not allowed "
2733+
+ "for this namespace", replicationCluster));
2734+
}
2735+
});
2736+
return Sets.newHashSet(clusterIds);
2737+
});
2738+
})
2739+
// Verify the allowed clusters are valid and they do not contain the peer clusters.
2740+
.thenCompose(allowedClusters -> clustersAsync()
2741+
.thenCompose(clusters -> {
2742+
List<CompletableFuture<Void>> futures =
2743+
allowedClusters.stream().map(clusterId -> {
2744+
if (!clusters.contains(clusterId)) {
2745+
throw new RestException(Status.FORBIDDEN,
2746+
"Invalid cluster id: " + clusterId);
2747+
}
2748+
return validatePeerClusterConflictAsync(clusterId, allowedClusters);
2749+
}).collect(Collectors.toList());
2750+
return FutureUtil.waitForAll(futures).thenApply(__ -> allowedClusters);
2751+
}))
2752+
// Update allowed clusters into policies.
2753+
.thenCompose(allowedClusterSet -> updatePoliciesAsync(namespaceName, policies -> {
2754+
policies.allowed_clusters = allowedClusterSet;
2755+
return policies;
2756+
}));
2757+
}
2758+
2759+
protected CompletableFuture<Set<String>> internalGetNamespaceAllowedClustersAsync() {
2760+
return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.ALLOW_CLUSTERS, PolicyOperation.READ)
2761+
.thenAccept(__ -> {
2762+
if (!namespaceName.isGlobal()) {
2763+
throw new RestException(Status.PRECONDITION_FAILED,
2764+
"Cannot get the allowed clusters for a non-global namespace");
2765+
}
2766+
}).thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
2767+
.thenApply(policies -> policies.allowed_clusters);
2768+
}
2769+
2770+
26982771
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2976,5 +2976,51 @@ public void removeNamespaceEntryFilters(@Suspended AsyncResponse asyncResponse,
29762976
});
29772977
}
29782978

2979+
@POST
2980+
@Path("/{tenant}/{namespace}/allowedClusters")
2981+
@ApiOperation(value = "Set the allowed clusters for a namespace.")
2982+
@ApiResponses(value = {
2983+
@ApiResponse(code = 400, message = "The list of allowed clusters should include all replication clusters."),
2984+
@ApiResponse(code = 403, message = "The requester does not have admin permissions."),
2985+
@ApiResponse(code = 404, message = "The specified tenant, cluster, or namespace does not exist."),
2986+
@ApiResponse(code = 409, message = "A peer-cluster cannot be part of an allowed-cluster."),
2987+
@ApiResponse(code = 412, message = "The namespace is not global or the provided cluster IDs are invalid.")})
2988+
public void setNamespaceAllowedClusters(@Suspended AsyncResponse asyncResponse,
2989+
@PathParam("tenant") String tenant,
2990+
@PathParam("namespace") String namespace,
2991+
@ApiParam(value = "List of allowed clusters", required = true)
2992+
List<String> clusterIds) {
2993+
validateNamespaceName(tenant, namespace);
2994+
internalSetNamespaceAllowedClusters(clusterIds)
2995+
.thenAccept(asyncResponse::resume)
2996+
.exceptionally(e -> {
2997+
log.error("[{}] Failed to set namespace allowed clusters on namespace {}",
2998+
clientAppId(), namespace, e);
2999+
resumeAsyncResponseExceptionally(asyncResponse, e);
3000+
return null;
3001+
});
3002+
}
3003+
3004+
@GET
3005+
@Path("/{tenant}/{namespace}/allowedClusters")
3006+
@ApiOperation(value = "Get the allowed clusters for a namespace.",
3007+
response = String.class, responseContainer = "List")
3008+
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
3009+
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
3010+
@ApiResponse(code = 412, message = "Namespace is not global")})
3011+
public void getNamespaceAllowedClusters(@Suspended AsyncResponse asyncResponse,
3012+
@PathParam("tenant") String tenant,
3013+
@PathParam("namespace") String namespace) {
3014+
validateNamespaceName(tenant, namespace);
3015+
internalGetNamespaceAllowedClustersAsync()
3016+
.thenAccept(asyncResponse::resume)
3017+
.exceptionally(e -> {
3018+
log.error("[{}] Failed to get namespace allowed clusters on namespace {}", clientAppId(),
3019+
namespace, e);
3020+
resumeAsyncResponseExceptionally(asyncResponse, e);
3021+
return null;
3022+
});
3023+
}
3024+
29793025
private static final Logger log = LoggerFactory.getLogger(Namespaces.class);
29803026
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java

Lines changed: 60 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1758,52 +1758,78 @@ public CompletableFuture<Void> checkReplication() {
17581758
if (log.isDebugEnabled()) {
17591759
log.debug("[{}] Checking replication status", name);
17601760
}
1761-
17621761
List<String> configuredClusters = topicPolicies.getReplicationClusters().get();
17631762
if (CollectionUtils.isEmpty(configuredClusters)) {
17641763
log.warn("[{}] No replication clusters configured", name);
17651764
return CompletableFuture.completedFuture(null);
17661765
}
17671766

1768-
int newMessageTTLInSeconds = topicPolicies.getMessageTTLInSeconds().get();
1769-
17701767
String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
17711768

1772-
// if local cluster is removed from global namespace cluster-list : then delete topic forcefully
1773-
// because pulsar doesn't serve global topic without local repl-cluster configured.
1774-
if (TopicName.get(topic).isGlobal() && !configuredClusters.contains(localCluster)) {
1775-
log.info("Deleting topic [{}] because local cluster is not part of "
1776-
+ " global namespace repl list {}", topic, configuredClusters);
1777-
return deleteForcefully();
1778-
}
1779-
1780-
removeTerminatedReplicators(replicators);
1781-
List<CompletableFuture<Void>> futures = new ArrayList<>();
1782-
1783-
// Check for missing replicators
1784-
for (String cluster : configuredClusters) {
1785-
if (cluster.equals(localCluster)) {
1786-
continue;
1787-
}
1788-
if (!replicators.containsKey(cluster)) {
1789-
futures.add(startReplicator(cluster));
1790-
}
1791-
}
1792-
1793-
// Check for replicators to be stopped
1794-
replicators.forEach((cluster, replicator) -> {
1795-
// Update message TTL
1796-
((PersistentReplicator) replicator).updateMessageTTL(newMessageTTLInSeconds);
1797-
if (!cluster.equals(localCluster)) {
1798-
if (!configuredClusters.contains(cluster)) {
1799-
futures.add(removeReplicator(cluster));
1769+
return checkAllowedCluster(localCluster).thenCompose(success -> {
1770+
if (!success) {
1771+
// if local cluster is removed from global namespace cluster-list : then delete topic forcefully
1772+
// because pulsar doesn't serve global topic without local repl-cluster configured.
1773+
return deleteForcefully();
1774+
}
1775+
1776+
int newMessageTTLInSeconds = topicPolicies.getMessageTTLInSeconds().get();
1777+
1778+
removeTerminatedReplicators(replicators);
1779+
List<CompletableFuture<Void>> futures = new ArrayList<>();
1780+
1781+
// The replication clusters at namespace level will get local cluster when creating a namespace.
1782+
// If there are only one cluster in the replication clusters, it means the replication is not enabled.
1783+
// If the cluster 1 and cluster 2 use the same configuration store and the namespace is created in cluster1
1784+
// without enabling geo-replication, then the replication clusters always has cluster1.
1785+
//
1786+
// When a topic under the namespace is load in the cluster2, the `cluster1` may be identified as
1787+
// remote cluster and start geo-replication. This check is to avoid the above case.
1788+
if (!(configuredClusters.size() == 1 && replicators.isEmpty())) {
1789+
// Check for missing replicators
1790+
for (String cluster : configuredClusters) {
1791+
if (cluster.equals(localCluster)) {
1792+
continue;
1793+
}
1794+
if (!replicators.containsKey(cluster)) {
1795+
futures.add(startReplicator(cluster));
1796+
}
18001797
}
1798+
// Check for replicators to be stopped
1799+
replicators.forEach((cluster, replicator) -> {
1800+
// Update message TTL
1801+
((PersistentReplicator) replicator).updateMessageTTL(newMessageTTLInSeconds);
1802+
if (!cluster.equals(localCluster)) {
1803+
if (!configuredClusters.contains(cluster)) {
1804+
futures.add(removeReplicator(cluster));
1805+
}
1806+
}
1807+
});
18011808
}
1802-
});
18031809

1804-
futures.add(checkShadowReplication());
1810+
futures.add(checkShadowReplication());
18051811

1806-
return FutureUtil.waitForAll(futures);
1812+
return FutureUtil.waitForAll(futures);
1813+
});
1814+
}
1815+
1816+
private CompletableFuture<Boolean> checkAllowedCluster(String localCluster) {
1817+
List<String> replicationClusters = topicPolicies.getReplicationClusters().get();
1818+
return brokerService.pulsar().getPulsarResources().getNamespaceResources()
1819+
.getPoliciesAsync(TopicName.get(topic).getNamespaceObject()).thenCompose(policiesOptional -> {
1820+
Set<String> allowedClusters = Set.of();
1821+
if (policiesOptional.isPresent()) {
1822+
allowedClusters = policiesOptional.get().allowed_clusters;
1823+
}
1824+
if (TopicName.get(topic).isGlobal() && !replicationClusters.contains(localCluster)
1825+
&& !allowedClusters.contains(localCluster)) {
1826+
log.warn("Local cluster {} is not part of global namespace repl list {} and allowed list {}",
1827+
localCluster, replicationClusters, allowedClusters);
1828+
return CompletableFuture.completedFuture(false);
1829+
} else {
1830+
return CompletableFuture.completedFuture(true);
1831+
}
1832+
});
18071833
}
18081834

18091835
private CompletableFuture<Void> checkShadowReplication() {

0 commit comments

Comments
 (0)