Skip to content

Commit 25b9ce5

Browse files
committed
Implement background scanner handling optional missing NodeClients
Signed-off-by: Simeon Widdis <[email protected]>
1 parent e38b709 commit 25b9ce5

File tree

12 files changed

+398
-86
lines changed

12 files changed

+398
-86
lines changed

opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchClient.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import java.util.List;
99
import java.util.Map;
10+
import java.util.Optional;
1011
import org.opensearch.action.search.CreatePitRequest;
1112
import org.opensearch.action.search.DeletePitRequest;
1213
import org.opensearch.sql.opensearch.mapping.IndexMapping;
@@ -97,7 +98,7 @@ public interface OpenSearchClient {
9798
*/
9899
void schedule(Runnable task);
99100

100-
NodeClient getNodeClient();
101+
Optional<NodeClient> getNodeClient();
101102

102103
/**
103104
* Create PIT for given indices

opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClient.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import java.util.Collection;
1212
import java.util.List;
1313
import java.util.Map;
14+
import java.util.Optional;
1415
import java.util.concurrent.ExecutionException;
1516
import java.util.function.Function;
1617
import java.util.function.Predicate;
@@ -223,8 +224,8 @@ public void schedule(Runnable task) {
223224
}
224225

225226
@Override
226-
public NodeClient getNodeClient() {
227-
return client;
227+
public Optional<NodeClient> getNodeClient() {
228+
return Optional.of(client);
228229
}
229230

230231
@Override

opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchRestClient.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import java.util.HashMap;
1414
import java.util.List;
1515
import java.util.Map;
16+
import java.util.Optional;
1617
import java.util.stream.Collectors;
1718
import java.util.stream.Stream;
1819
import lombok.RequiredArgsConstructor;
@@ -236,8 +237,8 @@ public void schedule(Runnable task) {
236237
}
237238

238239
@Override
239-
public NodeClient getNodeClient() {
240-
throw new UnsupportedOperationException("Unsupported method.");
240+
public Optional<NodeClient> getNodeClient() {
241+
return Optional.empty();
241242
}
242243

243244
@Override

opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import java.util.LinkedHashMap;
1616
import java.util.List;
1717
import java.util.Map;
18+
import java.util.Optional;
1819
import java.util.concurrent.atomic.AtomicReference;
1920
import org.apache.calcite.plan.RelOptUtil;
2021
import org.apache.calcite.rel.RelNode;
@@ -47,13 +48,13 @@
4748
import org.opensearch.sql.expression.function.BuiltinFunctionName;
4849
import org.opensearch.sql.expression.function.PPLFuncImpTable;
4950
import org.opensearch.sql.opensearch.client.OpenSearchClient;
50-
import org.opensearch.sql.opensearch.client.OpenSearchNodeClient;
5151
import org.opensearch.sql.opensearch.executor.protector.ExecutionProtector;
5252
import org.opensearch.sql.opensearch.functions.DistinctCountApproxAggFunction;
5353
import org.opensearch.sql.opensearch.functions.GeoIpFunction;
5454
import org.opensearch.sql.opensearch.util.JdbcOpenSearchDataTypeConvertor;
5555
import org.opensearch.sql.planner.physical.PhysicalPlan;
5656
import org.opensearch.sql.storage.TableScanOperator;
57+
import org.opensearch.transport.client.node.NodeClient;
5758

5859
/** OpenSearch execution engine implementation. */
5960
public class OpenSearchExecutionEngine implements ExecutionEngine {
@@ -268,9 +269,9 @@ private void buildResultSet(
268269

269270
/** Registers opensearch-dependent functions */
270271
private void registerOpenSearchFunctions() {
271-
if (client instanceof OpenSearchNodeClient) {
272-
SqlUserDefinedFunction geoIpFunction =
273-
new GeoIpFunction(client.getNodeClient()).toUDF("GEOIP");
272+
Optional<NodeClient> nodeClient = client.getNodeClient();
273+
if (nodeClient.isPresent()) {
274+
SqlUserDefinedFunction geoIpFunction = new GeoIpFunction(nodeClient.get()).toUDF("GEOIP");
274275
PPLFuncImpTable.INSTANCE.registerExternalOperator(BuiltinFunctionName.GEOIP, geoIpFunction);
275276
} else {
276277
logger.info(

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import java.util.LinkedHashMap;
1111
import java.util.Map;
1212
import java.util.Map.Entry;
13+
import java.util.Optional;
1314
import java.util.function.Function;
1415
import java.util.stream.Collectors;
1516
import lombok.Getter;
@@ -45,6 +46,7 @@
4546
import org.opensearch.sql.planner.logical.LogicalPlan;
4647
import org.opensearch.sql.planner.physical.PhysicalPlan;
4748
import org.opensearch.sql.storage.read.TableScanBuilder;
49+
import org.opensearch.transport.client.node.NodeClient;
4850

4951
/** OpenSearch table (index) implementation. */
5052
public class OpenSearchIndex extends AbstractOpenSearchTable {
@@ -231,27 +233,43 @@ public static class OpenSearchDefaultImplementor extends DefaultImplementor<Open
231233

232234
@Override
233235
public PhysicalPlan visitMLCommons(LogicalMLCommons node, OpenSearchIndexScan context) {
236+
Optional<NodeClient> nc = client.getNodeClient();
237+
if (nc.isEmpty()) {
238+
throw new UnsupportedOperationException(
239+
"Unable to run Machine Learning operators on clients outside of the local node");
240+
}
234241
return new MLCommonsOperator(
235-
visitChild(node, context),
236-
node.getAlgorithm(),
237-
node.getArguments(),
238-
client.getNodeClient());
242+
visitChild(node, context), node.getAlgorithm(), node.getArguments(), nc.get());
239243
}
240244

241245
@Override
242246
public PhysicalPlan visitAD(LogicalAD node, OpenSearchIndexScan context) {
243-
return new ADOperator(visitChild(node, context), node.getArguments(), client.getNodeClient());
247+
Optional<NodeClient> nc = client.getNodeClient();
248+
if (nc.isEmpty()) {
249+
throw new UnsupportedOperationException(
250+
"Unable to run Anomaly Detector operators on clients outside of the local node");
251+
}
252+
return new ADOperator(visitChild(node, context), node.getArguments(), nc.get());
244253
}
245254

246255
@Override
247256
public PhysicalPlan visitML(LogicalML node, OpenSearchIndexScan context) {
248-
return new MLOperator(visitChild(node, context), node.getArguments(), client.getNodeClient());
257+
Optional<NodeClient> nc = client.getNodeClient();
258+
if (nc.isEmpty()) {
259+
throw new UnsupportedOperationException(
260+
"Unable to run Machine Learning operators on clients outside of the local node");
261+
}
262+
return new MLOperator(visitChild(node, context), node.getArguments(), nc.get());
249263
}
250264

251265
@Override
252266
public PhysicalPlan visitEval(LogicalEval node, OpenSearchIndexScan context) {
253-
return new OpenSearchEvalOperator(
254-
visitChild(node, context), node.getExpressions(), client.getNodeClient());
267+
Optional<NodeClient> nc = client.getNodeClient();
268+
if (nc.isEmpty()) {
269+
throw new UnsupportedOperationException(
270+
"Unable to run Eval operators on clients outside of the local node");
271+
}
272+
return new OpenSearchEvalOperator(visitChild(node, context), node.getExpressions(), nc.get());
255273
}
256274
}
257275

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.opensearch.storage.scan;
7+
8+
import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_BACKGROUND_THREAD_POOL_NAME;
9+
10+
import java.util.Collections;
11+
import java.util.Iterator;
12+
import java.util.concurrent.CompletableFuture;
13+
import java.util.concurrent.ExecutionException;
14+
import java.util.concurrent.Executor;
15+
import javax.annotation.Nullable;
16+
import org.opensearch.sql.data.model.ExprValue;
17+
import org.opensearch.sql.exception.NonFallbackCalciteException;
18+
import org.opensearch.sql.opensearch.client.OpenSearchClient;
19+
import org.opensearch.sql.opensearch.request.OpenSearchRequest;
20+
import org.opensearch.sql.opensearch.response.OpenSearchResponse;
21+
22+
/**
23+
* Utility class for asynchronously scanning an index. This lets us send background requests to the
24+
* index while we work on processing the previous batch.
25+
*
26+
* <h2>Lifecycle</h2>
27+
*
28+
* The typical usage pattern is:
29+
*
30+
* <pre>
31+
* 1. Create scanner: new BackgroundSearchScanner(client)
32+
* 2. Start initial scan: startScanning(request)
33+
* 3. Fetch batches in a loop: fetchNextBatch(request, maxWindow)
34+
* 4. Close scanner when done: close()
35+
* </pre>
36+
*
37+
* <h2>Async vs Sync Behavior</h2>
38+
*
39+
* The scanner attempts to operate asynchronously when possible to improve performance:
40+
*
41+
* <ul>
42+
* <li>When async is available (client has thread pool access): - Next batch is pre-fetched while
43+
* current batch is being processed - Reduces latency between batches
44+
* <li>When async is not available (client lacks thread pool access): - Falls back to synchronous
45+
* fetching - Each batch is fetched only when needed
46+
* </ul>
47+
*
48+
* <h2>Termination Conditions</h2>
49+
*
50+
* Scanning will stop when any of these conditions are met:
51+
*
52+
* <ul>
53+
* <li>An empty response is received (lastBatch = true)
54+
* <li>Response is an aggregation or count response (fetchOnce = true)
55+
* <li>Response size is less than maxResultWindow (fetchOnce = true)
56+
* </ul>
57+
*
58+
* Note: This class should be explicitly closed when no longer needed to ensure proper resource
59+
* cleanup.
60+
*/
61+
public class BackgroundSearchScanner {
62+
private final OpenSearchClient client;
63+
@Nullable private final Executor backgroundExecutor;
64+
private CompletableFuture<OpenSearchResponse> nextBatchFuture = null;
65+
private boolean stopIteration = false;
66+
67+
public BackgroundSearchScanner(OpenSearchClient client) {
68+
this.client = client;
69+
// We can only actually do the background operation if we have the ability to access the thread
70+
// pool. Otherwise, fallback to synchronous fetch.
71+
if (client.getNodeClient().isPresent()) {
72+
this.backgroundExecutor =
73+
client.getNodeClient().get().threadPool().executor(SQL_BACKGROUND_THREAD_POOL_NAME);
74+
} else {
75+
this.backgroundExecutor = null;
76+
}
77+
}
78+
79+
private boolean isAsync() {
80+
return backgroundExecutor != null;
81+
}
82+
83+
/**
84+
* @return Whether the search scanner has fetched all batches
85+
*/
86+
public boolean isScanDone() {
87+
return stopIteration;
88+
}
89+
90+
/**
91+
* Initiates the scanning process. If async operations are available, this will trigger the first
92+
* background fetch.
93+
*
94+
* @param request The OpenSearch request to execute
95+
*/
96+
public void startScanning(OpenSearchRequest request) {
97+
if (isAsync()) {
98+
nextBatchFuture =
99+
CompletableFuture.supplyAsync(() -> client.search(request), backgroundExecutor);
100+
}
101+
}
102+
103+
private OpenSearchResponse getCurrentResponse(OpenSearchRequest request) {
104+
if (isAsync()) {
105+
try {
106+
return nextBatchFuture.get();
107+
} catch (InterruptedException | ExecutionException e) {
108+
throw new NonFallbackCalciteException(
109+
"Failed to fetch data from the index: the background task failed or interrupted.\n"
110+
+ " Inner error: "
111+
+ e.getMessage());
112+
}
113+
} else {
114+
return client.search(request);
115+
}
116+
}
117+
118+
/**
119+
* Fetches the next batch of results. If async is enabled and more batches are expected, this will
120+
* also trigger the next background fetch.
121+
*
122+
* @param request The OpenSearch request to execute
123+
* @param maxResultWindow Maximum number of results to fetch per batch
124+
* @return SearchBatchResult containing the current batch's iterator and completion status
125+
* @throws NonFallbackCalciteException if the background fetch fails or is interrupted
126+
*/
127+
public SearchBatchResult fetchNextBatch(OpenSearchRequest request, int maxResultWindow) {
128+
OpenSearchResponse response = getCurrentResponse(request);
129+
130+
// Determine if we need future batches
131+
if (response.isAggregationResponse()
132+
|| response.isCountResponse()
133+
|| response.getHitsSize() < maxResultWindow) {
134+
stopIteration = true;
135+
}
136+
137+
Iterator<ExprValue> iterator;
138+
if (!response.isEmpty()) {
139+
iterator = response.iterator();
140+
141+
// Pre-fetch next batch if needed
142+
if (!stopIteration && isAsync()) {
143+
nextBatchFuture =
144+
CompletableFuture.supplyAsync(() -> client.search(request), backgroundExecutor);
145+
}
146+
} else {
147+
iterator = Collections.emptyIterator();
148+
stopIteration = true;
149+
}
150+
151+
return new SearchBatchResult(iterator, stopIteration);
152+
}
153+
154+
/**
155+
* Resets the scanner to its initial state, allowing a new scan to begin. This clears all
156+
* completion flags and initiates a new background fetch if async is enabled.
157+
*
158+
* @param request The OpenSearch request to execute
159+
*/
160+
public void reset(OpenSearchRequest request) {
161+
stopIteration = false;
162+
startScanning(request);
163+
}
164+
165+
/**
166+
* Releases resources associated with this scanner. Cancels any pending background fetches and
167+
* marks the scan as complete. The scanner cannot be reused after closing without calling reset().
168+
*/
169+
public void close() {
170+
stopIteration = true;
171+
if (nextBatchFuture != null) {
172+
nextBatchFuture.cancel(true);
173+
}
174+
}
175+
176+
public record SearchBatchResult(Iterator<ExprValue> iterator, boolean stopIteration) {}
177+
}

0 commit comments

Comments
 (0)