Skip to content

Commit d058461

Browse files
Coordinator memory: validate only coordinator heap; use worker-advertised capacity for limits
- LocalMemoryManager: coordinator-only validation when include-coordinator=false (only heap headroom validated; no reserved pool) - LocalMemoryManagerProvider: wire coordinator-only path in ServerMainModule - MemoryManagerConfig: query.use-worker-advertised-memory-for-limit (default true) - ClusterMemoryManager: cap query limits by sum of worker general pool capacity - Tests and admin docs for new config and validation
1 parent 8bfb3f3 commit d058461

File tree

9 files changed

+197
-7
lines changed

9 files changed

+197
-7
lines changed

presto-docs/src/main/sphinx/admin/properties.rst

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,20 @@ system memory allocated by a query across all workers hits this limit it will be
274274
killed. The value of ``query.max-total-memory`` must be greater than
275275
``query.max-memory``.
276276

277+
``query.use-worker-advertised-memory-for-limit``
278+
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
279+
280+
* **Type:** ``boolean``
281+
* **Default value:** ``true``
282+
283+
When ``true`` and the coordinator does not schedule work
284+
(``node-scheduler.include-coordinator=false``), the coordinator caps query memory
285+
limits by the sum of worker-advertised general pool capacity. That is, the effective
286+
limit is ``min(query.max-memory or query.max-total-memory, sum of worker capacities)``.
287+
This allows the coordinator to use worker-advertised capacity for scheduling and OOM
288+
decisions instead of relying only on configured limits. Set to ``false`` to use only
289+
the configured ``query.max-memory`` and ``query.max-total-memory`` (previous behavior).
290+
277291
``memory.heap-headroom-per-node``
278292
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
279293

presto-main-base/src/main/java/com/facebook/presto/memory/LocalMemoryManager.java

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,35 @@ public LocalMemoryManager(NodeMemoryConfig config)
5151

5252
@VisibleForTesting
5353
public LocalMemoryManager(NodeMemoryConfig config, long availableMemory)
54+
{
55+
this(config, availableMemory, false);
56+
}
57+
58+
/**
59+
* Constructor for coordinator-only mode. When {@code useCoordinatorOnlyValidation} is true,
60+
* only heap headroom is validated against available memory; query.max-memory-per-node and
61+
* query.max-total-memory-per-node are not required to fit in this node's heap. This allows
62+
* a coordinator with a small JVM heap to start when node-scheduler.include-coordinator=false
63+
* and workers use larger per-node limits. The coordinator's pools are sized to (heap - headroom)
64+
* and no reserved pool is used.
65+
*/
66+
@VisibleForTesting
67+
public LocalMemoryManager(NodeMemoryConfig config, long availableMemory, boolean useCoordinatorOnlyValidation)
5468
{
5569
requireNonNull(config, "config is null");
56-
configureMemoryPools(config, availableMemory);
70+
configureMemoryPools(config, availableMemory, useCoordinatorOnlyValidation);
5771
}
5872

59-
private void configureMemoryPools(NodeMemoryConfig config, long availableMemory)
73+
private void configureMemoryPools(NodeMemoryConfig config, long availableMemory, boolean useCoordinatorOnlyValidation)
6074
{
75+
if (useCoordinatorOnlyValidation) {
76+
validateCoordinatorHeapHeadroom(config, availableMemory);
77+
maxMemory = new DataSize(availableMemory - config.getHeapHeadroom().toBytes(), BYTE);
78+
verify(maxMemory.toBytes() > 0, "general memory pool size is 0 after headroom");
79+
this.pools = ImmutableMap.of(GENERAL_POOL, new MemoryPool(GENERAL_POOL, maxMemory));
80+
return;
81+
}
82+
6183
validateHeapHeadroom(config, availableMemory);
6284
maxMemory = new DataSize(availableMemory - config.getHeapHeadroom().toBytes(), BYTE);
6385
checkArgument(
@@ -102,6 +124,23 @@ static void validateHeapHeadroom(NodeMemoryConfig config, long availableMemory)
102124
}
103125
}
104126

