Skip to content

Commit 5addc89

Browse files
committed
[Mustang] Enable profiling and migrate to UnifiedQueryParser (#5247)
Task 1: Enable profiling (#5268) - Add .profiling(pplRequest.profile()) to UnifiedQueryContext.builder() in both doExecute and doExplain Task 2: Migrate to UnifiedQueryParser for index extraction (#5274) - Replace StubIndexDetector regex with PPLQueryParser AST-based extraction: parse query, walk AST to find Relation node, extract table name via getTableQualifiedName() - Delete StubIndexDetector - isAnalyticsIndex() is now an instance method (needs PPLQueryParser) - Constructor takes Settings for PPLQueryParser Signed-off-by: Kai Huang <kaihuang@amazon.com> Signed-off-by: Kai Huang <ahkcs@amazon.com>
1 parent 2920fc5 commit 5addc89

File tree

4 files changed

+80
-119
lines changed

4 files changed

+80
-119
lines changed

plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java

Lines changed: 53 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,18 @@
2020
import org.opensearch.core.action.ActionListener;
2121
import org.opensearch.sql.api.UnifiedQueryContext;
2222
import org.opensearch.sql.api.UnifiedQueryPlanner;
23+
import org.opensearch.sql.api.parser.PPLQueryParser;
24+
import org.opensearch.sql.ast.tree.Relation;
25+
import org.opensearch.sql.ast.tree.UnresolvedPlan;
2326
import org.opensearch.sql.calcite.CalcitePlanContext;
2427
import org.opensearch.sql.calcite.plan.rel.LogicalSystemLimit;
2528
import org.opensearch.sql.common.response.ResponseListener;
29+
import org.opensearch.sql.common.setting.Settings;
2630
import org.opensearch.sql.executor.ExecutionEngine.QueryResponse;
2731
import org.opensearch.sql.executor.QueryType;
2832
import org.opensearch.sql.executor.analytics.AnalyticsExecutionEngine;
2933
import org.opensearch.sql.executor.analytics.QueryPlanExecutor;
3034
import org.opensearch.sql.lang.LangSpec;
31-
import org.opensearch.sql.plugin.rest.analytics.stub.StubIndexDetector;
3235
import org.opensearch.sql.plugin.rest.analytics.stub.StubSchemaProvider;
3336
import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse;
3437
import org.opensearch.sql.ppl.domain.PPLQueryRequest;
@@ -49,30 +52,59 @@ public class RestUnifiedQueryAction {
4952

5053
private final AnalyticsExecutionEngine analyticsEngine;
5154
private final NodeClient client;
55+
private final PPLQueryParser pplParser;
5256

53-
public RestUnifiedQueryAction(NodeClient client, QueryPlanExecutor planExecutor) {
57+
public RestUnifiedQueryAction(
58+
NodeClient client, QueryPlanExecutor planExecutor, Settings settings) {
5459
this.client = client;
5560
this.analyticsEngine = new AnalyticsExecutionEngine(planExecutor);
61+
this.pplParser = new PPLQueryParser(settings);
5662
}
5763

5864
/**
59-
* Check if the query targets an analytics engine index. Delegates to {@link StubIndexDetector}
60-
* which will be replaced by UnifiedQueryParser and index settings when available.
65+
* Check if the query targets an analytics engine index (e.g., Parquet-backed). Uses {@link
66+
* PPLQueryParser} to parse the query and extract the index name from the AST.
6167
*/
62-
public static boolean isAnalyticsIndex(String query) {
63-
return StubIndexDetector.isAnalyticsIndex(query);
68+
public boolean isAnalyticsIndex(String query) {
69+
if (query == null || query.isEmpty()) {
70+
return false;
71+
}
72+
try {
73+
String indexName = extractIndexName(query);
74+
if (indexName == null) {
75+
return false;
76+
}
77+
int lastDot = indexName.lastIndexOf('.');
78+
String tableName = lastDot >= 0 ? indexName.substring(lastDot + 1) : indexName;
79+
return tableName.startsWith("parquet_");
80+
} catch (Exception e) {
81+
return false;
82+
}
83+
}
84+
85+
/** Extract the source index name by parsing the PPL AST and finding the Relation node. */
86+
private String extractIndexName(String query) {
87+
UnresolvedPlan plan = pplParser.parse(query);
88+
Relation relation = findRelation(plan);
89+
return relation != null ? relation.getTableQualifiedName().toString() : null;
90+
}
91+
92+
/** Walk the AST to find the Relation (table scan) node. */
93+
private static Relation findRelation(UnresolvedPlan plan) {
94+
if (plan instanceof Relation) {
95+
return (Relation) plan;
96+
}
97+
for (var child : plan.getChild()) {
98+
if (child instanceof UnresolvedPlan unresolvedChild) {
99+
Relation found = findRelation(unresolvedChild);
100+
if (found != null) {
101+
return found;
102+
}
103+
}
104+
}
105+
return null;
64106
}
65107

66-
/**
67-
* Execute a query through the unified query pipeline on the sql-worker thread pool. Called from
68-
* {@code TransportPPLQueryAction} which handles PPL enabled check, metrics, request ID, and
69-
* profiling cleanup.
70-
*
71-
* @param query the query string
72-
* @param queryType SQL or PPL
73-
* @param pplRequest the original PPL request
74-
* @param listener the transport action listener
75-
*/
76108
/** Execute a query through the unified query pipeline on the sql-worker thread pool. */
77109
public void execute(
78110
String query,
@@ -90,7 +122,7 @@ public void execute(
90122
/**
91123
* Explain a query through the unified query pipeline on the sql-worker thread pool. Returns
92124
* ExplainResponse via ResponseListener so the caller (TransportPPLQueryAction) can format it
93-
* using its own createExplainResponseListener, reusing the existing format-aware logic.
125+
* using its own createExplainResponseListener.
94126
*/
95127
public void explain(
96128
String query,
@@ -111,14 +143,14 @@ private void doExecute(
111143
PPLQueryRequest pplRequest,
112144
ActionListener<TransportPPLQueryResponse> listener) {
113145
try {
114-
long startTime = System.nanoTime();
115146
AbstractSchema schema = StubSchemaProvider.buildSchema();
116147

117148
try (UnifiedQueryContext context =
118149
UnifiedQueryContext.builder()
119150
.language(queryType)
120151
.catalog(SCHEMA_NAME, schema)
121152
.defaultNamespace(SCHEMA_NAME)
153+
.profiling(pplRequest.profile())
122154
.build()) {
123155

124156
UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context);
@@ -127,14 +159,7 @@ private void doExecute(
127159
CalcitePlanContext planContext = context.getPlanContext();
128160
plan = addQuerySizeLimit(plan, planContext);
129161

130-
long planTime = System.nanoTime();
131-
LOG.info(
132-
"[unified] Planning completed in {}ms for {} query",
133-
(planTime - startTime) / 1_000_000,
134-
queryType);
135-
136-
analyticsEngine.execute(
137-
plan, planContext, createQueryListener(queryType, planTime, listener));
162+
analyticsEngine.execute(plan, planContext, createQueryListener(queryType, listener));
138163
}
139164
} catch (Exception e) {
140165
listener.onFailure(e);
@@ -147,14 +172,14 @@ private void doExplain(
147172
PPLQueryRequest pplRequest,
148173
ResponseListener<ExplainResponse> listener) {
149174
try {
150-
long startTime = System.nanoTime();
151175
AbstractSchema schema = StubSchemaProvider.buildSchema();
152176

153177
try (UnifiedQueryContext context =
154178
UnifiedQueryContext.builder()
155179
.language(queryType)
156180
.catalog(SCHEMA_NAME, schema)
157181
.defaultNamespace(SCHEMA_NAME)
182+
.profiling(pplRequest.profile())
158183
.build()) {
159184

160185
UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context);
@@ -163,23 +188,13 @@ private void doExplain(
163188
CalcitePlanContext planContext = context.getPlanContext();
164189
plan = addQuerySizeLimit(plan, planContext);
165190

166-
long planTime = System.nanoTime();
167-
LOG.info(
168-
"[unified] Planning completed in {}ms for {} query",
169-
(planTime - startTime) / 1_000_000,
170-
queryType);
171-
172191
analyticsEngine.explain(plan, pplRequest.mode(), planContext, listener);
173192
}
174193
} catch (Exception e) {
175194
listener.onFailure(e);
176195
}
177196
}
178197

179-
/**
180-
* Add a system-level query size limit to the plan. This ensures the analytics engine enforces the
181-
* limit during execution rather than returning all rows for post-processing truncation.
182-
*/
183198
private static RelNode addQuerySizeLimit(RelNode plan, CalcitePlanContext context) {
184199
return LogicalSystemLimit.create(
185200
LogicalSystemLimit.SystemLimitType.QUERY_SIZE_LIMIT,
@@ -188,18 +203,11 @@ private static RelNode addQuerySizeLimit(RelNode plan, CalcitePlanContext contex
188203
}
189204

190205
private ResponseListener<QueryResponse> createQueryListener(
191-
QueryType queryType,
192-
long planEndTime,
193-
ActionListener<TransportPPLQueryResponse> transportListener) {
206+
QueryType queryType, ActionListener<TransportPPLQueryResponse> transportListener) {
194207
ResponseFormatter<QueryResult> formatter = new JdbcResponseFormatter(PRETTY);
195208
return new ResponseListener<QueryResponse>() {
196209
@Override
197210
public void onResponse(QueryResponse response) {
198-
long execTime = System.nanoTime();
199-
LOG.info(
200-
"[unified] Execution completed in {}ms, {} rows returned",
201-
(execTime - planEndTime) / 1_000_000,
202-
response.getResults().size());
203211
LangSpec langSpec = queryType == QueryType.PPL ? PPL_SPEC : LangSpec.SQL_SPEC;
204212
String result =
205213
formatter.format(
@@ -215,11 +223,6 @@ public void onFailure(Exception e) {
215223
};
216224
}
217225

218-
/**
219-
* Capture current thread context and restore it on the worker thread. Ensures security context
220-
* (user identity, permissions) is propagated. Same pattern as {@link
221-
* org.opensearch.sql.opensearch.executor.OpenSearchQueryManager#withCurrentContext}.
222-
*/
223226
private static Runnable withCurrentContext(final Runnable task) {
224227
final Map<String, String> currentContext = ThreadContext.getImmutableContext();
225228
return () -> {

plugin/src/main/java/org/opensearch/sql/plugin/rest/analytics/stub/StubIndexDetector.java

Lines changed: 0 additions & 59 deletions
This file was deleted.

plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,11 @@ public TransportPPLQueryAction(
8383
b.bind(DataSourceService.class).toInstance(dataSourceService);
8484
});
8585
this.injector = Guice.createInjector(modules);
86-
this.unifiedQueryHandler = new RestUnifiedQueryAction(client, new StubQueryPlanExecutor());
86+
this.unifiedQueryHandler =
87+
new RestUnifiedQueryAction(
88+
client,
89+
new StubQueryPlanExecutor(),
90+
new OpenSearchSettings(clusterService.getClusterSettings()));
8791
this.pplEnabled =
8892
() ->
8993
MULTI_ALLOW_EXPLICIT_INDEX.get(clusterSettings)
@@ -130,7 +134,7 @@ protected void doExecute(
130134
ActionListener<TransportPPLQueryResponse> clearingListener = wrapWithProfilingClear(listener);
131135

132136
// Route to analytics engine for non-Lucene (e.g., Parquet-backed) indices
133-
if (RestUnifiedQueryAction.isAnalyticsIndex(transformedRequest.getRequest())) {
137+
if (unifiedQueryHandler.isAnalyticsIndex(transformedRequest.getRequest())) {
134138
if (transformedRequest.isExplainRequest()) {
135139
unifiedQueryHandler.explain(
136140
transformedRequest.getRequest(),

plugin/src/test/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryActionTest.java

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,26 +7,39 @@
77

88
import static org.junit.Assert.assertFalse;
99
import static org.junit.Assert.assertTrue;
10+
import static org.mockito.Mockito.mock;
1011

12+
import org.junit.Before;
1113
import org.junit.Test;
14+
import org.opensearch.sql.common.setting.Settings;
15+
import org.opensearch.sql.executor.analytics.QueryPlanExecutor;
16+
import org.opensearch.transport.client.node.NodeClient;
1217

1318
/**
14-
* Tests for analytics index routing in RestUnifiedQueryAction. Index name extraction will be
15-
* replaced by UnifiedQueryParser -- these tests focus on routing behavior only.
19+
* Tests for analytics index routing in RestUnifiedQueryAction. Uses PPLQueryParser for AST-based
20+
* index name extraction.
1621
*/
1722
public class RestUnifiedQueryActionTest {
1823

24+
private RestUnifiedQueryAction action;
25+
26+
@Before
27+
public void setUp() {
28+
action =
29+
new RestUnifiedQueryAction(
30+
mock(NodeClient.class), mock(QueryPlanExecutor.class), mock(Settings.class));
31+
}
32+
1933
@Test
2034
public void parquetIndexRoutesToAnalytics() {
21-
assertTrue(RestUnifiedQueryAction.isAnalyticsIndex("source = parquet_logs | fields ts"));
22-
assertTrue(
23-
RestUnifiedQueryAction.isAnalyticsIndex("source = opensearch.parquet_logs | fields ts"));
35+
assertTrue(action.isAnalyticsIndex("source = parquet_logs | fields ts"));
36+
assertTrue(action.isAnalyticsIndex("source = opensearch.parquet_logs | fields ts"));
2437
}
2538

2639
@Test
2740
public void nonParquetIndexRoutesToLucene() {
28-
assertFalse(RestUnifiedQueryAction.isAnalyticsIndex("source = my_logs | fields ts"));
29-
assertFalse(RestUnifiedQueryAction.isAnalyticsIndex(null));
30-
assertFalse(RestUnifiedQueryAction.isAnalyticsIndex(""));
41+
assertFalse(action.isAnalyticsIndex("source = my_logs | fields ts"));
42+
assertFalse(action.isAnalyticsIndex(null));
43+
assertFalse(action.isAnalyticsIndex(""));
3144
}
3245
}

0 commit comments

Comments
 (0)