Skip to content

Commit eab9263

Browse files
[Coordinator throttling] Scheduling Policies for Admission Control based on worker load (#25689)
**Admission control scheduling policy** **Logic** Gather worker overload data from the endpoint added in PR [https://github.com/prestodb/presto/pull/25687](https://github.com/prestodb/presto/pull/25687). Based on the configured policy (count of overloaded workers or percentage of overloaded workers) and the resulting cluster overload state, queue the queries. **Background** RFC PR: [https://github.com/prestodb/rfcs/pull/42](https://github.com/prestodb/rfcs/pull/42) **Metrics on queuing due to this feature:** The following metrics were added: - ClusterOverloadDuration - ClusterOverloadCount **Feature flag:** The feature is currently disabled. Coordinator configs can be used to enable it and to set thresholds. ## Summary by Sourcery Add cluster overload-based admission control scheduling policy and supporting infrastructure to throttle queries based on worker load. 
New Features: - Introduce ClusterResourceChecker for periodic cluster overload detection and query throttling based on worker load metrics - Implement CpuMemoryOverloadPolicy with count- and percentage-based overload thresholds and expose overload detection count and duration metrics - Add AdmissionControlBypassConfig to allow certain queries (e.g., DDL) to bypass admission control Enhancements: - Refactor DiscoveryNodeManager and SPI to use RemoteNodeStats for asynchronous fetching of node load metrics - Integrate cluster overload checks into InternalResourceGroup scheduling logic and update eligibility propagation on overload state changes - Introduce ClusterOverloadPolicyModule and DI bindings for policy selection and checker Tests: - Add unit tests for ClusterResourceChecker, CpuMemoryOverloadPolicy, and AdmissionControlBypassConfig - Update existing resource group and node manager tests to support cluster resource checker mocks Chores: - Add new configuration properties for cluster-overload throttling and internal-communication stats polling intervals
1 parent 29b1657 commit eab9263

28 files changed

+1922
-50
lines changed
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.execution;
15+
16+
import com.facebook.airlift.configuration.Config;
17+
18+
public class ClusterOverloadConfig
19+
{
20+
public static final String OVERLOAD_POLICY_CNT_BASED = "overload_worker_cnt_based_throttling";
21+
public static final String OVERLOAD_POLICY_PCT_BASED = "overload_worker_pct_based_throttling";
22+
private boolean clusterOverloadThrottlingEnabled;
23+
private double allowedOverloadWorkersPct = 0.01;
24+
private int allowedOverloadWorkersCnt;
25+
private String overloadPolicyType = OVERLOAD_POLICY_CNT_BASED;
26+
private int overloadCheckCacheTtlInSecs = 5;
27+
28+
/**
29+
* Gets the time-to-live for the cached cluster overload state.
30+
* This determines how frequently the system will re-evaluate whether the cluster is overloaded.
31+
*
32+
* @return the cache TTL duration
33+
*/
34+
public int getOverloadCheckCacheTtlInSecs()
35+
{
36+
return overloadCheckCacheTtlInSecs;
37+
}
38+
39+
/**
40+
* Gets the time-to-live for the cached cluster overload state.
41+
* This determines how frequently the system will re-evaluate whether the cluster is overloaded.
42+
*
43+
* @return the cache TTL duration
44+
*/
45+
public int getOverloadCheckCacheTtlMillis()
46+
{
47+
return overloadCheckCacheTtlInSecs * 1000;
48+
}
49+
50+
/**
51+
* Sets the time-to-live for the cached cluster overload state.
52+
*
53+
* @param overloadCheckCacheTtlInSecs the cache TTL duration
54+
* @return this for chaining
55+
*/
56+
@Config("cluster.overload-check-cache-ttl-secs")
57+
public ClusterOverloadConfig setOverloadCheckCacheTtlInSecs(int overloadCheckCacheTtlInSecs)
58+
{
59+
this.overloadCheckCacheTtlInSecs = overloadCheckCacheTtlInSecs;
60+
return this;
61+
}
62+
63+
@Config("cluster-overload.enable-throttling")
64+
public ClusterOverloadConfig setClusterOverloadThrottlingEnabled(boolean clusterOverloadThrottlingEnabled)
65+
{
66+
this.clusterOverloadThrottlingEnabled = clusterOverloadThrottlingEnabled;
67+
return this;
68+
}
69+
70+
public boolean isClusterOverloadThrottlingEnabled()
71+
{
72+
return this.clusterOverloadThrottlingEnabled;
73+
}
74+
75+
@Config("cluster-overload.allowed-overload-workers-pct")
76+
public ClusterOverloadConfig setAllowedOverloadWorkersPct(Double allowedOverloadWorkersPct)
77+
{
78+
this.allowedOverloadWorkersPct = allowedOverloadWorkersPct;
79+
return this;
80+
}
81+
82+
public double getAllowedOverloadWorkersPct()
83+
{
84+
return this.allowedOverloadWorkersPct;
85+
}
86+
87+
@Config("cluster-overload.allowed-overload-workers-cnt")
88+
public ClusterOverloadConfig setAllowedOverloadWorkersCnt(int allowedOverloadWorkersCnt)
89+
{
90+
this.allowedOverloadWorkersCnt = allowedOverloadWorkersCnt;
91+
return this;
92+
}
93+
94+
public double getAllowedOverloadWorkersCnt()
95+
{
96+
return this.allowedOverloadWorkersCnt;
97+
}
98+
99+
@Config("cluster-overload.overload-policy-type")
100+
public ClusterOverloadConfig setOverloadPolicyType(String overloadPolicyType)
101+
{
102+
// validate
103+
this.overloadPolicyType = overloadPolicyType;
104+
return this;
105+
}
106+
107+
public String getOverloadPolicyType()
108+
{
109+
return this.overloadPolicyType;
110+
}
111+
}

presto-main-base/src/main/java/com/facebook/presto/execution/resourceGroups/InternalResourceGroup.java

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import com.facebook.airlift.units.Duration;
1919
import com.facebook.presto.execution.ManagedQueryExecution;
2020
import com.facebook.presto.execution.resourceGroups.WeightedFairQueue.Usage;
21+
import com.facebook.presto.execution.scheduler.clusterOverload.ClusterResourceChecker;
2122
import com.facebook.presto.metadata.InternalNodeManager;
2223
import com.facebook.presto.server.QueryStateInfo;
2324
import com.facebook.presto.server.ResourceGroupInfo;
@@ -96,6 +97,7 @@ public class InternalResourceGroup
9697
private final Function<ResourceGroupId, Optional<ResourceGroupRuntimeInfo>> additionalRuntimeInfo;
9798
private final Predicate<InternalResourceGroup> shouldWaitForResourceManagerUpdate;
9899
private final InternalNodeManager nodeManager;
100+
private final ClusterResourceChecker clusterResourceChecker;
99101

100102
// Configuration
101103
// =============
@@ -166,12 +168,14 @@ protected InternalResourceGroup(
166168
boolean staticResourceGroup,
167169
Function<ResourceGroupId, Optional<ResourceGroupRuntimeInfo>> additionalRuntimeInfo,
168170
Predicate<InternalResourceGroup> shouldWaitForResourceManagerUpdate,
169-
InternalNodeManager nodeManager)
171+
InternalNodeManager nodeManager,
172+
ClusterResourceChecker clusterResourceChecker)
170173
{
171174
this.parent = requireNonNull(parent, "parent is null");
172175
this.jmxExportListener = requireNonNull(jmxExportListener, "jmxExportListener is null");
173176
this.executor = requireNonNull(executor, "executor is null");
174177
this.nodeManager = requireNonNull(nodeManager, "node manager is null");
178+
this.clusterResourceChecker = requireNonNull(clusterResourceChecker, "clusterResourceChecker is null");
175179
requireNonNull(name, "name is null");
176180
if (parent.isPresent()) {
177181
id = new ResourceGroupId(parent.get().id, name);
@@ -671,7 +675,8 @@ public InternalResourceGroup getOrCreateSubGroup(String name, boolean staticSegm
671675
staticResourceGroup && staticSegment,
672676
additionalRuntimeInfo,
673677
shouldWaitForResourceManagerUpdate,
674-
nodeManager);
678+
nodeManager,
679+
clusterResourceChecker);
675680
// Sub group must use query priority to ensure ordering
676681
if (schedulingPolicy == QUERY_PRIORITY) {
677682
subGroup.setSchedulingPolicy(QUERY_PRIORITY);
@@ -770,7 +775,7 @@ private void enqueueQuery(ManagedQueryExecution query)
770775
}
771776

772777
// This method must be called whenever the group's eligibility to run more queries may have changed.
773-
private void updateEligibility()
778+
protected void updateEligibility()
774779
{
775780
checkState(Thread.holdsLock(root), "Must hold lock to update eligibility");
776781
synchronized (root) {
@@ -1019,6 +1024,11 @@ private boolean canRunMore()
10191024
{
10201025
checkState(Thread.holdsLock(root), "Must hold lock");
10211026
synchronized (root) {
1027+
// Check if more queries can be run on the cluster based on cluster overload
1028+
if (clusterResourceChecker.isClusterCurrentlyOverloaded()) {
1029+
return false;
1030+
}
1031+
10221032
if (cpuUsageMillis >= hardCpuLimitMillis) {
10231033
return false;
10241034
}
@@ -1135,7 +1145,8 @@ public RootInternalResourceGroup(
11351145
Executor executor,
11361146
Function<ResourceGroupId, Optional<ResourceGroupRuntimeInfo>> additionalRuntimeInfo,
11371147
Predicate<InternalResourceGroup> shouldWaitForResourceManagerUpdate,
1138-
InternalNodeManager nodeManager)
1148+
InternalNodeManager nodeManager,
1149+
ClusterResourceChecker clusterResourceChecker)
11391150
{
11401151
super(Optional.empty(),
11411152
name,
@@ -1144,7 +1155,16 @@ public RootInternalResourceGroup(
11441155
true,
11451156
additionalRuntimeInfo,
11461157
shouldWaitForResourceManagerUpdate,
1147-
nodeManager);
1158+
nodeManager,
1159+
clusterResourceChecker);
1160+
}
1161+
1162+
public synchronized void updateEligibilityRecursively(InternalResourceGroup group)
1163+
{
1164+
group.updateEligibility();
1165+
for (InternalResourceGroup subGroup : group.subGroups()) {
1166+
updateEligibilityRecursively(subGroup);
1167+
}
11481168
}
11491169

11501170
public synchronized void processQueuedQueries()

presto-main-base/src/main/java/com/facebook/presto/execution/resourceGroups/InternalResourceGroupManager.java

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import com.facebook.presto.execution.ManagedQueryExecution;
2020
import com.facebook.presto.execution.QueryManagerConfig;
2121
import com.facebook.presto.execution.resourceGroups.InternalResourceGroup.RootInternalResourceGroup;
22+
import com.facebook.presto.execution.scheduler.clusterOverload.ClusterOverloadStateListener;
23+
import com.facebook.presto.execution.scheduler.clusterOverload.ClusterResourceChecker;
2224
import com.facebook.presto.metadata.InternalNodeManager;
2325
import com.facebook.presto.resourcemanager.ResourceGroupService;
2426
import com.facebook.presto.server.ResourceGroupInfo;
@@ -81,7 +83,7 @@
8183

8284
@ThreadSafe
8385
public final class InternalResourceGroupManager<C>
84-
implements ResourceGroupManager<C>
86+
implements ResourceGroupManager<C>, ClusterOverloadStateListener
8587
{
8688
private static final Logger log = Logger.get(InternalResourceGroupManager.class);
8789
private static final File RESOURCE_GROUPS_CONFIGURATION = new File("etc/resource-groups.properties");
@@ -112,6 +114,7 @@ public final class InternalResourceGroupManager<C>
112114
private final QueryManagerConfig queryManagerConfig;
113115
private final InternalNodeManager nodeManager;
114116
private AtomicBoolean isConfigurationManagerLoaded;
117+
private final ClusterResourceChecker clusterResourceChecker;
115118

116119
@Inject
117120
public InternalResourceGroupManager(
@@ -121,7 +124,8 @@ public InternalResourceGroupManager(
121124
MBeanExporter exporter,
122125
ResourceGroupService resourceGroupService,
123126
ServerConfig serverConfig,
124-
InternalNodeManager nodeManager)
127+
InternalNodeManager nodeManager,
128+
ClusterResourceChecker clusterResourceChecker)
125129
{
126130
this.queryManagerConfig = requireNonNull(queryManagerConfig, "queryManagerConfig is null");
127131
this.exporter = requireNonNull(exporter, "exporter is null");
@@ -137,6 +141,7 @@ public InternalResourceGroupManager(
137141
this.resourceGroupRuntimeExecutor = new PeriodicTaskExecutor(resourceGroupRuntimeInfoRefreshInterval.toMillis(), refreshExecutor, this::refreshResourceGroupRuntimeInfo);
138142
configurationManagerFactories.putIfAbsent(LegacyResourceGroupConfigurationManager.NAME, new LegacyResourceGroupConfigurationManager.Factory());
139143
this.isConfigurationManagerLoaded = new AtomicBoolean(false);
144+
this.clusterResourceChecker = clusterResourceChecker;
140145
}
141146

142147
@Override
@@ -254,6 +259,8 @@ public ResourceGroupConfigurationManager<C> getConfigurationManager()
254259
@PreDestroy
255260
public void destroy()
256261
{
262+
// Unregister from cluster overload state changes
263+
clusterResourceChecker.removeListener(this);
257264
refreshExecutor.shutdownNow();
258265
resourceGroupRuntimeExecutor.stop();
259266
}
@@ -275,6 +282,9 @@ public void start()
275282
if (isResourceManagerEnabled) {
276283
resourceGroupRuntimeExecutor.start();
277284
}
285+
286+
// Register as listener for cluster overload state changes
287+
clusterResourceChecker.addListener(this);
278288
}
279289
}
280290

@@ -396,7 +406,7 @@ private synchronized void createGroupIfNecessary(SelectionContext<C> context, Ex
396406
else {
397407
RootInternalResourceGroup root;
398408
if (!isResourceManagerEnabled) {
399-
root = new RootInternalResourceGroup(id.getSegments().get(0), this::exportGroup, executor, ignored -> Optional.empty(), rg -> false, nodeManager);
409+
root = new RootInternalResourceGroup(id.getSegments().get(0), this::exportGroup, executor, ignored -> Optional.empty(), rg -> false, nodeManager, clusterResourceChecker);
400410
}
401411
else {
402412
root = new RootInternalResourceGroup(
@@ -409,7 +419,8 @@ private synchronized void createGroupIfNecessary(SelectionContext<C> context, Ex
409419
resourceGroupRuntimeInfosSnapshot::get,
410420
lastUpdatedResourceGroupRuntimeInfo::get,
411421
concurrencyThreshold),
412-
nodeManager);
422+
nodeManager,
423+
clusterResourceChecker);
413424
}
414425
group = root;
415426
rootGroups.add(root);
@@ -463,6 +474,24 @@ public int getQueriesQueuedOnInternal()
463474
return queriesQueuedInternal;
464475
}
465476

477+
@Override
478+
public void onClusterEnteredOverloadedState()
479+
{
480+
// Resource groups will handle overload state through their existing admission control logic
481+
// No additional action needed here as queries will be queued automatically
482+
}
483+
484+
@Override
485+
public void onClusterExitedOverloadedState()
486+
{
487+
log.info("Cluster exited overloaded state, updating eligibility for all resource groups");
488+
for (RootInternalResourceGroup rootGroup : rootGroups) {
489+
synchronized (rootGroup) {
490+
rootGroup.updateEligibilityRecursively(rootGroup);
491+
}
492+
}
493+
}
494+
466495
@Managed
467496
public long getLastSchedulingCycleRuntimeDelayMs()
468497
{
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
15+
package com.facebook.presto.execution.scheduler.clusterOverload;
16+
17+
import com.facebook.presto.metadata.InternalNodeManager;
18+
19+
/**
20+
* Interface for policies that determine if cluster is overloaded.
21+
* Implementations can check various metrics from NodeStats to determine
22+
* if a worker is overloaded and queries should be throttled.
23+
*/
24+
public interface ClusterOverloadPolicy
25+
{
26+
/**
27+
* Checks if cluster is overloaded.
28+
*
29+
* @param nodeManager The node manager to get node information
30+
* @return true if cluster is overloaded, false otherwise
31+
*/
32+
boolean isClusterOverloaded(InternalNodeManager nodeManager);
33+
34+
/**
35+
* Gets the name of the policy.
36+
*
37+
* @return The name of the policy
38+
*/
39+
String getName();
40+
}

0 commit comments

Comments
 (0)