Skip to content

Commit 08e40e6

Browse files
committed
Fix the thread pool handling for background execution
Signed-off-by: Simeon Widdis <[email protected]>
1 parent 8ea3372 commit 08e40e6

File tree

12 files changed

+69
-28
lines changed

12 files changed

+69
-28
lines changed

async-query/src/main/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJobRunner.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import org.opensearch.jobscheduler.spi.ScheduledJobParameter;
1313
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
1414
import org.opensearch.plugins.Plugin;
15-
import org.opensearch.sql.legacy.executor.AsyncRestExecutor;
1615
import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService;
1716
import org.opensearch.sql.spark.asyncquery.model.NullAsyncQueryRequestContext;
1817
import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest;
@@ -21,6 +20,8 @@
2120
import org.opensearch.threadpool.ThreadPool;
2221
import org.opensearch.transport.client.Client;
2322

23+
import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME;
24+
2425
/**
2526
* The job runner class for scheduling async query.
2627
*
@@ -37,7 +38,7 @@
3738
public class ScheduledAsyncQueryJobRunner implements ScheduledJobRunner {
3839
// Share SQL plugin thread pool
3940
private static final String ASYNC_QUERY_THREAD_POOL_NAME =
40-
AsyncRestExecutor.SQL_WORKER_THREAD_POOL_NAME;
41+
SQL_WORKER_THREAD_POOL_NAME;
4142
private static final Logger LOGGER = LogManager.getLogger(ScheduledAsyncQueryJobRunner.class);
4243

4344
private static final ScheduledAsyncQueryJobRunner INSTANCE = new ScheduledAsyncQueryJobRunner();

async-query/src/test/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJobRunnerTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import static org.mockito.Mockito.spy;
1616
import static org.mockito.Mockito.verify;
1717
import static org.mockito.Mockito.when;
18+
import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME;
1819

1920
import java.time.Instant;
2021
import org.apache.logging.log4j.LogManager;
@@ -87,7 +88,7 @@ public void testRunJobWithCorrectParameter() {
8788
spyJobRunner.runJob(request, context);
8889

8990
ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
90-
verify(threadPool.executor(AsyncRestExecutor.SQL_WORKER_THREAD_POOL_NAME))
91+
verify(threadPool.executor(SQL_WORKER_THREAD_POOL_NAME))
9192
.submit(captor.capture());
9293

9394
Runnable runnable = captor.getValue();
@@ -145,7 +146,7 @@ public void testDoRefreshThrowsException() {
145146
spyJobRunner.runJob(request, context);
146147

147148
ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
148-
verify(threadPool.executor(AsyncRestExecutor.SQL_WORKER_THREAD_POOL_NAME))
149+
verify(threadPool.executor(SQL_WORKER_THREAD_POOL_NAME))
149150
.submit(captor.capture());
150151

151152
Runnable runnable = captor.getValue();

datasources/src/main/java/org/opensearch/sql/datasources/utils/Scheduler.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,11 @@
1111
import org.opensearch.common.unit.TimeValue;
1212
import org.opensearch.threadpool.ThreadPool;
1313
import org.opensearch.transport.client.node.NodeClient;
14+
import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME;
1415

15-
/** The scheduler which schedule the task run in sql-worker thread pool. */
16+
/** The scheduler which schedule the task run in sql_worker thread pool. */
1617
@UtilityClass
1718
public class Scheduler {
18-
19-
public static final String SQL_WORKER_THREAD_POOL_NAME = "sql-worker";
20-
2119
public static void schedule(NodeClient client, Runnable task) {
2220
ThreadPool threadPool = client.threadPool();
2321
threadPool.schedule(withCurrentContext(task), new TimeValue(0), SQL_WORKER_THREAD_POOL_NAME);

docs/dev/query-manager.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,4 @@ Parser parse raw query as Statement and create AbstractPlan. Each AbstractPlan d
3131
### Change of existing logic
3232
1. Remove the schedule logic in NIO thread. After the change,
3333
a. Parser will be executed in NIO thread.
34-
b. QueryManager decide query execution strategy. e.g. OpenSearchQueryManager schedule the QueryExecution running in **sql-worker** thread pool.
34+
b. QueryManager decide query execution strategy. e.g. OpenSearchQueryManager schedule the QueryExecution running in **sql_worker** thread pool.

docs/user/admin/settings.rst

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,27 @@ Result set::
216216
"transient": {}
217217
}
218218

219+
Thread Pool Settings
220+
====================
221+
222+
The SQL plugin is integrated with the `OpenSearch Thread Pool Settings <https://docs.opensearch.org/latest/install-and-configure/configuring-opensearch/thread-pool-settings/>`.
223+
There are two thread pools which can be configured on cluster setup via `settings.yml`::
224+
225+
thread_pool:
226+
sql_worker:
227+
size: 30
228+
queue_size: 100
229+
sql_background_io:
230+
size: 30
231+
queue_size: 1000
232+
233+
The ``sql_worker`` pool corresponds to compute resources related to running queries, such as compute-heavy evaluations on result sets.
234+
This directly maps to the number of queries that can be run concurrently.
235+
This is the primary pool you interact with externally.
236+
``sql_background_io`` is a low-footprint pool for IO requests the plugin makes,
237+
and can be used to partially the search load SQL places on your cluster for some types of expensive operations.
238+
A ``sql_worker`` thread may spawn multiple background threads.
239+
219240
plugins.query.executionengine.spark.session.limit
220241
==================================================
221242

legacy/src/main/java/org/opensearch/sql/legacy/executor/AsyncRestExecutor.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55

66
package org.opensearch.sql.legacy.executor;
77

8+
import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME;
9+
810
import java.io.IOException;
911
import java.time.Duration;
1012
import java.util.Map;
@@ -30,10 +32,6 @@
3032

3133
/** A RestExecutor wrapper to execute request asynchronously to avoid blocking transport thread. */
3234
public class AsyncRestExecutor implements RestExecutor {
33-
34-
/** Custom thread pool name managed by OpenSearch */
35-
public static final String SQL_WORKER_THREAD_POOL_NAME = "sql-worker";
36-
3735
private static final Logger LOG = LogManager.getLogger(AsyncRestExecutor.class);
3836

3937
/**

legacy/src/main/java/org/opensearch/sql/legacy/executor/cursor/CursorAsyncRestExecutor.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55

66
package org.opensearch.sql.legacy.executor.cursor;
77

8+
import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME;
9+
810
import java.io.IOException;
911
import java.time.Duration;
1012
import java.util.Map;
@@ -24,9 +26,6 @@
2426
import org.opensearch.transport.client.Client;
2527

2628
public class CursorAsyncRestExecutor {
27-
/** Custom thread pool name managed by OpenSearch */
28-
public static final String SQL_WORKER_THREAD_POOL_NAME = "sql-worker";
29-
3029
private static final Logger LOG = LogManager.getLogger(CursorAsyncRestExecutor.class);
3130

3231
/** Delegated rest executor to async */

legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlStatsAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package org.opensearch.sql.legacy.plugin;
77

88
import static org.opensearch.core.rest.RestStatus.INTERNAL_SERVER_ERROR;
9+
import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME;
910

1011
import com.google.common.collect.ImmutableList;
1112
import java.util.Arrays;
@@ -90,7 +91,7 @@ protected Set<String> responseParams() {
9091

9192
private void schedule(NodeClient client, Runnable task) {
9293
ThreadPool threadPool = client.threadPool();
93-
threadPool.schedule(withCurrentContext(task), new TimeValue(0), "sql-worker");
94+
threadPool.schedule(withCurrentContext(task), new TimeValue(0), SQL_WORKER_THREAD_POOL_NAME);
9495
}
9596

9697
private Runnable withCurrentContext(final Runnable task) {

opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ public void cleanup(OpenSearchRequest request) {
218218

219219
@Override
220220
public void schedule(Runnable task) {
221-
// at that time, task already running the sql-worker ThreadPool.
221+
// at that time, task already running the sql_worker ThreadPool.
222222
task.run();
223223
}
224224

opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchQueryManager.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ public class OpenSearchQueryManager implements QueryManager {
2121

2222
private final NodeClient nodeClient;
2323

24-
private static final String SQL_WORKER_THREAD_POOL_NAME = "sql-worker";
24+
public static final String SQL_WORKER_THREAD_POOL_NAME = "sql_worker";
25+
public static final String SQL_BACKGROUND_THREAD_POOL_NAME = "sql_background_io";
2526

2627
@Override
2728
public QueryId submit(AbstractPlan queryPlan) {

0 commit comments

Comments
 (0)