
Commit 88b6343

fix(scheduler): Coordinator Task Throttling Bug (#27146)
## Description

With coordinator task-based throttling (queueing) enabled, we run into an issue where certain resource groups are never updated to be eligible to run. This occurs when a resource group is created during a task throttling period and `canRun` returns false, so the group is never added as an eligible subgroup on creation. When we exit task throttling, an eligibility update is never triggered; if the group does not receive a new query after task throttling ends, its status is never updated.

Changes:
1. Move the `isTaskLimitExceeded` check from `canRunMore` to `internalStartNext`. `canRunMore` now returns true, allowing the group to be marked as eligible, while `internalStartNext` still prevents the group from running more queries.
2. Add a check so that immediate-execution candidates are enqueued while task throttling is active.
3. Remove `experimental` from the session property.
4. Add tests to ensure resource groups properly queue/run queries with task limits (should this be in `resourceGroups` or `testQueryTaskLimit`?).

Meta Internal review by: spershin
Meta Internal Differential Revision: D92632990

## Motivation and Context

Coordinator memory is being overloaded by queries with large task counts. There need to be safeguards for this beyond resource groups alone. The existing coordinator task throttling property has some issues, which are fixed by this PR.

## Impact

Coordinator task throttling no longer causes stuck resource groups.

The config is renamed from `experimental.max-total-running-task-count-to-not-execute-new-query` to `max-total-running-task-count-to-not-execute-new-query`; the old name is kept as a legacy config for backwards compatibility.

Coordinator task throttling, when used in conjunction with query pacing, should keep the number of tasks on the cluster close to the limit.

## Test Plan

Bug reproduction: set the task limit to 1 on a test cluster, then trigger multiple queries that peak at 10-30 tasks and have execution times of 10-30 seconds.

<img width="2732" height="1482" alt="image" src="https://github.com/user-attachments/assets/3bef60b3-ee39-4190-8e0f-b972736876af" />

Repro with a larger query suite:

<img width="2594" height="1254" alt="image" src="https://github.com/user-attachments/assets/dd26ed05-4c33-4c46-ae70-fed41098c389" />

Test: build and push again to the test cluster and repeat the previous repro. The throttling kicks in as expected: the cluster submits a lot of queries as running before TaskLimitExceeded fires, after which it runs 1-3 queries at a time for the remainder of the queue. However, the cluster was still admitting queries slowly even while in a task throttling state.

<img width="870" height="686" alt="image" src="https://github.com/user-attachments/assets/04c14abd-8c74-4c90-9625-2b2119ee2fc6" />

Following the previous fix, it was noticed that `internalStartNext` would not prevent immediate executions, only queued queries. This was then patched to block immediate executions during task throttling periods, so queries cannot start while the coordinator is in a task throttling state.

Test with the second fix:

<img width="2438" height="1150" alt="image" src="https://github.com/user-attachments/assets/3ed7d10f-2d5f-47f3-9969-e343b467ef5f" />

The spikes in this run occur because multiple queries can be admitted with no pacing before re-entering the task throttling state. With query pacing, this should be mitigated.
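For reference, a hedged sketch of the repro configuration described in the test plan above (assuming the property is set in the coordinator's `config.properties`; only the task-limit value comes from the test plan):

```properties
# Sketch of the repro setup above: hold new queries in the queue once more than
# one task is running cluster-wide.
max-total-running-task-count-to-not-execute-new-query=1
```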
## Contributor checklist

- [ ] Please make sure your submission complies with our [contributing guide](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md), in particular [code style](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md#code-style) and [commit standards](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md#commit-standards).
- [ ] PR description addresses the issue accurately and concisely. If the change is non-trivial, a GitHub Issue is referenced.
- [ ] Documented new properties (with its default value), SQL syntax, functions, or other functionality.
- [ ] If release notes are required, they follow the [release notes guidelines](https://github.com/prestodb/presto/wiki/Release-Notes-Guidelines).
- [ ] Adequate tests were added if applicable.
- [ ] CI passed.

## Summary by Sourcery

Fix coordinator task-based throttling so resource groups correctly queue and start queries when task limits are exceeded and later cleared.

Bug Fixes:
- Ensure resource groups remain eligible and properly queue queries instead of silently starving when the coordinator task limit is exceeded.
- Prevent new queries from starting immediately when the coordinator is overloaded, while still allowing existing running queries to continue.

Enhancements:
- Refine admission control in resource groups to consider coordinator overload separately from eligibility and concurrency checks.
- Promote the task-limit-based throttling session property from experimental by renaming its configuration key.

Tests:
- Add unit tests covering query queuing and execution across task-limit transitions, including subgroup hierarchies and multiple throttle cycles.
- Update configuration and task-limit integration tests to use the non-experimental task throttling property.

## Release Notes

Please follow the [release notes guidelines](https://github.com/prestodb/presto/wiki/Release-Notes-Guidelines) and fill in the release notes below.

```
== RELEASE NOTES ==

General Changes
* Fix a bug where queries could get permanently stuck in resource groups when coordinator task-based throttling (``experimental.max-total-running-task-count-to-not-execute-new-query``) is enabled.
* Replace ``experimental.max-total-running-task-count-to-not-execute-new-query`` with ``max-total-running-task-count-to-not-execute-new-query``. The legacy name remains supported for backwards compatibility.
```
1 parent 8236897 commit 88b6343

File tree

6 files changed: +265 -8 lines changed


presto-docs/src/main/sphinx/admin/properties.rst

Lines changed: 34 additions & 0 deletions
```diff
@@ -1368,6 +1368,40 @@ This allows the cluster to quickly ramp up when idle while still providing
 protection against overload when the cluster is busy. Set to ``0`` to always
 apply pacing when ``max-queries-per-second`` is configured.
 
+``max-total-running-task-count-to-not-execute-new-query``
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+* **Type:** ``integer``
+* **Minimum value:** ``1``
+* **Default value:** ``2147483647`` (unlimited)
+
+Maximum total running task count across all queries on the coordinator. When
+this threshold is exceeded, new queries are held in the queue rather than
+being scheduled for execution. This helps prevent coordinator overload by
+limiting the number of concurrent tasks being managed.
+
+Unlike ``max-total-running-task-count-to-kill-query`` which kills queries when
+the limit is exceeded, this property proactively prevents new queries from
+starting while allowing existing queries to complete normally.
+
+This property works in conjunction with query admission pacing
+(``query-manager.query-pacing.max-queries-per-second``) to provide
+comprehensive coordinator load management. When both are configured:
+
+1. Pacing controls the rate at which queries are admitted
+2. This property provides a hard cap on total concurrent tasks
+
+Without query-pacing, the cluster can admit multiple queries at once, which
+can lead to significantly more concurrent tasks than expected over this limit.
+
+Set to a lower value (e.g., ``50000``) to limit coordinator task management
+overhead. The default value effectively disables this feature.
+
+.. note::
+
+    For backwards compatibility, this property can also be configured using the
+    legacy name ``experimental.max-total-running-task-count-to-not-execute-new-query``.
+
 Query Retry Properties
 ----------------------
 
```
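As a hedged illustration of the pairing described in the documentation above (the property names appear in the doc text; the numeric values are placeholders, not tuned recommendations), a coordinator `config.properties` might combine the task cap with admission pacing like this:

```properties
# Hard cap on total concurrent tasks; 50000 is the example value from the documentation.
max-total-running-task-count-to-not-execute-new-query=50000
# Pace query admission so bursts do not overshoot the cap (placeholder rate).
query-manager.query-pacing.max-queries-per-second=5

# The legacy key is still accepted and maps to the same setting:
# experimental.max-total-running-task-count-to-not-execute-new-query=50000
```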

presto-main-base/src/main/java/com/facebook/presto/execution/QueryManagerConfig.java

Lines changed: 2 additions & 1 deletion
```diff
@@ -321,7 +321,8 @@ public int getMaxQueryRunningTaskCount()
         return maxQueryRunningTaskCount;
     }
 
-    @Config("experimental.max-total-running-task-count-to-not-execute-new-query")
+    @LegacyConfig("experimental.max-total-running-task-count-to-not-execute-new-query")
+    @Config("max-total-running-task-count-to-not-execute-new-query")
     @ConfigDescription("Keep new queries in the queue if total task count exceeds this threshold")
     public QueryManagerConfig setMaxTotalRunningTaskCountToNotExecuteNewQuery(int maxTotalRunningTaskCountToNotExecuteNewQuery)
     {
```

presto-main-base/src/main/java/com/facebook/presto/execution/resourceGroups/InternalResourceGroup.java

Lines changed: 16 additions & 5 deletions
```diff
@@ -740,7 +740,18 @@ public void run(ManagedQueryExecution query)
         else {
             query.setResourceGroupQueryLimits(perQueryLimits);
             boolean immediateStartCandidate = canRun && queuedQueries.isEmpty();
-            if (immediateStartCandidate && queryPacingContext.tryAcquireAdmissionSlot()) {
+            boolean startQuery = immediateStartCandidate;
+            if (immediateStartCandidate) {
+                // Check for coordinator overload (task limit exceeded or denied admission)
+                // isTaskLimitExceeded MUST be checked before tryAcquireAdmissionSlot, or else admission slots will be acquired but not started
+                boolean coordOverloaded = ((RootInternalResourceGroup) root).isTaskLimitExceeded()
+                        || !queryPacingContext.tryAcquireAdmissionSlot();
+                if (coordOverloaded) {
+                    startQuery = false;
+                }
+            }
+
+            if (startQuery) {
                 startInBackground(query);
             }
             else {
@@ -914,6 +925,10 @@ protected boolean internalStartNext()
     {
         checkState(Thread.holdsLock(root), "Must hold lock to find next query");
         synchronized (root) {
+            if (((RootInternalResourceGroup) root).isTaskLimitExceeded()) {
+                return false;
+            }
+
             if (!canRunMore()) {
                 return false;
             }
@@ -1052,10 +1067,6 @@ private boolean canRunMore()
             return false;
         }
 
-        if (((RootInternalResourceGroup) root).isTaskLimitExceeded()) {
-            return false;
-        }
-
         int hardConcurrencyLimit = getHardConcurrencyLimitBasedOnCpuUsage();
 
         int totalRunningQueries = runningQueries.size() + descendantRunningQueries;
```
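To make the intent of this change easier to follow, here is a minimal, self-contained sketch (a toy class, not Presto code; the method names only echo the ones in the diff above): eligibility no longer consults the coordinator task limit, while the start path checks it first.

```java
// Toy illustration of the split introduced above (NOT Presto code).
// Eligibility (analogous to canRunMore) ignores the coordinator task limit, so a
// throttled group stays in the eligible set; the start path (analogous to
// internalStartNext and the immediate-start branch of run()) refuses to start queries.
public class AdmissionSketch
{
    // Coordinator-wide flag, analogous to RootInternalResourceGroup.isTaskLimitExceeded()
    private static boolean taskLimitExceeded;
    // Stand-in for the group's memory/concurrency/CPU checks
    private static boolean underGroupLimits = true;

    // Eligibility: the group can be marked eligible even while the coordinator is throttled
    static boolean canRunMore()
    {
        return underGroupLimits;
    }

    // Start path: bail out first while the coordinator is task-throttled
    static boolean startNextQueuedQuery()
    {
        if (taskLimitExceeded) {
            return false;
        }
        if (!canRunMore()) {
            return false;
        }
        // ... dequeue the highest-priority query and start it ...
        return true;
    }

    public static void main(String[] args)
    {
        taskLimitExceeded = true;
        System.out.println("eligible=" + canRunMore() + " started=" + startNextQueuedQuery()); // eligible=true started=false
        taskLimitExceeded = false;
        System.out.println("eligible=" + canRunMore() + " started=" + startNextQueuedQuery()); // eligible=true started=true
    }
}
```

Because the group still reports itself as eligible, clearing the task limit only requires re-running the start path, which the new tests below exercise via `processQueuedQueries()`.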

presto-main-base/src/test/java/com/facebook/presto/execution/TestQueryManagerConfig.java

Lines changed: 1 addition & 1 deletion
```diff
@@ -102,7 +102,7 @@ public void testExplicitPropertyMappings()
                 .put("query.stage-count-warning-threshold", "12300")
                 .put("max-total-running-task-count-to-kill-query", "60000")
                 .put("max-query-running-task-count", "10000")
-                .put("experimental.max-total-running-task-count-to-not-execute-new-query", "50000")
+                .put("max-total-running-task-count-to-not-execute-new-query", "50000")
                 .put("concurrency-threshold-to-enable-resource-group-refresh", "2")
                 .put("resource-group-runtimeinfo-refresh-interval", "10ms")
                 .put("query.schedule-split-batch-size", "99")
```

presto-main-base/src/test/java/com/facebook/presto/execution/resourceGroups/TestResourceGroups.java

Lines changed: 211 additions & 0 deletions
```diff
@@ -1098,4 +1098,215 @@ public String getName()
 
         return new ClusterResourceChecker(mockPolicy, config, createNodeManager());
     }
+
+    // Tests that when task limit is exceeded, new queries are queued instead of starting immediately
+    @Test(timeOut = 10_000)
+    public void testTaskLimitExceededQueuesQuery()
+    {
+        RootInternalResourceGroup root = new RootInternalResourceGroup(
+                "root",
+                (group, export) -> {},
+                directExecutor(),
+                ignored -> Optional.empty(),
+                rg -> false,
+                createNodeManager(),
+                createClusterResourceChecker(),
+                QueryPacingContext.NOOP);
+        root.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
+        root.setMaxQueuedQueries(10);
+        root.setHardConcurrencyLimit(10);
+
+        // Set task limit exceeded
+        root.setTaskLimitExceeded(true);
+
+        // Submit a query - it should be queued because task limit is exceeded
+        MockManagedQueryExecution query1 = new MockManagedQueryExecution(0);
+        query1.startWaitingForPrerequisites();
+        root.run(query1);
+
+        // Query should be queued, not running
+        assertEquals(query1.getState(), QUEUED);
+        assertEquals(root.getQueuedQueries(), 1);
+        assertEquals(root.getRunningQueries(), 0);
+    }
+
+    // Tests that queued queries start when task limit is no longer exceeded
+    @Test(timeOut = 10_000)
+    public void testQueryStartsWhenTaskLimitClears()
+    {
+        RootInternalResourceGroup root = new RootInternalResourceGroup(
+                "root",
+                (group, export) -> {},
+                directExecutor(),
+                ignored -> Optional.empty(),
+                rg -> false,
+                createNodeManager(),
+                createClusterResourceChecker(),
+                QueryPacingContext.NOOP);
+        root.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
+        root.setMaxQueuedQueries(10);
+        root.setHardConcurrencyLimit(10);
+
+        // Set task limit exceeded
+        root.setTaskLimitExceeded(true);
+
+        // Submit queries - they should be queued
+        MockManagedQueryExecution query1 = new MockManagedQueryExecution(0);
+        query1.startWaitingForPrerequisites();
+        root.run(query1);
+        MockManagedQueryExecution query2 = new MockManagedQueryExecution(0);
+        query2.startWaitingForPrerequisites();
+        root.run(query2);
+
+        assertEquals(query1.getState(), QUEUED);
+        assertEquals(query2.getState(), QUEUED);
+        assertEquals(root.getQueuedQueries(), 2);
+        assertEquals(root.getRunningQueries(), 0);
+
+        // Clear task limit
+        root.setTaskLimitExceeded(false);
+
+        // Process queued queries - they should now start
+        root.processQueuedQueries();
+
+        assertEquals(query1.getState(), RUNNING);
+        assertEquals(query2.getState(), RUNNING);
+        assertEquals(root.getQueuedQueries(), 0);
+        assertEquals(root.getRunningQueries(), 2);
+    }
+
+    // Tests that queries in a subgroup hierarchy are properly queued and started when task limit changes
+    @Test(timeOut = 10_000)
+    public void testTaskLimitExceededWithSubgroups()
+    {
+        RootInternalResourceGroup root = new RootInternalResourceGroup(
+                "root",
+                (group, export) -> {},
+                directExecutor(),
+                ignored -> Optional.empty(),
+                rg -> false,
+                createNodeManager(),
+                createClusterResourceChecker(),
+                QueryPacingContext.NOOP);
+        root.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
+        root.setMaxQueuedQueries(10);
+        root.setHardConcurrencyLimit(10);
+
+        InternalResourceGroup groupA = root.getOrCreateSubGroup("A", true);
+        groupA.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
+        groupA.setMaxQueuedQueries(10);
+        groupA.setHardConcurrencyLimit(10);
+
+        InternalResourceGroup groupG = groupA.getOrCreateSubGroup("G", true);
+        groupG.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
+        groupG.setMaxQueuedQueries(10);
+        groupG.setHardConcurrencyLimit(10);
+
+        // Set task limit exceeded
+        root.setTaskLimitExceeded(true);
+
+        // Submit a query to leaf group G - it should be queued
+        MockManagedQueryExecution query1 = new MockManagedQueryExecution(0);
+        query1.startWaitingForPrerequisites();
+        groupG.run(query1);
+
+        assertEquals(query1.getState(), QUEUED);
+        assertEquals(groupG.getQueuedQueries(), 1);
+        assertEquals(groupG.getRunningQueries(), 0);
+
+        // Clear task limit and process queued queries
+        root.setTaskLimitExceeded(false);
+        root.processQueuedQueries();
+
+        // Query should now be running
+        assertEquals(query1.getState(), RUNNING);
+        assertEquals(groupG.getQueuedQueries(), 0);
+        assertEquals(groupG.getRunningQueries(), 1);
+    }
+
+    // Tests that when task limit is exceeded, queries already running continue, but new ones are queued
+    @Test(timeOut = 10_000)
+    public void testTaskLimitExceededDoesNotAffectRunningQueries()
+    {
+        RootInternalResourceGroup root = new RootInternalResourceGroup(
+                "root",
+                (group, export) -> {},
+                directExecutor(),
+                ignored -> Optional.empty(),
+                rg -> false,
+                createNodeManager(),
+                createClusterResourceChecker(),
+                QueryPacingContext.NOOP);
+        root.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
+        root.setMaxQueuedQueries(10);
+        root.setHardConcurrencyLimit(10);
+
+        // Submit a query before task limit is exceeded - it should run
+        MockManagedQueryExecution query1 = new MockManagedQueryExecution(0);
+        query1.startWaitingForPrerequisites();
+        root.run(query1);
+        assertEquals(query1.getState(), RUNNING);
+
+        // Now set task limit exceeded
+        root.setTaskLimitExceeded(true);
+
+        // Submit another query - it should be queued
+        MockManagedQueryExecution query2 = new MockManagedQueryExecution(0);
+        query2.startWaitingForPrerequisites();
+        root.run(query2);
+        assertEquals(query2.getState(), QUEUED);
+
+        // The first query should still be running
+        assertEquals(query1.getState(), RUNNING);
+        assertEquals(root.getRunningQueries(), 1);
+        assertEquals(root.getQueuedQueries(), 1);
+    }
+
+    // Tests that task limit transitions work correctly with multiple cycles
+    @Test(timeOut = 10_000)
+    public void testTaskLimitExceededMultipleCycles()
+    {
+        RootInternalResourceGroup root = new RootInternalResourceGroup(
+                "root",
+                (group, export) -> {},
+                directExecutor(),
+                ignored -> Optional.empty(),
+                rg -> false,
+                createNodeManager(),
+                createClusterResourceChecker(),
+                QueryPacingContext.NOOP);
+        root.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
+        root.setMaxQueuedQueries(10);
+        root.setHardConcurrencyLimit(10);
+
+        // Cycle 1: Task limit exceeded, query queued
+        root.setTaskLimitExceeded(true);
+        MockManagedQueryExecution query1 = new MockManagedQueryExecution(0);
+        query1.startWaitingForPrerequisites();
+        root.run(query1);
+        assertEquals(query1.getState(), QUEUED);
+
+        // Clear task limit, query starts
+        root.setTaskLimitExceeded(false);
+        root.processQueuedQueries();
+        assertEquals(query1.getState(), RUNNING);
+
+        // Cycle 2: Task limit exceeded again, new query queued
+        root.setTaskLimitExceeded(true);
+        MockManagedQueryExecution query2 = new MockManagedQueryExecution(0);
+        query2.startWaitingForPrerequisites();
+        root.run(query2);
+        assertEquals(query2.getState(), QUEUED);
+        assertEquals(query1.getState(), RUNNING); // query1 still running
+
+        // Complete query1, processQueuedQueries should not start query2 (task limit still exceeded)
+        query1.complete();
+        root.processQueuedQueries();
+        assertEquals(query2.getState(), QUEUED); // Still queued because task limit exceeded
+
+        // Clear task limit, query2 starts
+        root.setTaskLimitExceeded(false);
+        root.processQueuedQueries();
+        assertEquals(query2.getState(), RUNNING);
+    }
 }
```

presto-tests/src/test/java/com/facebook/presto/tests/TestQueryTaskLimit.java

Lines changed: 1 addition & 1 deletion
```diff
@@ -94,7 +94,7 @@ public void testQueuingWhenTaskLimitExceeds()
     {
         ImmutableMap<String, String> extraProperties = ImmutableMap.<String, String>builder()
                 .put("experimental.spill-enabled", "false")
-                .put("experimental.max-total-running-task-count-to-not-execute-new-query", "2")
+                .put("max-total-running-task-count-to-not-execute-new-query", "2")
                 .build();
 
         try (DistributedQueryRunner queryRunner = createQueryRunner(defaultSession, extraProperties)) {
```
