Skip to content
Open
14 changes: 14 additions & 0 deletions presto-docs/src/main/sphinx/admin/properties.rst
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,20 @@ system memory allocated by a query across all workers hits this limit it will be
killed. The value of ``query.max-total-memory`` must be greater than
``query.max-memory``.

``query.use-worker-advertised-memory-for-limit``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``boolean``
* **Default value:** ``true``

When ``true`` and the coordinator does not schedule work
(``node-scheduler.include-coordinator=false``), the coordinator caps the query memory
limits at the sum of the general pool capacity advertised by the workers. That is, the
effective limit is ``min(configured limit, sum of worker capacities)``, applied to both
``query.max-memory`` and ``query.max-total-memory``. If the workers have not advertised
any capacity (the sum is zero), the configured limits are used unchanged.
This allows the coordinator to use worker-advertised capacity for scheduling and OOM
decisions instead of relying only on configured limits. Set to ``false`` to use only
the configured ``query.max-memory`` and ``query.max-total-memory``.

``memory.heap-headroom-per-node``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,35 @@ public LocalMemoryManager(NodeMemoryConfig config)

/**
* Creates a manager for an explicit amount of available memory using the standard
* (non coordinator-only) validation. Delegates to the three-argument constructor with
* {@code useCoordinatorOnlyValidation} set to {@code false}.
*
* @param config node memory configuration
* @param availableMemory memory available to this node, in bytes
*/
@VisibleForTesting
public LocalMemoryManager(NodeMemoryConfig config, long availableMemory)
{
this(config, availableMemory, false);
}

/**
 * Creates a manager whose memory pools are derived from {@code availableMemory}.
 *
 * <p>When {@code useCoordinatorOnlyValidation} is {@code false}, the standard validation and
 * pool layout are used. When it is {@code true}, only heap headroom is validated against
 * available memory; query.max-memory-per-node and query.max-total-memory-per-node are not
 * required to fit in this node's heap. This allows a coordinator with a small JVM heap to
 * start when node-scheduler.include-coordinator=false and workers use larger per-node limits.
 * In that mode the general pool is sized to (heap - headroom) and no reserved pool is created.
 *
 * <p>This constructor is called from production code ({@code LocalMemoryManagerProvider})
 * as well as from tests, so it is intentionally not marked {@code @VisibleForTesting}.
 *
 * @param config node memory configuration; must not be null
 * @param availableMemory memory available to this node, in bytes
 * @param useCoordinatorOnlyValidation whether to apply the relaxed coordinator-only validation
 * @throws NullPointerException if {@code config} is null
 */
public LocalMemoryManager(NodeMemoryConfig config, long availableMemory, boolean useCoordinatorOnlyValidation)
{
    requireNonNull(config, "config is null");
    // Single call only: the two-argument configureMemoryPools(config, availableMemory) form was superseded.
    configureMemoryPools(config, availableMemory, useCoordinatorOnlyValidation);
}

private void configureMemoryPools(NodeMemoryConfig config, long availableMemory)
private void configureMemoryPools(NodeMemoryConfig config, long availableMemory, boolean useCoordinatorOnlyValidation)
{
if (useCoordinatorOnlyValidation) {
validateCoordinatorHeapHeadroom(config, availableMemory);
maxMemory = new DataSize(availableMemory - config.getHeapHeadroom().toBytes(), BYTE);
verify(maxMemory.toBytes() > 0, "general memory pool size is 0 after headroom");
this.pools = ImmutableMap.of(GENERAL_POOL, new MemoryPool(GENERAL_POOL, maxMemory));
return;
}

validateHeapHeadroom(config, availableMemory);
maxMemory = new DataSize(availableMemory - config.getHeapHeadroom().toBytes(), BYTE);
checkArgument(
Expand Down Expand Up @@ -102,6 +124,23 @@ static void validateHeapHeadroom(NodeMemoryConfig config, long availableMemory)
}
}

