Skip to content

Commit 5bb0783

Browse files
committed
[feat][broker] Support configuring replicator rate limiter per cluster (#36)
* [feat][broker] Support configuring replicator rate limiter per cluster Signed-off-by: Zixuan Liu <nodeces@gmail.com> * Fix test Signed-off-by: Zixuan Liu <nodeces@gmail.com> * Update Namespaces.java --------- Signed-off-by: Zixuan Liu <nodeces@gmail.com>
1 parent 90a8cd6 commit 5bb0783

File tree

19 files changed

+724
-82
lines changed

19 files changed

+724
-82
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2614,14 +2614,16 @@ protected CompletableFuture<Void> internalSetEntryFiltersPerTopicAsync(EntryFilt
26142614
* Base method for setReplicatorDispatchRate v1 and v2.
26152615
* Notion: don't re-use this logic.
26162616
*/
2617-
protected void internalSetReplicatorDispatchRate(AsyncResponse asyncResponse, DispatchRateImpl dispatchRate) {
2617+
protected void internalSetReplicatorDispatchRate(AsyncResponse asyncResponse, String cluster,
2618+
DispatchRateImpl dispatchRate) {
26182619
validateNamespacePolicyOperationAsync(namespaceName, PolicyName.REPLICATION_RATE, PolicyOperation.WRITE)
26192620
.thenAccept(__ -> {
26202621
log.info("[{}] Set namespace replicator dispatch-rate {}/{}",
26212622
clientAppId(), namespaceName, dispatchRate);
26222623
}).thenCompose(__ -> namespaceResources().setPoliciesAsync(namespaceName, policies -> {
2623-
String clusterName = pulsar().getConfiguration().getClusterName();
2624-
policies.replicatorDispatchRate.put(clusterName, dispatchRate);
2624+
policies.replicatorDispatchRate.put(
2625+
StringUtils.isNotEmpty(cluster) ? cluster : pulsar().getConfiguration().getClusterName(),
2626+
dispatchRate);
26252627
return policies;
26262628
})).thenAccept(__ -> {
26272629
asyncResponse.resume(Response.noContent().build());
@@ -2638,15 +2640,15 @@ protected void internalSetReplicatorDispatchRate(AsyncResponse asyncResponse, Di
26382640
* Base method for getReplicatorDispatchRate v1 and v2.
26392641
* Notion: don't re-use this logic.
26402642
*/
2641-
protected void internalGetReplicatorDispatchRate(AsyncResponse asyncResponse) {
2643+
protected void internalGetReplicatorDispatchRate(AsyncResponse asyncResponse, String cluster) {
26422644
validateNamespacePolicyOperationAsync(namespaceName, PolicyName.REPLICATION_RATE, PolicyOperation.READ)
26432645
.thenCompose(__ -> namespaceResources().getPoliciesAsync(namespaceName))
26442646
.thenApply(policiesOpt -> {
26452647
if (!policiesOpt.isPresent()) {
26462648
throw new RestException(Response.Status.NOT_FOUND, "Namespace policies does not exist");
26472649
}
2648-
String clusterName = pulsar().getConfiguration().getClusterName();
2649-
return policiesOpt.get().replicatorDispatchRate.get(clusterName);
2650+
return policiesOpt.get().replicatorDispatchRate.get(
2651+
StringUtils.isNotEmpty(cluster) ? cluster : pulsar().getConfiguration().getClusterName());
26502652
}).thenAccept(asyncResponse::resume)
26512653
.exceptionally(ex -> {
26522654
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -2659,11 +2661,11 @@ protected void internalGetReplicatorDispatchRate(AsyncResponse asyncResponse) {
26592661
* Base method for removeReplicatorDispatchRate v1 and v2.
26602662
* Notion: don't re-use this logic.
26612663
*/
2662-
protected void internalRemoveReplicatorDispatchRate(AsyncResponse asyncResponse) {
2664+
protected void internalRemoveReplicatorDispatchRate(AsyncResponse asyncResponse, String cluster) {
26632665
validateNamespacePolicyOperationAsync(namespaceName, PolicyName.REPLICATION_RATE, PolicyOperation.WRITE)
26642666
.thenCompose(__ -> namespaceResources().setPoliciesAsync(namespaceName, policies -> {
2665-
String clusterName = pulsar().getConfiguration().getClusterName();
2666-
policies.replicatorDispatchRate.remove(clusterName);
2667+
policies.replicatorDispatchRate.remove(
2668+
StringUtils.isNotEmpty(cluster) ? cluster : pulsar().getConfiguration().getClusterName());
26672669
return policies;
26682670
})).thenAccept(__ -> {
26692671
asyncResponse.resume(Response.noContent().build());

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java

Lines changed: 48 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3807,25 +3807,64 @@ protected CompletableFuture<Void> internalSetMaxSubscriptionsPerTopic(Integer ma
38073807
});
38083808
}
38093809

3810-
protected CompletableFuture<DispatchRateImpl> internalGetReplicatorDispatchRate(boolean applied, boolean isGlobal) {
3810+
protected CompletableFuture<DispatchRateImpl> internalGetReplicatorDispatchRate(String cluster, boolean applied,
3811+
boolean isGlobal) {
38113812
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
3812-
.thenApply(op -> op.map(TopicPolicies::getReplicatorDispatchRate)
3813-
.orElseGet(() -> {
3814-
if (applied) {
3815-
DispatchRateImpl namespacePolicy = getNamespacePolicies(namespaceName)
3816-
.replicatorDispatchRate.get(pulsar().getConfiguration().getClusterName());
3817-
return namespacePolicy == null ? replicatorDispatchRate() : namespacePolicy;
3813+
.thenApply(op -> op.map(n -> {
3814+
// Prioritize getting the dispatch rate from the replicatorDispatchRateMap if a specific cluster
3815+
// is provided.
3816+
// If the cluster is empty, it means the user has not explicitly set a rate for a particular
3817+
// cluster,
3818+
// so we still attempt to retrieve the value from the replicatorDispatchRateMap using the current
3819+
// cluster.
3820+
// If `applied` is true, we also need to consider the default cluster rate and finally fallback
3821+
// to `getReplicatorDispatchRate()` for backward compatibility.
3822+
if (StringUtils.isNotEmpty(cluster)) {
3823+
DispatchRateImpl dispatchRate = n.getReplicatorDispatchRateMap().get(cluster);
3824+
if (dispatchRate != null) {
3825+
return dispatchRate;
3826+
}
3827+
}
3828+
3829+
if (applied || StringUtils.isEmpty(cluster)) {
3830+
DispatchRateImpl dispatchRate =
3831+
n.getReplicatorDispatchRateMap().get(pulsar().getConfiguration().getClusterName());
3832+
if (dispatchRate != null) {
3833+
return dispatchRate;
3834+
}
3835+
// Backward compatibility.
3836+
return n.getReplicatorDispatchRate();
38183837
}
38193838
return null;
3839+
}).orElseGet(() -> {
3840+
if (!applied) {
3841+
return null;
3842+
}
3843+
Map<String, DispatchRateImpl> replicatorDispatchRate =
3844+
getNamespacePolicies(namespaceName).replicatorDispatchRate;
3845+
DispatchRateImpl namespacePolicy = replicatorDispatchRate.getOrDefault(cluster,
3846+
replicatorDispatchRate.get(pulsar().getConfiguration().getClusterName()));
3847+
return namespacePolicy == null ? replicatorDispatchRate() : namespacePolicy;
38203848
}));
38213849
}
38223850

3823-
protected CompletableFuture<Void> internalSetReplicatorDispatchRate(DispatchRateImpl dispatchRate,
3851+
protected CompletableFuture<Void> internalSetReplicatorDispatchRate(String cluster, DispatchRateImpl dispatchRate,
38243852
boolean isGlobal) {
38253853
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
38263854
.thenCompose(op -> {
38273855
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
3828-
topicPolicies.setReplicatorDispatchRate(dispatchRate);
3856+
boolean usingDefaultCluster = StringUtils.isEmpty(cluster);
3857+
if (dispatchRate == null) {
3858+
topicPolicies.getReplicatorDispatchRateMap()
3859+
.remove(usingDefaultCluster ? pulsar().getConfiguration().getClusterName() : cluster);
3860+
} else {
3861+
topicPolicies.getReplicatorDispatchRateMap()
3862+
.put(usingDefaultCluster ? pulsar().getConfiguration().getClusterName() : cluster,
3863+
dispatchRate);
3864+
}
3865+
if (usingDefaultCluster) {
3866+
topicPolicies.setReplicatorDispatchRate(dispatchRate);
3867+
}
38293868
topicPolicies.setIsGlobal(isGlobal);
38303869
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
38313870
});

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1118,10 +1118,11 @@ public void setReplicatorDispatchRate(@Suspended AsyncResponse asyncResponse,
11181118
@PathParam("tenant") String tenant,
11191119
@PathParam("cluster") String cluster,
11201120
@PathParam("namespace") String namespace,
1121+
@QueryParam("cluster") String queryCluster,
11211122
@ApiParam(value = "Replicator dispatch rate for all topics of the specified namespace")
11221123
DispatchRateImpl dispatchRate) {
11231124
validateNamespaceName(tenant, cluster, namespace);
1124-
internalSetReplicatorDispatchRate(asyncResponse, dispatchRate);
1125+
internalSetReplicatorDispatchRate(asyncResponse, cluster, dispatchRate);
11251126
}
11261127

11271128
@GET
@@ -1135,9 +1136,10 @@ public void getReplicatorDispatchRate(
11351136
@Suspended final AsyncResponse asyncResponse,
11361137
@PathParam("tenant") String tenant,
11371138
@PathParam("cluster") String cluster,
1138-
@PathParam("namespace") String namespace) {
1139+
@PathParam("namespace") String namespace,
1140+
@QueryParam("cluster") String queryCluster) {
11391141
validateNamespaceName(tenant, cluster, namespace);
1140-
internalGetReplicatorDispatchRate(asyncResponse);
1142+
internalGetReplicatorDispatchRate(asyncResponse, cluster);
11411143
}
11421144

11431145
@GET

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1192,9 +1192,10 @@ public void getSubscribeRate(@Suspended AsyncResponse asyncResponse, @PathParam(
11921192
@ApiResponse(code = 403, message = "Don't have admin permission")})
11931193
public void removeReplicatorDispatchRate(@Suspended AsyncResponse asyncResponse,
11941194
@PathParam("tenant") String tenant,
1195-
@PathParam("namespace") String namespace) {
1195+
@PathParam("namespace") String namespace,
1196+
@QueryParam("cluster") String cluster) {
11961197
validateNamespaceName(tenant, namespace);
1197-
internalRemoveReplicatorDispatchRate(asyncResponse);
1198+
internalRemoveReplicatorDispatchRate(asyncResponse, cluster);
11981199
}
11991200

12001201
@POST
@@ -1206,10 +1207,11 @@ public void removeReplicatorDispatchRate(@Suspended AsyncResponse asyncResponse,
12061207
public void setReplicatorDispatchRate(@Suspended AsyncResponse asyncResponse,
12071208
@PathParam("tenant") String tenant,
12081209
@PathParam("namespace") String namespace,
1210+
@QueryParam("cluster") String cluster,
12091211
@ApiParam(value =
12101212
"Replicator dispatch rate for all topics of the specified namespace") DispatchRateImpl dispatchRate) {
12111213
validateNamespaceName(tenant, namespace);
1212-
internalSetReplicatorDispatchRate(asyncResponse, dispatchRate);
1214+
internalSetReplicatorDispatchRate(asyncResponse, cluster, dispatchRate);
12131215
}
12141216

12151217
@GET
@@ -1221,9 +1223,10 @@ public void setReplicatorDispatchRate(@Suspended AsyncResponse asyncResponse,
12211223
@ApiResponse(code = 404, message = "Namespace does not exist") })
12221224
public void getReplicatorDispatchRate(@Suspended final AsyncResponse asyncResponse,
12231225
@PathParam("tenant") String tenant,
1224-
@PathParam("namespace") String namespace) {
1226+
@PathParam("namespace") String namespace,
1227+
@QueryParam("cluster") String cluster) {
12251228
validateNamespaceName(tenant, namespace);
1226-
internalGetReplicatorDispatchRate(asyncResponse);
1229+
internalGetReplicatorDispatchRate(asyncResponse, cluster);
12271230
}
12281231

12291232
@GET

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2840,11 +2840,12 @@ public void getReplicatorDispatchRate(@Suspended final AsyncResponse asyncRespon
28402840
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
28412841
@QueryParam("applied") @DefaultValue("false") boolean applied,
28422842
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
2843-
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
2843+
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
2844+
@QueryParam("cluster") String cluster) {
28442845
validateTopicName(tenant, namespace, encodedTopic);
28452846
validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION_RATE, PolicyOperation.READ)
28462847
.thenCompose(__ -> preValidation(authoritative))
2847-
.thenCompose(__ -> internalGetReplicatorDispatchRate(applied, isGlobal))
2848+
.thenCompose(__ -> internalGetReplicatorDispatchRate(cluster, applied, isGlobal))
28482849
.thenApply(asyncResponse::resume)
28492850
.exceptionally(ex -> {
28502851
handleTopicPolicyException("getReplicatorDispatchRate", ex, asyncResponse);
@@ -2870,11 +2871,12 @@ public void setReplicatorDispatchRate(@Suspended final AsyncResponse asyncRespon
28702871
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
28712872
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
28722873
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
2874+
@QueryParam("cluster") String cluster,
28732875
@ApiParam(value = "Replicator dispatch rate of the topic") DispatchRateImpl dispatchRate) {
28742876
validateTopicName(tenant, namespace, encodedTopic);
28752877
validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION_RATE, PolicyOperation.WRITE)
28762878
.thenCompose(__ -> preValidation(authoritative))
2877-
.thenCompose(__ -> internalSetReplicatorDispatchRate(dispatchRate, isGlobal))
2879+
.thenCompose(__ -> internalSetReplicatorDispatchRate(cluster, dispatchRate, isGlobal))
28782880
.thenRun(() -> {
28792881
log.info("[{}] Successfully updated replicatorDispatchRate: namespace={}, topic={}"
28802882
+ ", replicatorDispatchRate={}, isGlobal={}",
@@ -2903,11 +2905,12 @@ public void removeReplicatorDispatchRate(@Suspended final AsyncResponse asyncRes
29032905
@PathParam("topic") @Encoded String encodedTopic,
29042906
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
29052907
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
2906-
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
2908+
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
2909+
@QueryParam("cluster") String cluster) {
29072910
validateTopicName(tenant, namespace, encodedTopic);
29082911
validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION_RATE, PolicyOperation.WRITE)
29092912
.thenCompose(__ -> preValidation(authoritative))
2910-
.thenCompose(__ -> internalSetReplicatorDispatchRate(null, isGlobal))
2913+
.thenCompose(__ -> internalSetReplicatorDispatchRate(cluster, null, isGlobal))
29112914
.thenRun(() -> {
29122915
log.info("[{}] Successfully remove replicatorDispatchRate limit: namespace={}, topic={}",
29132916
clientAppId(), namespaceName, topicName.getLocalName());

0 commit comments

Comments
 (0)