Skip to content

Commit 3e3321c

Browse files
committed
address comments
Signed-off-by: Lantao Jin <ltjin@amazon.com>
1 parent fc9261e commit 3e3321c

File tree

5 files changed

+205
-29
lines changed

5 files changed

+205
-29
lines changed

core/src/main/java/org/opensearch/sql/executor/DelegatingExecutionEngine.java

Lines changed: 23 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package org.opensearch.sql.executor;
77

88
import java.util.List;
9+
import java.util.Optional;
910
import lombok.RequiredArgsConstructor;
1011
import lombok.extern.log4j.Log4j2;
1112
import org.apache.calcite.rel.RelNode;
@@ -16,8 +17,9 @@
1617

1718
/**
1819
* An {@link ExecutionEngine} that delegates Calcite RelNode execution to the first extension whose
19-
* {@link ExecutionEngine#canVectorize(RelNode)} returns {@code true}, falling back to a default
20-
* engine otherwise. All non-Calcite methods are forwarded directly to the default engine.
20+
* {@link ExecutionEngine#canVectorize(RelNode)} returns {@code true}, falling back to the default
21+
* engine otherwise. Non-Calcite ({@link PhysicalPlan}) methods and unmatched RelNode plans are
22+
* forwarded to the default engine.
2123
*/
2224
@RequiredArgsConstructor
2325
@Log4j2
@@ -28,41 +30,35 @@ public class DelegatingExecutionEngine implements ExecutionEngine {
2830

2931
@Override
3032
public void execute(PhysicalPlan plan, ResponseListener<QueryResponse> listener) {
31-
throw new UnsupportedOperationException("DelegatingExecutionEngine only accepts RelNode");
33+
defaultEngine.execute(plan, listener);
3234
}
3335

3436
@Override
3537
public void execute(
3638
PhysicalPlan plan, ExecutionContext context, ResponseListener<QueryResponse> listener) {
37-
throw new UnsupportedOperationException("DelegatingExecutionEngine only accepts RelNode");
39+
defaultEngine.execute(plan, context, listener);
3840
}
3941

4042
@Override
4143
public void explain(PhysicalPlan plan, ResponseListener<ExplainResponse> listener) {
42-
throw new UnsupportedOperationException("DelegatingExecutionEngine only accepts RelNode");
44+
defaultEngine.explain(plan, listener);
4345
}
4446

4547
@Override
4648
public boolean canVectorize(RelNode plan) {
47-
for (ExecutionEngine ext : extensions) {
48-
if (ext.canVectorize(plan)) {
49-
return true;
50-
}
51-
}
52-
return false;
49+
return findExtension(plan).isPresent();
5350
}
5451

5552
@Override
5653
public void execute(
5754
RelNode plan, CalcitePlanContext context, ResponseListener<QueryResponse> listener) {
58-
for (ExecutionEngine ext : extensions) {
59-
if (ext.canVectorize(plan)) {
60-
log.info("Routing query to extension engine : {}", ext.getClass().getSimpleName());
61-
ext.execute(plan, context, listener);
62-
return;
63-
}
55+
Optional<ExecutionEngine> ext = findExtension(plan);
56+
if (ext.isPresent()) {
57+
log.info("Routing query to extension engine : {}", ext.get().getClass().getSimpleName());
58+
ext.get().execute(plan, context, listener);
59+
} else {
60+
defaultEngine.execute(plan, context, listener);
6461
}
65-
defaultEngine.execute(plan, context, listener);
6662
}
6763

6864
@Override
@@ -71,12 +67,15 @@ public void explain(
7167
ExplainMode mode,
7268
CalcitePlanContext context,
7369
ResponseListener<ExplainResponse> listener) {
74-
for (ExecutionEngine ext : extensions) {
75-
if (ext.canVectorize(plan)) {
76-
ext.explain(plan, mode, context, listener);
77-
return;
78-
}
70+
Optional<ExecutionEngine> ext = findExtension(plan);
71+
if (ext.isPresent()) {
72+
ext.get().explain(plan, mode, context, listener);
73+
} else {
74+
defaultEngine.explain(plan, mode, context, listener);
7975
}
80-
defaultEngine.explain(plan, mode, context, listener);
76+
}
77+
78+
private Optional<ExecutionEngine> findExtension(RelNode plan) {
79+
return extensions.stream().filter(ext -> ext.canVectorize(plan)).findFirst();
8180
}
8281
}

core/src/main/java/org/opensearch/sql/executor/ExecutionEngine.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,13 +58,21 @@ default boolean canVectorize(RelNode plan) {
5858

5959
/** Execute calcite RelNode plan with {@link ExecutionContext} and call back response listener. */
6060
default void execute(
61-
RelNode plan, CalcitePlanContext context, ResponseListener<QueryResponse> listener) {}
61+
RelNode plan, CalcitePlanContext context, ResponseListener<QueryResponse> listener) {
62+
listener.onFailure(
63+
new UnsupportedOperationException(
64+
getClass().getSimpleName() + " does not support RelNode execution"));
65+
}
6266

6367
default void explain(
6468
RelNode plan,
6569
ExplainMode mode,
6670
CalcitePlanContext context,
67-
ResponseListener<ExplainResponse> listener) {}
71+
ResponseListener<ExplainResponse> listener) {
72+
listener.onFailure(
73+
new UnsupportedOperationException(
74+
getClass().getSimpleName() + " does not support RelNode explain"));
75+
}
6876

6977
/** Data class that encapsulates ExprValue. */
7078
@Data
Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.executor;
7+
8+
import static org.mockito.ArgumentMatchers.any;
9+
import static org.mockito.ArgumentMatchers.eq;
10+
import static org.mockito.Mockito.never;
11+
import static org.mockito.Mockito.verify;
12+
import static org.mockito.Mockito.when;
13+
14+
import java.util.List;
15+
import org.apache.calcite.rel.RelNode;
16+
import org.junit.jupiter.api.Test;
17+
import org.junit.jupiter.api.extension.ExtendWith;
18+
import org.mockito.Mock;
19+
import org.mockito.junit.jupiter.MockitoExtension;
20+
import org.opensearch.sql.ast.statement.ExplainMode;
21+
import org.opensearch.sql.calcite.CalcitePlanContext;
22+
import org.opensearch.sql.common.response.ResponseListener;
23+
import org.opensearch.sql.planner.physical.PhysicalPlan;
24+
25+
@ExtendWith(MockitoExtension.class)
26+
class DelegatingExecutionEngineTest {
27+
28+
@Mock private ExecutionEngine defaultEngine;
29+
30+
@Mock private ExecutionEngine extension1;
31+
32+
@Mock private ExecutionEngine extension2;
33+
34+
@Mock private RelNode relNode;
35+
36+
@Mock private CalcitePlanContext calciteContext;
37+
38+
@Mock private PhysicalPlan physicalPlan;
39+
40+
@Mock private ExecutionContext executionContext;
41+
42+
@Mock private ResponseListener<ExecutionEngine.QueryResponse> queryListener;
43+
44+
@Mock private ResponseListener<ExecutionEngine.ExplainResponse> explainListener;
45+
46+
@Test
47+
void executeRelNodeRoutesToMatchingExtension() {
48+
when(extension1.canVectorize(relNode)).thenReturn(true);
49+
DelegatingExecutionEngine engine =
50+
new DelegatingExecutionEngine(defaultEngine, List.of(extension1, extension2));
51+
52+
engine.execute(relNode, calciteContext, queryListener);
53+
54+
verify(extension1).execute(relNode, calciteContext, queryListener);
55+
verify(defaultEngine, never()).execute(any(RelNode.class), any(), eq(queryListener));
56+
}
57+
58+
@Test
59+
void executeRelNodeFallsBackToDefaultWhenNoExtensionMatches() {
60+
when(extension1.canVectorize(relNode)).thenReturn(false);
61+
when(extension2.canVectorize(relNode)).thenReturn(false);
62+
DelegatingExecutionEngine engine =
63+
new DelegatingExecutionEngine(defaultEngine, List.of(extension1, extension2));
64+
65+
engine.execute(relNode, calciteContext, queryListener);
66+
67+
verify(defaultEngine).execute(relNode, calciteContext, queryListener);
68+
verify(extension1, never()).execute(any(RelNode.class), any(), eq(queryListener));
69+
verify(extension2, never()).execute(any(RelNode.class), any(), eq(queryListener));
70+
}
71+
72+
@Test
73+
void executeRelNodeRoutesToFirstMatchingExtension() {
74+
when(extension1.canVectorize(relNode)).thenReturn(true);
75+
DelegatingExecutionEngine engine =
76+
new DelegatingExecutionEngine(defaultEngine, List.of(extension1, extension2));
77+
78+
engine.execute(relNode, calciteContext, queryListener);
79+
80+
verify(extension1).execute(relNode, calciteContext, queryListener);
81+
verify(extension2, never()).execute(any(RelNode.class), any(), eq(queryListener));
82+
}
83+
84+
@Test
85+
void explainRelNodeRoutesToMatchingExtension() {
86+
when(extension1.canVectorize(relNode)).thenReturn(true);
87+
DelegatingExecutionEngine engine =
88+
new DelegatingExecutionEngine(defaultEngine, List.of(extension1));
89+
90+
engine.explain(relNode, ExplainMode.STANDARD, calciteContext, explainListener);
91+
92+
verify(extension1).explain(relNode, ExplainMode.STANDARD, calciteContext, explainListener);
93+
verify(defaultEngine, never()).explain(any(RelNode.class), any(), any(), eq(explainListener));
94+
}
95+
96+
@Test
97+
void explainRelNodeFallsBackToDefaultWhenNoExtensionMatches() {
98+
when(extension1.canVectorize(relNode)).thenReturn(false);
99+
DelegatingExecutionEngine engine =
100+
new DelegatingExecutionEngine(defaultEngine, List.of(extension1));
101+
102+
engine.explain(relNode, ExplainMode.STANDARD, calciteContext, explainListener);
103+
104+
verify(defaultEngine).explain(relNode, ExplainMode.STANDARD, calciteContext, explainListener);
105+
}
106+
107+
@Test
108+
void canVectorizeReturnsTrueWhenExtensionMatches() {
109+
when(extension1.canVectorize(relNode)).thenReturn(false);
110+
when(extension2.canVectorize(relNode)).thenReturn(true);
111+
DelegatingExecutionEngine engine =
112+
new DelegatingExecutionEngine(defaultEngine, List.of(extension1, extension2));
113+
114+
assert engine.canVectorize(relNode);
115+
}
116+
117+
@Test
118+
void canVectorizeReturnsFalseWhenNoExtensionMatches() {
119+
when(extension1.canVectorize(relNode)).thenReturn(false);
120+
DelegatingExecutionEngine engine =
121+
new DelegatingExecutionEngine(defaultEngine, List.of(extension1));
122+
123+
assert !engine.canVectorize(relNode);
124+
}
125+
126+
@Test
127+
void physicalPlanExecuteDelegatesToDefault() {
128+
DelegatingExecutionEngine engine =
129+
new DelegatingExecutionEngine(defaultEngine, List.of(extension1));
130+
131+
engine.execute(physicalPlan, queryListener);
132+
133+
verify(defaultEngine).execute(physicalPlan, queryListener);
134+
}
135+
136+
@Test
137+
void physicalPlanExecuteWithContextDelegatesToDefault() {
138+
DelegatingExecutionEngine engine =
139+
new DelegatingExecutionEngine(defaultEngine, List.of(extension1));
140+
141+
engine.execute(physicalPlan, executionContext, queryListener);
142+
143+
verify(defaultEngine).execute(physicalPlan, executionContext, queryListener);
144+
}
145+
146+
@Test
147+
void physicalPlanExplainDelegatesToDefault() {
148+
DelegatingExecutionEngine engine =
149+
new DelegatingExecutionEngine(defaultEngine, List.of(extension1));
150+
151+
engine.explain(physicalPlan, explainListener);
152+
153+
verify(defaultEngine).explain(physicalPlan, explainListener);
154+
}
155+
156+
@Test
157+
void emptyExtensionsListAlwaysFallsBackToDefault() {
158+
DelegatingExecutionEngine engine = new DelegatingExecutionEngine(defaultEngine, List.of());
159+
160+
engine.execute(relNode, calciteContext, queryListener);
161+
162+
verify(defaultEngine).execute(relNode, calciteContext, queryListener);
163+
}
164+
}

plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,8 +158,9 @@ public String description() {
158158

159159
@Override
160160
public void loadExtensions(ExtensionLoader loader) {
161-
this.executionEngineExtensions = loader.loadExtensions(ExecutionEngine.class);
162-
if (executionEngineExtensions != null && !executionEngineExtensions.isEmpty()) {
161+
List<ExecutionEngine> loaded = loader.loadExtensions(ExecutionEngine.class);
162+
this.executionEngineExtensions = loaded != null ? List.copyOf(loaded) : List.of();
163+
if (!executionEngineExtensions.isEmpty()) {
163164
LOGGER.info(
164165
"Loaded {} execution engine extension(s): {}",
165166
executionEngineExtensions.size(),

plugin/src/main/java/org/opensearch/sql/plugin/config/EngineExtensionsHolder.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,8 @@
1313
* so that OpenSearch's Guice injector can inject it into transport actions like {@code
1414
* TransportPPLQueryAction}.
1515
*/
16-
public record EngineExtensionsHolder(List<ExecutionEngine> engines) {}
16+
public record EngineExtensionsHolder(List<ExecutionEngine> engines) {
17+
public EngineExtensionsHolder(List<ExecutionEngine> engines) {
18+
this.engines = engines != null ? List.copyOf(engines) : List.of();
19+
}
20+
}

0 commit comments

Comments
 (0)