Skip to content

Commit e4471e9

Browse files
feat: Coordinator memory - validate only coordinator heap; use worker-advertised capacity for limits
1 parent 1e78f92 commit e4471e9

File tree

4 files changed

+145
-13
lines changed

4 files changed

+145
-13
lines changed

presto-main/src/main/java/com/facebook/presto/memory/ClusterMemoryManager.java

Lines changed: 35 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,28 @@ public synchronized boolean memoryPoolExists(MemoryPoolId poolId)
238238
return pools.containsKey(poolId);
239239
}
240240

241+
/**
242+
* Computes effective query memory limits from config and worker-advertised capacity.
243+
* Used by {@link #process(Iterable)} and by tests to verify worker-advertised capping behavior.
244+
*
245+
* @return long[0] = effective max query user memory in bytes, long[1] = effective max query total memory in bytes
246+
*/
247+
static long[] computeEffectiveQueryMemoryLimits(
248+
long maxQueryMemoryInBytes,
249+
long maxQueryTotalMemoryInBytes,
250+
boolean useWorkerAdvertisedMemoryForLimit,
251+
boolean isWorkScheduledOnCoordinator,
252+
long workerTotalCapacityBytes)
253+
{
254+
long effectiveMaxQueryMemoryInBytes = maxQueryMemoryInBytes;
255+
long effectiveMaxQueryTotalMemoryInBytes = maxQueryTotalMemoryInBytes;
256+
if (useWorkerAdvertisedMemoryForLimit && !isWorkScheduledOnCoordinator && workerTotalCapacityBytes > 0) {
257+
effectiveMaxQueryTotalMemoryInBytes = min(maxQueryTotalMemoryInBytes, workerTotalCapacityBytes);
258+
effectiveMaxQueryMemoryInBytes = min(maxQueryMemoryInBytes, effectiveMaxQueryTotalMemoryInBytes);
259+
}
260+
return new long[] {effectiveMaxQueryMemoryInBytes, effectiveMaxQueryTotalMemoryInBytes};
261+
}
262+
241263
public synchronized void process(Iterable<QueryExecution> runningQueries)
242264
{
243265
if (!enabled) {
@@ -251,16 +273,19 @@ public synchronized void process(Iterable<QueryExecution> runningQueries)
251273

252274
// When coordinator does not schedule work, cap query limits by worker-advertised capacity
253275
// so the coordinator uses min(configured limit, sum of worker general pool capacity).
254-
long effectiveMaxQueryMemoryInBytes = maxQueryMemoryInBytes;
255-
long effectiveMaxQueryTotalMemoryInBytes = maxQueryTotalMemoryInBytes;
256-
if (useWorkerAdvertisedMemoryForLimit && !isWorkScheduledOnCoordinator) {
257-
ClusterMemoryPool generalPool = pools.get(GENERAL_POOL);
258-
long workerTotalCapacity = generalPool != null ? generalPool.getTotalDistributedBytes() : 0;
259-
if (workerTotalCapacity > 0) {
260-
effectiveMaxQueryTotalMemoryInBytes = min(maxQueryTotalMemoryInBytes, workerTotalCapacity);
261-
effectiveMaxQueryMemoryInBytes = min(maxQueryMemoryInBytes, effectiveMaxQueryTotalMemoryInBytes);
262-
}
263-
}
276+
long workerTotalCapacity = 0;
277+
ClusterMemoryPool generalPool = pools.get(GENERAL_POOL);
278+
if (generalPool != null) {
279+
workerTotalCapacity = generalPool.getTotalDistributedBytes();
280+
}
281+
long[] effective = computeEffectiveQueryMemoryLimits(
282+
maxQueryMemoryInBytes,
283+
maxQueryTotalMemoryInBytes,
284+
useWorkerAdvertisedMemoryForLimit,
285+
isWorkScheduledOnCoordinator,
286+
workerTotalCapacity);
287+
long effectiveMaxQueryMemoryInBytes = effective[0];
288+
long effectiveMaxQueryTotalMemoryInBytes = effective[1];
264289

265290
boolean queryKilled = false;
266291
long totalUserMemoryBytes = 0L;

presto-main/src/main/java/com/facebook/presto/server/LocalMemoryManagerProvider.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616
import com.facebook.presto.execution.scheduler.NodeSchedulerConfig;
1717
import com.facebook.presto.memory.LocalMemoryManager;
1818
import com.facebook.presto.memory.NodeMemoryConfig;
19-
import jakarta.inject.Inject;
20-
import jakarta.inject.Provider;
19+
import com.google.inject.Inject;
20+
import javax.inject.Provider;
2121

2222
/**
2323
* Provides {@link LocalMemoryManager} with coordinator-only validation when this process is a

presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -560,7 +560,7 @@ public ListeningExecutorService createResourceManagerExecutor(ResourceManagerCon
560560
configBinder(binder).bindConfig(MemoryManagerConfig.class);
561561
configBinder(binder).bindConfig(NodeMemoryConfig.class);
562562
configBinder(binder).bindConfig(ReservedSystemMemoryConfig.class);
563-
binder.bind(LocalMemoryManager.class).toProvider(LocalMemoryManagerProvider.class).in(Scopes.SINGLETON);
563+
binder.bind(LocalMemoryManager.class).toProvider(Key.get(LocalMemoryManagerProvider.class)).in(Scopes.SINGLETON);
564564
binder.bind(LocalMemoryManagerExporter.class).in(Scopes.SINGLETON);
565565
binder.bind(EmbedVersion.class).in(Scopes.SINGLETON);
566566
newExporter(binder).export(TaskManager.class).withGeneratedName();
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.memory;
15+
16+
import org.testng.annotations.Test;
17+
18+
import static org.testng.Assert.assertEquals;
19+
20+
/**
21+
* Behavioral tests for worker-advertised capacity capping of query limits in ClusterMemoryManager.
22+
* Verifies that effective user/total limits are computed correctly from config and worker capacity.
23+
*/
24+
public class TestClusterMemoryManager
25+
{
26+
private static final long CONFIG_MAX_USER_BYTES = 20L << 30; // 20 GB
27+
private static final long CONFIG_MAX_TOTAL_BYTES = 40L << 30; // 40 GB
28+
29+
@Test
30+
public void testEffectiveLimitsCappedByWorkerCapacityWhenEnabledAndNotSchedulingOnCoordinator()
31+
{
32+
// Worker capacity smaller than configured limits -> effective limits are capped by worker capacity
33+
long workerCapacityBytes = 10L << 30; // 10 GB
34+
long[] effective = ClusterMemoryManager.computeEffectiveQueryMemoryLimits(
35+
CONFIG_MAX_USER_BYTES,
36+
CONFIG_MAX_TOTAL_BYTES,
37+
true, // useWorkerAdvertisedMemoryForLimit
38+
false, // isWorkScheduledOnCoordinator (coordinator does not run tasks)
39+
workerCapacityBytes);
40+
41+
assertEquals(effective[0], workerCapacityBytes, "effective max user memory should be capped by worker capacity");
42+
assertEquals(effective[1], workerCapacityBytes, "effective max total memory should be capped by worker capacity");
43+
}
44+
45+
@Test
46+
public void testEffectiveLimitsUseConfigWhenWorkerCapacityLargerThanConfig()
47+
{
48+
// Worker capacity larger than configured limits -> configured limits still apply
49+
long workerCapacityBytes = 100L << 30; // 100 GB
50+
long[] effective = ClusterMemoryManager.computeEffectiveQueryMemoryLimits(
51+
CONFIG_MAX_USER_BYTES,
52+
CONFIG_MAX_TOTAL_BYTES,
53+
true,
54+
false,
55+
workerCapacityBytes);
56+
57+
assertEquals(effective[0], CONFIG_MAX_USER_BYTES, "effective max user memory should remain config limit");
58+
assertEquals(effective[1], CONFIG_MAX_TOTAL_BYTES, "effective max total memory should remain config limit");
59+
}
60+
61+
@Test
62+
public void testEffectiveLimitsUnchangedWhenUseWorkerAdvertisedDisabled()
63+
{
64+
// useWorkerAdvertisedMemoryForLimit = false -> config limits used regardless of worker capacity
65+
long workerCapacityBytes = 10L << 30; // 10 GB (smaller than config)
66+
long[] effective = ClusterMemoryManager.computeEffectiveQueryMemoryLimits(
67+
CONFIG_MAX_USER_BYTES,
68+
CONFIG_MAX_TOTAL_BYTES,
69+
false, // useWorkerAdvertisedMemoryForLimit
70+
false,
71+
workerCapacityBytes);
72+
73+
assertEquals(effective[0], CONFIG_MAX_USER_BYTES, "effective max user memory should be config when flag disabled");
74+
assertEquals(effective[1], CONFIG_MAX_TOTAL_BYTES, "effective max total memory should be config when flag disabled");
75+
}
76+
77+
@Test
78+
public void testEffectiveLimitsUnchangedWhenCoordinatorSchedulesWork()
79+
{
80+
// isWorkScheduledOnCoordinator = true -> config limits used (coordinator is a worker)
81+
long workerCapacityBytes = 10L << 30; // 10 GB
82+
long[] effective = ClusterMemoryManager.computeEffectiveQueryMemoryLimits(
83+
CONFIG_MAX_USER_BYTES,
84+
CONFIG_MAX_TOTAL_BYTES,
85+
true,
86+
true, // isWorkScheduledOnCoordinator
87+
workerCapacityBytes);
88+
89+
assertEquals(effective[0], CONFIG_MAX_USER_BYTES, "effective max user memory should be config when coordinator schedules work");
90+
assertEquals(effective[1], CONFIG_MAX_TOTAL_BYTES, "effective max total memory should be config when coordinator schedules work");
91+
}
92+
93+
@Test
94+
public void testEffectiveLimitsUseConfigWhenWorkerCapacityZero()
95+
{
96+
// No workers reported yet (workerTotalCapacity = 0) -> config limits used
97+
long[] effective = ClusterMemoryManager.computeEffectiveQueryMemoryLimits(
98+
CONFIG_MAX_USER_BYTES,
99+
CONFIG_MAX_TOTAL_BYTES,
100+
true,
101+
false,
102+
0);
103+
104+
assertEquals(effective[0], CONFIG_MAX_USER_BYTES, "effective max user memory should be config when no worker capacity");
105+
assertEquals(effective[1], CONFIG_MAX_TOTAL_BYTES, "effective max total memory should be config when no worker capacity");
106+
}
107+
}

0 commit comments

Comments
 (0)