Skip to content

Commit 3190e36

Browse files
committed
Rename plugin interfaces and default implementations.
Wire up a ppl front-end using UnifiedQueryAPI from sql plugin. Signed-off-by: Marc Handalian <marc.handalian@gmail.com>
1 parent 937ad71 commit 3190e36

File tree

110 files changed

+5548
-2119
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

110 files changed

+5548
-2119
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
.claude
2+
.kiro
23
CLAUDE.md
34
.cursor*
4-
5+
*.md
56
# intellij files
67
.idea/
78
*.iml

sandbox/libs/build.gradle

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,14 @@ subprojects {
2020
project.afterEvaluate {
2121
configurations.all { Configuration conf ->
2222
dependencies.matching { it instanceof ProjectDependency }.all { ProjectDependency dep ->
23-
Project depProject = dep.dependencyProject
23+
Project depProject = project.project(dep.path)
2424
if (depProject != null
2525
&& false == depProject.path.equals(':libs:opensearch-core')
26+
&& false == depProject.path.equals(':libs:opensearch-common')
2627
&& depProject.path.startsWith(':libs')) {
2728
throw new InvalidUserDataException("projects in :libs " +
2829
"may not depend on other projects libs except " +
29-
":libs:opensearch-core but " +
30+
":libs:opensearch-core or :libs:opensearch-common but " +
3031
"${project.path} depends on ${depProject.path}")
3132
}
3233
}
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.engine;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
14+
import org.opensearch.cluster.service.ClusterService;
15+
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
16+
import org.opensearch.core.xcontent.NamedXContentRegistry;
17+
import org.opensearch.env.Environment;
18+
import org.opensearch.env.NodeEnvironment;
19+
import org.opensearch.plugins.ActionPlugin;
20+
import org.opensearch.plugins.QueryExecutionPlugin;
21+
import org.opensearch.plugins.Plugin;
22+
import org.opensearch.plugins.QueryBackEndPlugin;
23+
import org.opensearch.plugins.QueryExecutor;
24+
import org.opensearch.engine.engine.EngineDescriptor;
25+
import org.opensearch.engine.exec.OperatorType;
26+
import org.opensearch.repositories.RepositoriesService;
27+
import org.opensearch.script.ScriptService;
28+
import org.opensearch.threadpool.ThreadPool;
29+
import org.opensearch.transport.client.Client;
30+
import org.opensearch.watcher.ResourceWatcherService;
31+
32+
import java.util.Collection;
33+
import java.util.EnumSet;
34+
import java.util.List;
35+
import java.util.Set;
36+
import java.util.function.Supplier;
37+
38+
/**
39+
* Java-based Distributed Query Executor implementation.
40+
* Evolves from the former query-engine module plugin. Implements the {@link QueryExecutionPlugin}
41+
* interface as the first concrete query execution plugin implementation.
42+
*
43+
* <p>Responsibilities:
44+
* <ul>
45+
* <li>Aggregate {@link EngineDescriptor#supportedOperators()} from all
46+
* discovered {@link QueryBackEndPlugin} instances into a combined capability set</li>
47+
* <li>Create the {@link QueryExecutorService} as the {@link QueryExecutor} implementation</li>
48+
* <li>Expose both via {@link #getQueryExecutor()} and {@link #getEngineCapabilities()}</li>
49+
* </ul>
50+
*
51+
* <p>This module has zero core Calcite imports (only linq4j via the adapter dependency).
52+
*/
53+
public class DefaultQueryExecutionEngine extends Plugin implements QueryExecutionPlugin, ActionPlugin {
54+
55+
private static final Logger logger = LogManager.getLogger(DefaultQueryExecutionEngine.class);
56+
57+
private QueryExecutorService queryExecutorService;
58+
private Set<OperatorType> aggregatedCapabilities;
59+
60+
@Override
61+
public Collection<Object> createComponents(
62+
Client client,
63+
ClusterService clusterService,
64+
ThreadPool threadPool,
65+
ResourceWatcherService resourceWatcherService,
66+
ScriptService scriptService,
67+
NamedXContentRegistry xContentRegistry,
68+
Environment environment,
69+
NodeEnvironment nodeEnvironment,
70+
NamedWriteableRegistry namedWriteableRegistry,
71+
IndexNameExpressionResolver indexNameExpressionResolver,
72+
Supplier<RepositoriesService> repositoriesServiceSupplier,
73+
Collection<QueryBackEndPlugin> backEnds
74+
) {
75+
// Aggregate back-end capabilities
76+
this.aggregatedCapabilities = aggregateCapabilities(backEnds);
77+
78+
// Create QueryExecutorService as the DQEExecutor implementation
79+
this.queryExecutorService = new QueryExecutorService();
80+
81+
return List.of(queryExecutorService);
82+
}
83+
84+
@Override
85+
public QueryExecutor getQueryExecutor() {
86+
return queryExecutorService;
87+
}
88+
89+
@Override
90+
public Object getEngineCapabilities() {
91+
return aggregatedCapabilities;
92+
}
93+
94+
/**
95+
* Aggregates supported operator types from all back-end plugins.
96+
* Each back-end that implements {@link EngineDescriptor} contributes its
97+
* {@link EngineDescriptor#supportedOperators()} to the union set.
98+
*
99+
* @param backEnds the discovered back-end plugins
100+
* @return the union of all back-end supported operator types
101+
*/
102+
Set<OperatorType> aggregateCapabilities(Collection<QueryBackEndPlugin> backEnds) {
103+
if (backEnds == null || backEnds.isEmpty()) {
104+
logger.warn("No QueryBackEndPlugin instances found; continuing without back-end engine support");
105+
return Set.of();
106+
}
107+
108+
EnumSet<OperatorType> combined = EnumSet.noneOf(OperatorType.class);
109+
for (QueryBackEndPlugin backEnd : backEnds) {
110+
if (backEnd instanceof EngineDescriptor) {
111+
EngineDescriptor descriptor = (EngineDescriptor) backEnd;
112+
Set<OperatorType> ops = descriptor.supportedOperators();
113+
if (ops != null) {
114+
combined.addAll(ops);
115+
}
116+
logger.info("Registered back-end engine '{}' with operators: {}", descriptor.name(), ops);
117+
} else {
118+
logger.warn(
119+
"QueryBackEndPlugin '{}' does not implement EngineDescriptor; skipping capability extraction",
120+
backEnd.getClass().getName()
121+
);
122+
}
123+
}
124+
125+
return Set.copyOf(combined);
126+
}
127+
}
Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.engine;
10+
11+
import org.opensearch.plugins.ActionPlugin;
12+
import org.opensearch.plugins.QueryExecutionPlugin;
13+
import org.opensearch.plugins.QueryBackEndPlugin;
14+
import org.opensearch.plugins.QueryExecutor;
15+
import org.opensearch.engine.engine.EngineBridge;
16+
import org.opensearch.engine.engine.EngineDescriptor;
17+
import org.opensearch.engine.exec.OperatorType;
18+
import org.opensearch.test.OpenSearchTestCase;
19+
20+
import java.util.Collection;
21+
import java.util.List;
22+
import java.util.Set;
23+
24+
/**
25+
* Tests for {@link DefaultQueryExecutionEngine}.
26+
*/
27+
public class DefaultQueryExecutionEngineTests extends OpenSearchTestCase {
28+
29+
public void testImplementsQueryExecutionPlugin() {
30+
DefaultQueryExecutionEngine dqe = new DefaultQueryExecutionEngine();
31+
assertTrue(dqe instanceof QueryExecutionPlugin);
32+
}
33+
34+
public void testImplementsActionPlugin() {
35+
DefaultQueryExecutionEngine dqe = new DefaultQueryExecutionEngine();
36+
assertTrue(dqe instanceof ActionPlugin);
37+
}
38+
39+
public void testCreateComponentsReturnsExecutorService() {
40+
DefaultQueryExecutionEngine dqe = new DefaultQueryExecutionEngine();
41+
Collection<Object> components = dqe.createComponents(
42+
null, null, null, null, null, null, null, null, null, null, null,
43+
List.of()
44+
);
45+
assertEquals(1, components.size());
46+
assertTrue(components.iterator().next() instanceof QueryExecutorService);
47+
}
48+
49+
public void testGetQueryExecutorReturnsNonNullAfterCreateComponents() {
50+
DefaultQueryExecutionEngine dqe = new DefaultQueryExecutionEngine();
51+
dqe.createComponents(null, null, null, null, null, null, null, null, null, null, null, List.of());
52+
QueryExecutor executor = dqe.getQueryExecutor();
53+
assertNotNull(executor);
54+
assertTrue(executor instanceof QueryExecutorService);
55+
}
56+
57+
public void testGetEngineCapabilitiesReturnsNonNullAfterCreateComponents() {
58+
DefaultQueryExecutionEngine dqe = new DefaultQueryExecutionEngine();
59+
dqe.createComponents(null, null, null, null, null, null, null, null, null, null, null, List.of());
60+
Object capabilities = dqe.getEngineCapabilities();
61+
assertNotNull(capabilities);
62+
}
63+
64+
public void testNoBackEndsLogsWarningAndContinues() {
65+
DefaultQueryExecutionEngine dqe = new DefaultQueryExecutionEngine();
66+
Collection<Object> components = dqe.createComponents(
67+
null, null, null, null, null, null, null, null, null, null, null,
68+
List.of()
69+
);
70+
// Should succeed without error
71+
assertNotNull(components);
72+
assertNotNull(dqe.getQueryExecutor());
73+
Object caps = dqe.getEngineCapabilities();
74+
assertNotNull(caps);
75+
assertTrue(caps instanceof Set);
76+
assertTrue(((Set<?>) caps).isEmpty());
77+
}
78+
79+
@SuppressWarnings("unchecked")
80+
public void testAggregatesCapabilitiesFromBackEnds() {
81+
DefaultQueryExecutionEngine dqe = new DefaultQueryExecutionEngine();
82+
83+
QueryBackEndPlugin backEnd1 = createMockBackEnd("engine1", Set.of(OperatorType.SCAN, OperatorType.FILTER));
84+
QueryBackEndPlugin backEnd2 = createMockBackEnd("engine2", Set.of(OperatorType.AGGREGATE, OperatorType.SORT));
85+
86+
dqe.createComponents(
87+
null, null, null, null, null, null, null, null, null, null, null,
88+
List.of(backEnd1, backEnd2)
89+
);
90+
91+
Object caps = dqe.getEngineCapabilities();
92+
assertNotNull(caps);
93+
Set<OperatorType> capSet = (Set<OperatorType>) caps;
94+
assertEquals(Set.of(OperatorType.SCAN, OperatorType.FILTER, OperatorType.AGGREGATE, OperatorType.SORT), capSet);
95+
}
96+
97+
@SuppressWarnings("unchecked")
98+
public void testAggregatesOverlappingCapabilities() {
99+
DefaultQueryExecutionEngine dqe = new DefaultQueryExecutionEngine();
100+
101+
QueryBackEndPlugin backEnd1 = createMockBackEnd("engine1", Set.of(OperatorType.SCAN, OperatorType.FILTER));
102+
QueryBackEndPlugin backEnd2 = createMockBackEnd("engine2", Set.of(OperatorType.FILTER, OperatorType.SORT));
103+
104+
dqe.createComponents(
105+
null, null, null, null, null, null, null, null, null, null, null,
106+
List.of(backEnd1, backEnd2)
107+
);
108+
109+
Set<OperatorType> capSet = (Set<OperatorType>) dqe.getEngineCapabilities();
110+
assertEquals(Set.of(OperatorType.SCAN, OperatorType.FILTER, OperatorType.SORT), capSet);
111+
}
112+
113+
public void testGetQueryExecutorReturnsNullBeforeCreateComponents() {
114+
DefaultQueryExecutionEngine dqe = new DefaultQueryExecutionEngine();
115+
assertNull(dqe.getQueryExecutor());
116+
}
117+
118+
public void testGetEngineCapabilitiesReturnsNullBeforeCreateComponents() {
119+
DefaultQueryExecutionEngine dqe = new DefaultQueryExecutionEngine();
120+
assertNull(dqe.getEngineCapabilities());
121+
}
122+
123+
public void testCloseDoesNotThrow() throws Exception {
124+
DefaultQueryExecutionEngine dqe = new DefaultQueryExecutionEngine();
125+
// Should not throw even when nothing is initialized
126+
dqe.close();
127+
}
128+
129+
130+
/**
131+
* Creates a mock QueryBackEndPlugin that also implements EngineDescriptor.
132+
*/
133+
private static QueryBackEndPlugin createMockBackEnd(String name, Set<OperatorType> operators) {
134+
return new MockBackEndPlugin(name, operators);
135+
}
136+
137+
/**
138+
* A test back-end plugin that implements both QueryBackEndPlugin and EngineDescriptor.
139+
*/
140+
private static class MockBackEndPlugin implements QueryBackEndPlugin, EngineDescriptor {
141+
private final String name;
142+
private final Set<OperatorType> operators;
143+
144+
MockBackEndPlugin(String name, Set<OperatorType> operators) {
145+
this.name = name;
146+
this.operators = operators;
147+
}
148+
149+
@Override
150+
public String name() {
151+
return name;
152+
}
153+
154+
@Override
155+
public Set<OperatorType> supportedOperators() {
156+
return operators;
157+
}
158+
159+
@Override
160+
public EngineBridge bridge() {
161+
return null;
162+
}
163+
}
164+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
/*
10+
* Runtime module for the query engine.
11+
* Provides scheduler, driver, transport actions, and operator implementations.
12+
* Depends on libs/query-engine-framework for types and interfaces.
13+
*/
14+
15+
opensearchplugin {
16+
description = 'Query engine runtime module: scheduler, driver, operators, transport.'
17+
classname = 'org.opensearch.fe.ppl.DefaultQueryEnginePlugin'
18+
}
19+
20+
def calciteVersion = '1.41.0'
21+
22+
dependencies {
23+
implementation "org.apache.calcite:calcite-core:${calciteVersion}"
24+
implementation "org.apache.calcite:calcite-linq4j:${calciteVersion}"
25+
implementation "org.apache.calcite.avatica:avatica-core:1.27.0"
26+
implementation "com.google.guava:guava:${versions.guava}"
27+
runtimeOnly "com.google.guava:failureaccess:1.0.2"
28+
29+
// Calcite runtime dependencies
30+
runtimeOnly "org.jooq:joou-java-6:0.9.4"
31+
runtimeOnly "com.jayway.jsonpath:json-path:2.9.0"
32+
runtimeOnly "org.codehaus.janino:janino:3.1.12"
33+
runtimeOnly "org.codehaus.janino:commons-compiler:3.1.12"
34+
runtimeOnly "org.apache.commons:commons-math3:3.6.1"
35+
runtimeOnly "org.slf4j:slf4j-api:${versions.slf4j}"
36+
runtimeOnly "commons-codec:commons-codec:${versions.commonscodec}"
37+
runtimeOnly "org.locationtech.jts:jts-core:${versions.jts}"
38+
runtimeOnly 'com.esri.geometry:esri-geometry-api:2.2.4'
39+
40+
compileOnly 'org.checkerframework:checker-qual:3.43.0'
41+
}
42+
43+
configurations.all {
44+
resolutionStrategy {
45+
force 'com.google.guava:guava:33.4.0-jre'
46+
force 'com.google.guava:failureaccess:1.0.2'
47+
force 'com.google.errorprone:error_prone_annotations:2.36.0'
48+
force 'org.checkerframework:checker-qual:3.43.0'
49+
}
50+
}
51+
52+
// Suppress -Werror and annotation warnings from Arrow dependency jars
53+
tasks.withType(JavaCompile).configureEach {
54+
doFirst {
55+
options.compilerArgs.removeAll(['-Werror'])
56+
options.compilerArgs.removeIf { it.startsWith('-Xlint') || it.startsWith('-Xdoclint') }
57+
options.compilerArgs << '-Xlint:none'
58+
}
59+
}

0 commit comments

Comments
 (0)