/**
 * Checks the memory configuration of a coordinator that does not run tasks
 * (node-scheduler.include-coordinator=false). The only requirement is that the configured
 * heap headroom is non-negative and strictly smaller than the available memory;
 * query.max-memory-per-node and query.max-total-memory-per-node are not inspected.
 *
 * @param config node memory configuration supplying the heap headroom
 * @param availableMemory memory available to this node, in bytes
 * @throws IllegalArgumentException if the headroom is negative or does not leave any memory free
 */
@VisibleForTesting
static void validateCoordinatorHeapHeadroom(NodeMemoryConfig config, long availableMemory)
{
    long headroomBytes = config.getHeapHeadroom().toBytes();
    boolean headroomFits = headroomBytes >= 0 && headroomBytes < availableMemory;
    if (headroomFits) {
        return;
    }

    String message = format(
            "Invalid memory configuration for coordinator. Heap headroom (%s) must be non-negative and less than available heap memory (%s)",
            headroomBytes,
            availableMemory);
    throw new IllegalArgumentException(message);
}

public MemoryInfo getInfo()
{
ImmutableMap.Builder<MemoryPoolId, MemoryPoolInfo> builder = ImmutableMap.builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class MemoryManagerConfig
private String lowMemoryKillerPolicy = LowMemoryKillerPolicy.NONE;
private Duration killOnOutOfMemoryDelay = new Duration(5, MINUTES);
private boolean tableFinishOperatorMemoryTrackingEnabled;
private boolean useWorkerAdvertisedMemoryForLimit = true;

public String getLowMemoryKillerPolicy()
{
Expand Down Expand Up @@ -143,6 +144,19 @@ public MemoryManagerConfig setTableFinishOperatorMemoryTrackingEnabled(boolean t
return this;
}

/**
* Returns whether query memory limits should be capped by worker-advertised capacity.
* The field backing this getter is initialized to {@code true}.
*/
public boolean isUseWorkerAdvertisedMemoryForLimit()
{
return useWorkerAdvertisedMemoryForLimit;
}

/**
* Sets whether query memory limits should be capped by worker-advertised capacity.
* Bound to the {@code query.use-worker-advertised-memory-for-limit} configuration property.
*
* @param useWorkerAdvertisedMemoryForLimit the new flag value
* @return this config instance, to allow call chaining
*/
@Config("query.use-worker-advertised-memory-for-limit")
@ConfigDescription("When true and coordinator does not schedule work, cap query memory limits by the sum of worker-advertised general pool capacity")
public MemoryManagerConfig setUseWorkerAdvertisedMemoryForLimit(boolean useWorkerAdvertisedMemoryForLimit)
{
this.useWorkerAdvertisedMemoryForLimit = useWorkerAdvertisedMemoryForLimit;
return this;
}

public static class LowMemoryKillerPolicy
{
public static final String NONE = "none";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,35 @@ public void testNotEnoughAvailableMemory()

new LocalMemoryManager(config, new DataSize(10, GIGABYTE).toBytes());
}

/**
 * With coordinator-only validation enabled, a 4GB heap must accept worker-sized per-node
 * limits (32GB / 64GB): only the 1GB heap headroom has to fit. The manager is expected to
 * expose a single general pool of (heap - headroom) bytes and no reserved pool.
 */
@Test
public void testCoordinatorOnlyValidationAllowsLargePerNodeConfigWithSmallHeap()
{
    DataSize headroom = new DataSize(1, GIGABYTE);
    NodeMemoryConfig workerSizedConfig = new NodeMemoryConfig()
            .setHeapHeadroom(headroom)
            .setMaxQueryMemoryPerNode(new DataSize(32, GIGABYTE))
            .setMaxQueryTotalMemoryPerNode(new DataSize(64, GIGABYTE));
    long coordinatorHeapBytes = new DataSize(4, GIGABYTE).toBytes();

    LocalMemoryManager manager = new LocalMemoryManager(workerSizedConfig, coordinatorHeapBytes, true);

    assertFalse(manager.getReservedPool().isPresent());
    assertEquals(manager.getPools().size(), 1);
    long expectedGeneralPoolBytes = coordinatorHeapBytes - headroom.toBytes();
    assertEquals(manager.getGeneralPool().getMaxBytes(), expectedGeneralPoolBytes);
}

/**
 * Coordinator-only validation must still reject a heap headroom (5GB) that is larger
 * than the available heap (4GB).
 */
@Test(expectedExceptions = IllegalArgumentException.class,
        expectedExceptionsMessageRegExp = "Invalid memory configuration for coordinator.*")
public void testCoordinatorOnlyValidationFailsWhenHeadroomExceedsHeap()
{
    NodeMemoryConfig oversizedHeadroomConfig = new NodeMemoryConfig()
            .setHeapHeadroom(new DataSize(5, GIGABYTE));
    long heapBytes = new DataSize(4, GIGABYTE).toBytes();

    new LocalMemoryManager(oversizedHeadroomConfig, heapBytes, true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ public void testDefaults()
.setSoftMaxQueryMemory(new DataSize(20, GIGABYTE))
.setMaxQueryTotalMemory(new DataSize(40, GIGABYTE))
.setSoftMaxQueryTotalMemory(new DataSize(40, GIGABYTE))
.setTableFinishOperatorMemoryTrackingEnabled(false));
.setTableFinishOperatorMemoryTrackingEnabled(false)
.setUseWorkerAdvertisedMemoryForLimit(true));
}

@Test
Expand All @@ -55,6 +56,7 @@ public void testExplicitPropertyMappings()
.put("query.max-total-memory", "3GB")
.put("query.soft-max-total-memory", "2GB")
.put("table-finish-operator-memory-tracking-enabled", "true")
.put("query.use-worker-advertised-memory-for-limit", "false")
.build();

MemoryManagerConfig expected = new MemoryManagerConfig()
Expand All @@ -64,7 +66,8 @@ public void testExplicitPropertyMappings()
.setSoftMaxQueryMemory(new DataSize(1, GIGABYTE))
.setMaxQueryTotalMemory(new DataSize(3, GIGABYTE))
.setSoftMaxQueryTotalMemory(new DataSize(2, GIGABYTE))
.setTableFinishOperatorMemoryTrackingEnabled(true);
.setTableFinishOperatorMemoryTrackingEnabled(true)
.setUseWorkerAdvertisedMemoryForLimit(false);

assertFullMapping(properties, expected);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static com.facebook.airlift.configuration.testing.ConfigAssertions.assertFullMapping;
import static com.facebook.airlift.units.DataSize.Unit.BYTE;
import static com.facebook.airlift.units.DataSize.Unit.GIGABYTE;
import static com.facebook.presto.memory.LocalMemoryManager.validateCoordinatorHeapHeadroom;
import static com.facebook.presto.memory.LocalMemoryManager.validateHeapHeadroom;
import static com.facebook.presto.memory.NodeMemoryConfig.AVAILABLE_HEAP_MEMORY;

Expand Down Expand Up @@ -121,4 +122,22 @@ public void testInvalidValues()
// and the heap headroom and the config is more than that.
validateHeapHeadroom(config, new DataSize(4, GIGABYTE).toBytes());
}