127+
/**
128+
* Validation for coordinator-only mode: only requires that heap headroom fits in available memory.
129+
* Used when node-scheduler.include-coordinator=false so the coordinator does not run tasks and
130+
* does not need query.max-memory-per-node / query.max-total-memory-per-node to fit in its heap.
131+
*/
132+
@VisibleForTesting
133+
static void validateCoordinatorHeapHeadroom(NodeMemoryConfig config, long availableMemory)
134+
{
135+
long heapHeadroom = config.getHeapHeadroom().toBytes();
136+
if (heapHeadroom < 0 || heapHeadroom >= availableMemory) {
137+
throw new IllegalArgumentException(
138+
format("Invalid memory configuration for coordinator. Heap headroom (%s) must be non-negative and less than available heap memory (%s)",
139+
heapHeadroom,
140+
availableMemory));
141+
}
142+
}
143+
105144
public MemoryInfo getInfo()
106145
{
107146
ImmutableMap.Builder<MemoryPoolId, MemoryPoolInfo> builder = ImmutableMap.builder();

presto-main-base/src/main/java/com/facebook/presto/memory/MemoryManagerConfig.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ public class MemoryManagerConfig
3939
private String lowMemoryKillerPolicy = LowMemoryKillerPolicy.NONE;
4040
private Duration killOnOutOfMemoryDelay = new Duration(5, MINUTES);
4141
private boolean tableFinishOperatorMemoryTrackingEnabled;
42+
private boolean useWorkerAdvertisedMemoryForLimit = true;
4243

4344
public String getLowMemoryKillerPolicy()
4445
{
@@ -143,6 +144,19 @@ public MemoryManagerConfig setTableFinishOperatorMemoryTrackingEnabled(boolean t
143144
return this;
144145
}
145146

147+
public boolean isUseWorkerAdvertisedMemoryForLimit()
148+
{
149+
return useWorkerAdvertisedMemoryForLimit;
150+
}
151+
152+
@Config("query.use-worker-advertised-memory-for-limit")
153+
@ConfigDescription("When true and coordinator does not schedule work, cap query memory limits by the sum of worker-advertised general pool capacity")
154+
public MemoryManagerConfig setUseWorkerAdvertisedMemoryForLimit(boolean useWorkerAdvertisedMemoryForLimit)
155+
{
156+
this.useWorkerAdvertisedMemoryForLimit = useWorkerAdvertisedMemoryForLimit;
157+
return this;
158+
}
159+
146160
public static class LowMemoryKillerPolicy
147161
{
148162
public static final String NONE = "none";

presto-main-base/src/test/java/com/facebook/presto/memory/TestLocalMemoryManager.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,4 +76,35 @@ public void testNotEnoughAvailableMemory()
7676

7777
new LocalMemoryManager(config, new DataSize(10, GIGABYTE).toBytes());
7878
}
79+
80+
/**
81+
* When coordinator-only validation is used, the coordinator only validates that heap headroom
82+
* fits in the JVM heap. Large query.max-memory-per-node / query.max-total-memory-per-node
83+
* (intended for workers) do not need to fit in the coordinator's heap.
84+
*/
85+
@Test
86+
public void testCoordinatorOnlyValidationAllowsLargePerNodeConfigWithSmallHeap()
87+
{
88+
NodeMemoryConfig config = new NodeMemoryConfig()
89+
.setHeapHeadroom(new DataSize(1, GIGABYTE))
90+
.setMaxQueryMemoryPerNode(new DataSize(32, GIGABYTE))
91+
.setMaxQueryTotalMemoryPerNode(new DataSize(64, GIGABYTE));
92+
93+
long smallCoordinatorHeap = new DataSize(4, GIGABYTE).toBytes();
94+
LocalMemoryManager localMemoryManager = new LocalMemoryManager(config, smallCoordinatorHeap, true);
95+
96+
assertFalse(localMemoryManager.getReservedPool().isPresent());
97+
assertEquals(localMemoryManager.getPools().size(), 1);
98+
assertEquals(localMemoryManager.getGeneralPool().getMaxBytes(), smallCoordinatorHeap - new DataSize(1, GIGABYTE).toBytes());
99+
}
100+
101+
@Test(expectedExceptions = IllegalArgumentException.class,
102+
expectedExceptionsMessageRegExp = "Invalid memory configuration for coordinator.*")
103+
public void testCoordinatorOnlyValidationFailsWhenHeadroomExceedsHeap()
104+
{
105+
NodeMemoryConfig config = new NodeMemoryConfig()
106+
.setHeapHeadroom(new DataSize(5, GIGABYTE));
107+
108+
new LocalMemoryManager(config, new DataSize(4, GIGABYTE).toBytes(), true);
109+
}
79110
}

presto-main-base/src/test/java/com/facebook/presto/memory/TestMemoryManagerConfig.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ public void testDefaults()
4141
.setSoftMaxQueryMemory(new DataSize(20, GIGABYTE))
4242
.setMaxQueryTotalMemory(new DataSize(40, GIGABYTE))
4343
.setSoftMaxQueryTotalMemory(new DataSize(40, GIGABYTE))
44-
.setTableFinishOperatorMemoryTrackingEnabled(false));
44+
.setTableFinishOperatorMemoryTrackingEnabled(false)
45+
.setUseWorkerAdvertisedMemoryForLimit(true));
4546
}
4647

