Skip to content

Commit 15935b3

Browse files
authored
[feat][broker] Support configuring replicator rate limiter per cluster (#36)
* [feat][broker] Support configuring replicator rate limiter per cluster Signed-off-by: Zixuan Liu <nodeces@gmail.com> * Fix test Signed-off-by: Zixuan Liu <nodeces@gmail.com> * Update Namespaces.java --------- Signed-off-by: Zixuan Liu <nodeces@gmail.com>
1 parent 63dd4e9 commit 15935b3

File tree

19 files changed

+723
-74
lines changed

19 files changed

+723
-74
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1532,11 +1532,12 @@ protected SubscribeRate internalGetSubscribeRate() {
15321532
return policies.clusterSubscribeRate.get(pulsar().getConfiguration().getClusterName());
15331533
}
15341534

1535-
protected void internalRemoveReplicatorDispatchRate() {
1535+
protected void internalRemoveReplicatorDispatchRate(String cluster) {
15361536
validateSuperUserAccess();
15371537
try {
15381538
updatePolicies(namespaceName, policies -> {
1539-
policies.replicatorDispatchRate.remove(pulsar().getConfiguration().getClusterName());
1539+
policies.replicatorDispatchRate.remove(
1540+
StringUtils.isNotEmpty(cluster) ? cluster : pulsar().getConfiguration().getClusterName());
15401541
return policies;
15411542
});
15421543
log.info("[{}] Successfully delete the replicatorDispatchRate for cluster on namespace {}", clientAppId(),
@@ -1548,12 +1549,14 @@ protected void internalRemoveReplicatorDispatchRate() {
15481549
}
15491550
}
15501551

1551-
protected void internalSetReplicatorDispatchRate(DispatchRateImpl dispatchRate) {
1552+
protected void internalSetReplicatorDispatchRate(String cluster, DispatchRateImpl dispatchRate) {
15521553
validateSuperUserAccess();
15531554
log.info("[{}] Set namespace replicator dispatch-rate {}/{}", clientAppId(), namespaceName, dispatchRate);
15541555
try {
15551556
updatePolicies(namespaceName, policies -> {
1556-
policies.replicatorDispatchRate.put(pulsar().getConfiguration().getClusterName(), dispatchRate);
1557+
policies.replicatorDispatchRate.put(
1558+
StringUtils.isNotEmpty(cluster) ? cluster : pulsar().getConfiguration().getClusterName(),
1559+
dispatchRate);
15571560
return policies;
15581561
});
15591562
log.info("[{}] Successfully updated the replicatorDispatchRate for cluster on namespace {}", clientAppId(),
@@ -1565,11 +1568,12 @@ protected void internalSetReplicatorDispatchRate(DispatchRateImpl dispatchRate)
15651568
}
15661569
}
15671570

1568-
protected DispatchRate internalGetReplicatorDispatchRate() {
1571+
protected DispatchRate internalGetReplicatorDispatchRate(String cluster) {
15691572
validateNamespacePolicyOperation(namespaceName, PolicyName.REPLICATION_RATE, PolicyOperation.READ);
15701573

15711574
Policies policies = getNamespacePolicies(namespaceName);
1572-
return policies.replicatorDispatchRate.get(pulsar().getConfiguration().getClusterName());
1575+
return policies.replicatorDispatchRate.get(
1576+
StringUtils.isNotEmpty(cluster) ? cluster : pulsar().getConfiguration().getClusterName());
15731577
}
15741578

15751579
protected void internalSetBacklogQuota(BacklogQuotaType backlogQuotaType, BacklogQuota backlogQuota) {

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java

Lines changed: 48 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3597,25 +3597,64 @@ protected CompletableFuture<Void> internalSetMaxSubscriptionsPerTopic(Integer ma
35973597
});
35983598
}
35993599

3600-
protected CompletableFuture<DispatchRateImpl> internalGetReplicatorDispatchRate(boolean applied, boolean isGlobal) {
3600+
protected CompletableFuture<DispatchRateImpl> internalGetReplicatorDispatchRate(String cluster, boolean applied,
3601+
boolean isGlobal) {
36013602
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
3602-
.thenApply(op -> op.map(TopicPolicies::getReplicatorDispatchRate)
3603-
.orElseGet(() -> {
3604-
if (applied) {
3605-
DispatchRateImpl namespacePolicy = getNamespacePolicies(namespaceName)
3606-
.replicatorDispatchRate.get(pulsar().getConfiguration().getClusterName());
3607-
return namespacePolicy == null ? replicatorDispatchRate() : namespacePolicy;
3603+
.thenApply(op -> op.map(n -> {
3604+
// Prioritize getting the dispatch rate from the replicatorDispatchRateMap if a specific cluster
3605+
// is provided.
3606+
// If the cluster is empty, it means the user has not explicitly set a rate for a particular
3607+
// cluster,
3608+
// so we still attempt to retrieve the value from the replicatorDispatchRateMap using the current
3609+
// cluster.
3610+
// If `applied` is true, we also need to consider the default cluster rate and finally fallback
3611+
// to `getReplicatorDispatchRate()` for backward compatibility.
3612+
if (StringUtils.isNotEmpty(cluster)) {
3613+
DispatchRateImpl dispatchRate = n.getReplicatorDispatchRateMap().get(cluster);
3614+
if (dispatchRate != null) {
3615+
return dispatchRate;
3616+
}
3617+
}
3618+
3619+
if (applied || StringUtils.isEmpty(cluster)) {
3620+
DispatchRateImpl dispatchRate =
3621+
n.getReplicatorDispatchRateMap().get(pulsar().getConfiguration().getClusterName());
3622+
if (dispatchRate != null) {
3623+
return dispatchRate;
3624+
}
3625+
// Backward compatibility.
3626+
return n.getReplicatorDispatchRate();
36083627
}
36093628
return null;
3629+
}).orElseGet(() -> {
3630+
if (!applied) {
3631+
return null;
3632+
}
3633+
Map<String, DispatchRateImpl> replicatorDispatchRate =
3634+
getNamespacePolicies(namespaceName).replicatorDispatchRate;
3635+
DispatchRateImpl namespacePolicy = replicatorDispatchRate.getOrDefault(cluster,
3636+
replicatorDispatchRate.get(pulsar().getConfiguration().getClusterName()));
3637+
return namespacePolicy == null ? replicatorDispatchRate() : namespacePolicy;
36103638
}));
36113639
}
36123640

3613-
protected CompletableFuture<Void> internalSetReplicatorDispatchRate(DispatchRateImpl dispatchRate,
3641+
protected CompletableFuture<Void> internalSetReplicatorDispatchRate(String cluster, DispatchRateImpl dispatchRate,
36143642
boolean isGlobal) {
36153643
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
36163644
.thenCompose(op -> {
36173645
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
3618-
topicPolicies.setReplicatorDispatchRate(dispatchRate);
3646+
boolean usingDefaultCluster = StringUtils.isEmpty(cluster);
3647+
if (dispatchRate == null) {
3648+
topicPolicies.getReplicatorDispatchRateMap()
3649+
.remove(usingDefaultCluster ? pulsar().getConfiguration().getClusterName() : cluster);
3650+
} else {
3651+
topicPolicies.getReplicatorDispatchRateMap()
3652+
.put(usingDefaultCluster ? pulsar().getConfiguration().getClusterName() : cluster,
3653+
dispatchRate);
3654+
}
3655+
if (usingDefaultCluster) {
3656+
topicPolicies.setReplicatorDispatchRate(dispatchRate);
3657+
}
36193658
topicPolicies.setIsGlobal(isGlobal);
36203659
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
36213660
});

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -753,10 +753,11 @@ public void deleteSubscriptionDispatchRate(@PathParam("property") String propert
753753
public void setReplicatorDispatchRate(
754754
@PathParam("tenant") String tenant,
755755
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
756+
@QueryParam("cluster") String queryCluster,
756757
@ApiParam(value = "Replicator dispatch rate for all topics of the specified namespace")
757758
DispatchRateImpl dispatchRate) {
758759
validateNamespaceName(tenant, cluster, namespace);
759-
internalSetReplicatorDispatchRate(dispatchRate);
760+
internalSetReplicatorDispatchRate(queryCluster, dispatchRate);
760761
}
761762

762763
@GET
@@ -768,9 +769,10 @@ public void setReplicatorDispatchRate(
768769
@ApiResponse(code = 404, message = "Namespace does not exist") })
769770
public DispatchRate getReplicatorDispatchRate(@PathParam("tenant") String tenant,
770771
@PathParam("cluster") String cluster,
771-
@PathParam("namespace") String namespace) {
772+
@PathParam("namespace") String namespace,
773+
@QueryParam("cluster") String queryCluster) {
772774
validateNamespaceName(tenant, cluster, namespace);
773-
return internalGetReplicatorDispatchRate();
775+
return internalGetReplicatorDispatchRate(queryCluster);
774776
}
775777

776778
@GET

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -717,21 +717,24 @@ public SubscribeRate getSubscribeRate(@PathParam("tenant") String tenant,
717717
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission")})
718718
public void removeReplicatorDispatchRate(
719719
@PathParam("tenant") String tenant,
720-
@PathParam("namespace") String namespace) {
720+
@PathParam("namespace") String namespace,
721+
@QueryParam("cluster") String cluster
722+
) {
721723
validateNamespaceName(tenant, namespace);
722-
internalRemoveReplicatorDispatchRate();
724+
internalRemoveReplicatorDispatchRate(cluster);
723725
}
724726

725727
@POST
726728
@Path("/{tenant}/{namespace}/replicatorDispatchRate")
727729
@ApiOperation(value = "Set replicator dispatch-rate throttling for all topics of the namespace")
728730
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission")})
729731
public void setReplicatorDispatchRate(@PathParam("tenant") String tenant,
730-
@PathParam("namespace") String namespace, @ApiParam(value =
732+
@PathParam("namespace") String namespace,
733+
@QueryParam("cluster") String cluster, @ApiParam(value =
731734
"Replicator dispatch rate for all topics of the specified namespace")
732735
DispatchRateImpl dispatchRate) {
733736
validateNamespaceName(tenant, namespace);
734-
internalSetReplicatorDispatchRate(dispatchRate);
737+
internalSetReplicatorDispatchRate(cluster, dispatchRate);
735738
}
736739

737740
@GET
@@ -742,9 +745,11 @@ public void setReplicatorDispatchRate(@PathParam("tenant") String tenant,
742745
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
743746
@ApiResponse(code = 404, message = "Namespace does not exist") })
744747
public DispatchRate getReplicatorDispatchRate(@PathParam("tenant") String tenant,
745-
@PathParam("namespace") String namespace) {
748+
@PathParam("namespace") String namespace,
749+
@QueryParam("cluster") String cluster
750+
) {
746751
validateNamespaceName(tenant, namespace);
747-
return internalGetReplicatorDispatchRate();
752+
return internalGetReplicatorDispatchRate(cluster);
748753
}
749754

750755
@GET

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2531,11 +2531,12 @@ public void getReplicatorDispatchRate(@Suspended final AsyncResponse asyncRespon
25312531
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
25322532
@QueryParam("applied") @DefaultValue("false") boolean applied,
25332533
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
2534-
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
2534+
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
2535+
@QueryParam("cluster") String cluster) {
25352536
validateTopicName(tenant, namespace, encodedTopic);
25362537
validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION_RATE, PolicyOperation.READ)
25372538
.thenCompose(__ -> preValidation(authoritative))
2538-
.thenCompose(__ -> internalGetReplicatorDispatchRate(applied, isGlobal))
2539+
.thenCompose(__ -> internalGetReplicatorDispatchRate(cluster, applied, isGlobal))
25392540
.thenApply(asyncResponse::resume)
25402541
.exceptionally(ex -> {
25412542
handleTopicPolicyException("getReplicatorDispatchRate", ex, asyncResponse);
@@ -2559,11 +2560,12 @@ public void setReplicatorDispatchRate(@Suspended final AsyncResponse asyncRespon
25592560
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
25602561
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
25612562
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
2563+
@QueryParam("cluster") String cluster,
25622564
@ApiParam(value = "Replicator dispatch rate of the topic") DispatchRateImpl dispatchRate) {
25632565
validateTopicName(tenant, namespace, encodedTopic);
25642566
validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION_RATE, PolicyOperation.WRITE)
25652567
.thenCompose(__ -> preValidation(authoritative))
2566-
.thenCompose(__ -> internalSetReplicatorDispatchRate(dispatchRate, isGlobal))
2568+
.thenCompose(__ -> internalSetReplicatorDispatchRate(cluster, dispatchRate, isGlobal))
25672569
.thenRun(() -> {
25682570
log.info("[{}] Successfully updated replicatorDispatchRate: namespace={}, topic={}"
25692571
+ ", replicatorDispatchRate={}, isGlobal={}",
@@ -2590,11 +2592,12 @@ public void removeReplicatorDispatchRate(@Suspended final AsyncResponse asyncRes
25902592
@PathParam("topic") @Encoded String encodedTopic,
25912593
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
25922594
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
2593-
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
2595+
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
2596+
@QueryParam("cluster") String cluster) {
25942597
validateTopicName(tenant, namespace, encodedTopic);
25952598
validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION_RATE, PolicyOperation.WRITE)
25962599
.thenCompose(__ -> preValidation(authoritative))
2597-
.thenCompose(__ -> internalSetReplicatorDispatchRate(null, isGlobal))
2600+
.thenCompose(__ -> internalSetReplicatorDispatchRate(cluster, null, isGlobal))
25982601
.thenRun(() -> {
25992602
log.info("[{}] Successfully remove replicatorDispatchRate limit: namespace={}, topic={}",
26002603
clientAppId(), namespaceName, topicName.getLocalName());

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.Arrays;
2727
import java.util.Collections;
2828
import java.util.EnumSet;
29+
import java.util.HashMap;
2930
import java.util.List;
3031
import java.util.Map;
3132
import java.util.Objects;
@@ -183,8 +184,14 @@ public SchemaCompatibilityStrategy getSchemaCompatibilityStrategy() {
183184
return this.topicPolicies.getSchemaCompatibilityStrategy().get();
184185
}
185186

186-
public DispatchRateImpl getReplicatorDispatchRate() {
187-
return this.topicPolicies.getReplicatorDispatchRate().get();
187+
public DispatchRateImpl getReplicatorDispatchRate(String remoteCluster) {
188+
Map<String, DispatchRateImpl> dispatchRateMap = topicPolicies.getReplicatorDispatchRate().get();
189+
DispatchRateImpl dispatchRate = dispatchRateMap.get(remoteCluster);
190+
if (dispatchRate == null) {
191+
// Use the default dispatch rate.
192+
dispatchRate = dispatchRateMap.get(brokerService.pulsar().getConfiguration().getClusterName());
193+
}
194+
return normalize(dispatchRate);
188195
}
189196

190197
public DispatchRateImpl getDispatchRate() {
@@ -225,7 +232,13 @@ protected void updateTopicPolicy(TopicPolicies data) {
225232
topicPolicies.getMessageTTLInSeconds().updateTopicValue(data.getMessageTTLInSeconds());
226233
topicPolicies.getPublishRate().updateTopicValue(PublishRate.normalize(data.getPublishRate()));
227234
topicPolicies.getDelayedDeliveryEnabled().updateTopicValue(data.getDelayedDeliveryEnabled());
228-
topicPolicies.getReplicatorDispatchRate().updateTopicValue(normalize(data.getReplicatorDispatchRate()));
235+
// Backward compatibility.
236+
// Default use the current cluster name as key, {@link TopicPolicies#getReplicatorDispatchRate()} is value.
237+
HashMap<String, DispatchRateImpl> replicatorDispatchRateMap =
238+
new HashMap<>(data.getReplicatorDispatchRateMap());
239+
replicatorDispatchRateMap.putIfAbsent(brokerService.pulsar().getConfiguration().getClusterName(),
240+
data.getReplicatorDispatchRate());
241+
topicPolicies.getReplicatorDispatchRate().updateTopicValue(replicatorDispatchRateMap);
229242
topicPolicies.getDelayedDeliveryTickTimeMillis().updateTopicValue(data.getDelayedDeliveryTickTimeMillis());
230243
topicPolicies.getSubscribeRate().updateTopicValue(SubscribeRate.normalize(data.getSubscribeRate()));
231244
topicPolicies.getSubscriptionDispatchRate().updateTopicValue(normalize(data.getSubscriptionDispatchRate()));
@@ -272,8 +285,7 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
272285
.map(DelayedDeliveryPolicies::getTickTime).orElse(null));
273286
topicPolicies.getSubscriptionTypesEnabled().updateNamespaceValue(
274287
subTypeStringsToEnumSet(namespacePolicies.subscription_types_enabled));
275-
updateNamespaceReplicatorDispatchRate(namespacePolicies,
276-
brokerService.getPulsar().getConfig().getClusterName());
288+
topicPolicies.getReplicatorDispatchRate().updateNamespaceValue(namespacePolicies.replicatorDispatchRate);
277289
Arrays.stream(BacklogQuota.BacklogQuotaType.values()).forEach(
278290
type -> this.topicPolicies.getBackLogQuotaMap().get(type)
279291
.updateNamespaceValue(MapUtils.getObject(namespacePolicies.backlog_quota_map, type)));
@@ -303,11 +315,6 @@ private void updateNamespaceSubscriptionDispatchRate(Policies namespacePolicies,
303315
.updateNamespaceValue(normalize(namespacePolicies.subscriptionDispatchRate.get(cluster)));
304316
}
305317

306-
private void updateNamespaceReplicatorDispatchRate(Policies namespacePolicies, String cluster) {
307-
topicPolicies.getReplicatorDispatchRate()
308-
.updateNamespaceValue(normalize(namespacePolicies.replicatorDispatchRate.get(cluster)));
309-
}
310-
311318
private DispatchRateImpl normalize(DispatchRateImpl dispatchRate) {
312319
if (dispatchRate != null
313320
&& (dispatchRate.getDispatchThrottlingRateInMsg() > 0
@@ -411,12 +418,14 @@ private DispatchRateImpl subscriptionDispatchRateInBroker(ServiceConfiguration c
411418
.build();
412419
}
413420

414-
private DispatchRateImpl replicatorDispatchRateInBroker(ServiceConfiguration config) {
415-
return DispatchRateImpl.builder()
416-
.dispatchThrottlingRateInMsg(config.getDispatchThrottlingRatePerReplicatorInMsg())
417-
.dispatchThrottlingRateInByte(config.getDispatchThrottlingRatePerReplicatorInByte())
418-
.ratePeriodInSecond(1)
419-
.build();
421+
private Map<String, DispatchRateImpl> replicatorDispatchRateInBroker(ServiceConfiguration config) {
422+
Map<String, DispatchRateImpl> dispatchRate = new HashMap<>();
423+
dispatchRate.put(brokerService.pulsar().getConfiguration().getClusterName(), DispatchRateImpl.builder()
424+
.dispatchThrottlingRateInMsg(config.getDispatchThrottlingRatePerReplicatorInMsg())
425+
.dispatchThrottlingRateInByte(config.getDispatchThrottlingRatePerReplicatorInByte())
426+
.ratePeriodInSecond(1)
427+
.build());
428+
return dispatchRate;
420429
}
421430

422431
private EnumSet<SubType> subTypeStringsToEnumSet(Set<String> getSubscriptionTypesEnabled) {

0 commit comments

Comments
 (0)