/**
 * A 1GB headroom fits in a 4GB heap, so coordinator-only validation must pass even though
 * the 32GB per-node total limit (meant for workers) exceeds that heap.
 */
@Test
public void testCoordinatorOnlyValidationPassesWhenHeadroomFits()
{
    NodeMemoryConfig config = new NodeMemoryConfig()
            .setMaxQueryTotalMemoryPerNode(new DataSize(32, GIGABYTE))
            .setHeapHeadroom(new DataSize(1, GIGABYTE));
    long coordinatorHeapBytes = new DataSize(4, GIGABYTE).toBytes();

    // Only the headroom is checked against the heap; per-node limits are ignored here.
    validateCoordinatorHeapHeadroom(config, coordinatorHeapBytes);
}

/**
 * A 2GB headroom cannot fit in a 1GB heap, so coordinator-only validation must fail.
 * The expected message is pinned so that an unrelated {@link IllegalArgumentException}
 * (for example from a config setter) cannot make this test pass by accident.
 */
@Test(expectedExceptions = IllegalArgumentException.class,
        expectedExceptionsMessageRegExp = "Invalid memory configuration for coordinator.*")
public void testCoordinatorOnlyValidationFailsWhenHeadroomExceedsHeap()
{
    NodeMemoryConfig config = new NodeMemoryConfig();
    config.setHeapHeadroom(new DataSize(2, GIGABYTE));
    validateCoordinatorHeapHeadroom(config, new DataSize(1, GIGABYTE).toBytes());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ public class ClusterMemoryManager
private final AtomicLong queriesKilledDueToOutOfMemory = new AtomicLong();
private final boolean isWorkScheduledOnCoordinator;
private final boolean isBinaryTransportEnabled;
private final boolean useWorkerAdvertisedMemoryForLimit;

@GuardedBy("this")
private final Map<String, RemoteNodeMemory> nodes = new HashMap<>();
Expand Down Expand Up @@ -183,6 +184,7 @@ public ClusterMemoryManager(
this.killOnOutOfMemoryDelay = config.getKillOnOutOfMemoryDelay();
this.isWorkScheduledOnCoordinator = schedulerConfig.isIncludeCoordinator();
this.isBinaryTransportEnabled = communicationConfig.isBinaryTransportEnabled();
this.useWorkerAdvertisedMemoryForLimit = config.isUseWorkerAdvertisedMemoryForLimit();
if (this.isBinaryTransportEnabled) {
this.memoryInfoCodec = requireNonNull(memoryInfoSmileCodec, "memoryInfoSmileCodec is null");
this.assignmentsRequestCodec = requireNonNull(assignmentsRequestSmileCodec, "assignmentsRequestSmileCodec is null");
Expand Down Expand Up @@ -247,6 +249,19 @@ public synchronized void process(Iterable<QueryExecution> runningQueries)
lastTimeNotOutOfMemory = System.nanoTime();
}

// When the coordinator does not schedule work, cap the query limits at the worker-advertised
// capacity: effective limit = min(configured limit, sum of worker general pool capacity).
// If no capacity is advertised (the sum is 0), the configured limits are kept as-is.
long effectiveMaxQueryMemoryInBytes = maxQueryMemoryInBytes;
long effectiveMaxQueryTotalMemoryInBytes = maxQueryTotalMemoryInBytes;
if (useWorkerAdvertisedMemoryForLimit && !isWorkScheduledOnCoordinator) {
ClusterMemoryPool generalPool = pools.get(GENERAL_POOL);
long workerTotalCapacity = generalPool != null ? generalPool.getTotalDistributedBytes() : 0;
if (workerTotalCapacity > 0) {
effectiveMaxQueryTotalMemoryInBytes = min(maxQueryTotalMemoryInBytes, workerTotalCapacity);
effectiveMaxQueryMemoryInBytes = min(maxQueryMemoryInBytes, effectiveMaxQueryTotalMemoryInBytes);
}
}

boolean queryKilled = false;
long totalUserMemoryBytes = 0L;
long totalMemoryBytes = 0L;
Expand All @@ -264,13 +279,13 @@ public synchronized void process(Iterable<QueryExecution> runningQueries)
}

if (!resourceOvercommit) {
long userMemoryLimit = min(maxQueryMemoryInBytes, getQueryMaxMemory(query.getSession()).toBytes());
long userMemoryLimit = min(effectiveMaxQueryMemoryInBytes, getQueryMaxMemory(query.getSession()).toBytes());
if (userMemoryReservation > userMemoryLimit) {
query.fail(exceededGlobalUserLimit(succinctBytes(userMemoryLimit)));
queryKilled = true;
}
QueryLimit<Long> queryTotalMemoryLimit = getMinimum(
createDataSizeLimit(maxQueryTotalMemoryInBytes, SYSTEM),
createDataSizeLimit(effectiveMaxQueryTotalMemoryInBytes, SYSTEM),
query.getResourceGroupQueryLimits()
.flatMap(ResourceGroupQueryLimits::getTotalMemoryLimit)
.map(rgLimit -> createDataSizeLimit(rgLimit.toBytes(), RESOURCE_GROUP))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.server;

import com.facebook.presto.execution.scheduler.NodeSchedulerConfig;
import com.facebook.presto.memory.LocalMemoryManager;
import com.facebook.presto.memory.NodeMemoryConfig;
import jakarta.inject.Inject;
import jakarta.inject.Provider;

import static java.util.Objects.requireNonNull;

/**
 * Provides {@link LocalMemoryManager} with coordinator-only validation when this process is a
 * coordinator that does not schedule work on itself (node-scheduler.include-coordinator=false).
 * In that case only heap headroom is validated against the coordinator's JVM heap; the same
 * query.max-memory-per-node and query.max-total-memory-per-node config used for workers need
 * not fit in the coordinator's heap. In every other case the standard validation is used.
 */
public class LocalMemoryManagerProvider
        implements Provider<LocalMemoryManager>
{
    private final NodeMemoryConfig nodeMemoryConfig;
    private final ServerConfig serverConfig;
    private final NodeSchedulerConfig nodeSchedulerConfig;

    /**
     * @param nodeMemoryConfig per-node memory configuration passed to the manager
     * @param serverConfig used to determine whether this process is a coordinator
     * @param nodeSchedulerConfig used to determine whether work is scheduled on the coordinator
     * @throws NullPointerException if any argument is null
     */
    @Inject
    public LocalMemoryManagerProvider(
            NodeMemoryConfig nodeMemoryConfig,
            ServerConfig serverConfig,
            NodeSchedulerConfig nodeSchedulerConfig)
    {
        // Fail fast at injection time rather than with an NPE on the first get() call.
        this.nodeMemoryConfig = requireNonNull(nodeMemoryConfig, "nodeMemoryConfig is null");
        this.serverConfig = requireNonNull(serverConfig, "serverConfig is null");
        this.nodeSchedulerConfig = requireNonNull(nodeSchedulerConfig, "nodeSchedulerConfig is null");
    }

    /**
     * Builds a {@link LocalMemoryManager} sized from the JVM's maximum heap.
     *
     * @return a manager using coordinator-only validation when this node is a coordinator
     *         that does not schedule work on itself, and standard validation otherwise
     */
    @Override
    public LocalMemoryManager get()
    {
        long availableMemory = Runtime.getRuntime().maxMemory();
        boolean useCoordinatorOnlyValidation = serverConfig.isCoordinator()
                && !nodeSchedulerConfig.isIncludeCoordinator();
        return new LocalMemoryManager(nodeMemoryConfig, availableMemory, useCoordinatorOnlyValidation);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ public ListeningExecutorService createResourceManagerExecutor(ResourceManagerCon
configBinder(binder).bindConfig(MemoryManagerConfig.class);
configBinder(binder).bindConfig(NodeMemoryConfig.class);
configBinder(binder).bindConfig(ReservedSystemMemoryConfig.class);
binder.bind(LocalMemoryManager.class).in(Scopes.SINGLETON);
binder.bind(LocalMemoryManager.class).toProvider(LocalMemoryManagerProvider.class).in(Scopes.SINGLETON);
binder.bind(LocalMemoryManagerExporter.class).in(Scopes.SINGLETON);
binder.bind(EmbedVersion.class).in(Scopes.SINGLETON);
newExporter(binder).export(TaskManager.class).withGeneratedName();
Expand Down
Loading