4748
@Test
@@ -55,6 +56,7 @@ public void testExplicitPropertyMappings()
5556
.put("query.max-total-memory", "3GB")
5657
.put("query.soft-max-total-memory", "2GB")
5758
.put("table-finish-operator-memory-tracking-enabled", "true")
59+
.put("query.use-worker-advertised-memory-for-limit", "false")
5860
.build();
5961

6062
MemoryManagerConfig expected = new MemoryManagerConfig()
@@ -64,7 +66,8 @@ public void testExplicitPropertyMappings()
6466
.setSoftMaxQueryMemory(new DataSize(1, GIGABYTE))
6567
.setMaxQueryTotalMemory(new DataSize(3, GIGABYTE))
6668
.setSoftMaxQueryTotalMemory(new DataSize(2, GIGABYTE))
67-
.setTableFinishOperatorMemoryTrackingEnabled(true);
69+
.setTableFinishOperatorMemoryTrackingEnabled(true)
70+
.setUseWorkerAdvertisedMemoryForLimit(false);
6871

6972
assertFullMapping(properties, expected);
7073
}

presto-main-base/src/test/java/com/facebook/presto/memory/TestNodeMemoryConfig.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import static com.facebook.airlift.configuration.testing.ConfigAssertions.assertFullMapping;
2424
import static com.facebook.airlift.units.DataSize.Unit.BYTE;
2525
import static com.facebook.airlift.units.DataSize.Unit.GIGABYTE;
26+
import static com.facebook.presto.memory.LocalMemoryManager.validateCoordinatorHeapHeadroom;
2627
import static com.facebook.presto.memory.LocalMemoryManager.validateHeapHeadroom;
2728
import static com.facebook.presto.memory.NodeMemoryConfig.AVAILABLE_HEAP_MEMORY;
2829

@@ -121,4 +122,22 @@ public void testInvalidValues()
121122
// and the heap headroom and the config is more than that.
122123
validateHeapHeadroom(config, new DataSize(4, GIGABYTE).toBytes());
123124
}
125+
126+
@Test
127+
public void testCoordinatorOnlyValidationPassesWhenHeadroomFits()
128+
{
129+
NodeMemoryConfig config = new NodeMemoryConfig();
130+
config.setMaxQueryTotalMemoryPerNode(new DataSize(32, GIGABYTE));
131+
config.setHeapHeadroom(new DataSize(1, GIGABYTE));
132+
// Coordinator-only validation: only headroom must fit. Per-node limits are for workers.
133+
validateCoordinatorHeapHeadroom(config, new DataSize(4, GIGABYTE).toBytes());
134+
}
135+
136+
@Test(expectedExceptions = IllegalArgumentException.class)
137+
public void testCoordinatorOnlyValidationFailsWhenHeadroomExceedsHeap()
138+
{
139+
NodeMemoryConfig config = new NodeMemoryConfig();
140+
config.setHeapHeadroom(new DataSize(2, GIGABYTE));
141+
validateCoordinatorHeapHeadroom(config, new DataSize(1, GIGABYTE).toBytes());
142+
}
124143
}

presto-main/src/main/java/com/facebook/presto/memory/ClusterMemoryManager.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ public class ClusterMemoryManager
130130
private final AtomicLong queriesKilledDueToOutOfMemory = new AtomicLong();
131131
private final boolean isWorkScheduledOnCoordinator;
132132
private final boolean isBinaryTransportEnabled;
133+
private final boolean useWorkerAdvertisedMemoryForLimit;
133134

134135
@GuardedBy("this")
135136
private final Map<String, RemoteNodeMemory> nodes = new HashMap<>();
@@ -183,6 +184,7 @@ public ClusterMemoryManager(
183184
this.killOnOutOfMemoryDelay = config.getKillOnOutOfMemoryDelay();
184185
this.isWorkScheduledOnCoordinator = schedulerConfig.isIncludeCoordinator();
185186
this.isBinaryTransportEnabled = communicationConfig.isBinaryTransportEnabled();
187+
this.useWorkerAdvertisedMemoryForLimit = config.isUseWorkerAdvertisedMemoryForLimit();
186188
if (this.isBinaryTransportEnabled) {
187189
this.memoryInfoCodec = requireNonNull(memoryInfoSmileCodec, "memoryInfoSmileCodec is null");
188190
this.assignmentsRequestCodec = requireNonNull(assignmentsRequestSmileCodec, "assignmentsRequestSmileCodec is null");
@@ -247,6 +249,19 @@ public synchronized void process(Iterable<QueryExecution> runningQueries)
247249
lastTimeNotOutOfMemory = System.nanoTime();
248250
}
249251

252+
// When coordinator does not schedule work, cap query limits by worker-advertised capacity
253+
// so the coordinator uses min(configured limit, sum of worker general pool capacity).
254+
long effectiveMaxQueryMemoryInBytes = maxQueryMemoryInBytes;
255+
long effectiveMaxQueryTotalMemoryInBytes = maxQueryTotalMemoryInBytes;
256+
if (useWorkerAdvertisedMemoryForLimit && !isWorkScheduledOnCoordinator) {
257+
ClusterMemoryPool generalPool = pools.get(GENERAL_POOL);
258+
long workerTotalCapacity = generalPool != null ? generalPool.getTotalDistributedBytes() : 0;
259+
if (workerTotalCapacity > 0) {
260+
effectiveMaxQueryTotalMemoryInBytes = min(maxQueryTotalMemoryInBytes, workerTotalCapacity);
261+
effectiveMaxQueryMemoryInBytes = min(maxQueryMemoryInBytes, effectiveMaxQueryTotalMemoryInBytes);
262+
}
263+
}
264+
250265
boolean queryKilled = false;
251266
long totalUserMemoryBytes = 0L;
252267
long totalMemoryBytes = 0L;
@@ -264,13 +279,13 @@ public synchronized void process(Iterable<QueryExecution> runningQueries)
264279
}
265280

266281
if (!resourceOvercommit) {
267-
long userMemoryLimit = min(maxQueryMemoryInBytes, getQueryMaxMemory(query.getSession()).toBytes());
282+
long userMemoryLimit = min(effectiveMaxQueryMemoryInBytes, getQueryMaxMemory(query.getSession()).toBytes());
268283
if (userMemoryReservation > userMemoryLimit) {
269284
query.fail(exceededGlobalUserLimit(succinctBytes(userMemoryLimit)));
270285
queryKilled = true;
271286
}
272287
QueryLimit<Long> queryTotalMemoryLimit = getMinimum(
273-
createDataSizeLimit(maxQueryTotalMemoryInBytes, SYSTEM),
288+
createDataSizeLimit(effectiveMaxQueryTotalMemoryInBytes, SYSTEM),
274289
query.getResourceGroupQueryLimits()
275290
.flatMap(ResourceGroupQueryLimits::getTotalMemoryLimit)
276291
.map(rgLimit -> createDataSizeLimit(rgLimit.toBytes(), RESOURCE_GROUP))
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.server;
15+
16+
import com.facebook.presto.execution.scheduler.NodeSchedulerConfig;
17+
import com.facebook.presto.memory.LocalMemoryManager;
18+
import com.facebook.presto.memory.NodeMemoryConfig;
19+
import jakarta.inject.Inject;
20+
import jakarta.inject.Provider;
21+
22+
/**
23+
* Provides {@link LocalMemoryManager} with coordinator-only validation when this process is a
24+
* coordinator that does not schedule work on itself (node-scheduler.include-coordinator=false).
25+
* In that case only heap headroom is validated against the coordinator's JVM heap; the same
26+
* query.max-memory-per-node and query.max-total-memory-per-node config used for workers need
27+
* not fit in the coordinator's heap.
28+
*/
29+
public class LocalMemoryManagerProvider
30+
implements Provider<LocalMemoryManager>
31+
{
32+
private final NodeMemoryConfig nodeMemoryConfig;
33+
private final ServerConfig serverConfig;
34+
private final NodeSchedulerConfig nodeSchedulerConfig;
35+
36+
@Inject
37+
public LocalMemoryManagerProvider(
38+
NodeMemoryConfig nodeMemoryConfig,
39+
ServerConfig serverConfig,
40+
NodeSchedulerConfig nodeSchedulerConfig)
41+
{
42+
this.nodeMemoryConfig = nodeMemoryConfig;
43+
this.serverConfig = serverConfig;
44+
this.nodeSchedulerConfig = nodeSchedulerConfig;
45+
}
46+
47+
@Override
48+
public LocalMemoryManager get()
49+
{
50+
long availableMemory = Runtime.getRuntime().maxMemory();
51+
boolean useCoordinatorOnlyValidation = serverConfig.isCoordinator()
52+
&& !nodeSchedulerConfig.isIncludeCoordinator();
53+
return new LocalMemoryManager(nodeMemoryConfig, availableMemory, useCoordinatorOnlyValidation);
54+
}
55+
}

presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -560,7 +560,7 @@ public ListeningExecutorService createResourceManagerExecutor(ResourceManagerCon
560560
configBinder(binder).bindConfig(MemoryManagerConfig.class);
561561
configBinder(binder).bindConfig(NodeMemoryConfig.class);
562562
configBinder(binder).bindConfig(ReservedSystemMemoryConfig.class);
563-
binder.bind(LocalMemoryManager.class).in(Scopes.SINGLETON);
563+
binder.bind(LocalMemoryManager.class).toProvider(LocalMemoryManagerProvider.class).in(Scopes.SINGLETON);
564564
binder.bind(LocalMemoryManagerExporter.class).in(Scopes.SINGLETON);
565565
binder.bind(EmbedVersion.class).in(Scopes.SINGLETON);
566566
newExporter(binder).export(TaskManager.class).withGeneratedName();

0 commit comments

Comments
 (0)