Skip to content

Commit e921691

Browse files
committed
Rename plugin interfaces and default implementations.
Wire up a ppl front-end using UnifiedQueryAPI from sql plugin. Signed-off-by: Marc Handalian <marc.handalian@gmail.com>
1 parent 937ad71 commit e921691

File tree

106 files changed

+5252
-2116
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

106 files changed

+5252
-2116
lines changed
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
/*
10+
* Runtime module for the query engine.
11+
* Provides scheduler, driver, transport actions, and operator implementations.
12+
* Depends on libs/query-engine-framework for types and interfaces.
13+
*/
14+
15+
opensearchplugin {
16+
description = 'Query engine runtime module: scheduler, driver, operators, transport.'
17+
classname = 'org.opensearch.fe.ppl.DefaultQueryEnginePlugin'
18+
}
19+
20+
def calciteVersion = '1.41.0'
21+
22+
dependencies {
23+
implementation "org.apache.calcite:calcite-core:${calciteVersion}"
24+
implementation "org.apache.calcite:calcite-linq4j:${calciteVersion}"
25+
implementation "org.apache.calcite.avatica:avatica-core:1.27.0"
26+
implementation "com.google.guava:guava:${versions.guava}"
27+
runtimeOnly "com.google.guava:failureaccess:1.0.2"
28+
29+
// Calcite runtime dependencies
30+
runtimeOnly "org.jooq:joou-java-6:0.9.4"
31+
runtimeOnly "com.jayway.jsonpath:json-path:2.9.0"
32+
runtimeOnly "org.codehaus.janino:janino:3.1.12"
33+
runtimeOnly "org.codehaus.janino:commons-compiler:3.1.12"
34+
runtimeOnly "org.apache.commons:commons-math3:3.6.1"
35+
runtimeOnly "org.slf4j:slf4j-api:${versions.slf4j}"
36+
runtimeOnly "commons-codec:commons-codec:${versions.commonscodec}"
37+
runtimeOnly "org.locationtech.jts:jts-core:${versions.jts}"
38+
runtimeOnly 'com.esri.geometry:esri-geometry-api:2.2.4'
39+
40+
compileOnly 'org.checkerframework:checker-qual:3.43.0'
41+
}
42+
43+
configurations.all {
44+
resolutionStrategy {
45+
force 'com.google.guava:guava:33.4.0-jre'
46+
force 'com.google.guava:failureaccess:1.0.2'
47+
force 'com.google.errorprone:error_prone_annotations:2.36.0'
48+
force 'org.checkerframework:checker-qual:3.43.0'
49+
}
50+
}
51+
52+
// Suppress -Werror and annotation warnings from Arrow dependency jars
53+
tasks.withType(JavaCompile).configureEach {
54+
doFirst {
55+
options.compilerArgs.removeAll(['-Werror'])
56+
options.compilerArgs.removeIf { it.startsWith('-Xlint') || it.startsWith('-Xdoclint') }
57+
options.compilerArgs << '-Xlint:none'
58+
}
59+
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.fe.ppl;
10+
11+
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
12+
import org.opensearch.cluster.service.ClusterService;
13+
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
14+
import org.opensearch.core.xcontent.NamedXContentRegistry;
15+
import org.opensearch.env.Environment;
16+
import org.opensearch.env.NodeEnvironment;
17+
import org.opensearch.plugins.ActionPlugin;
18+
import org.opensearch.plugins.AnalyticsBackEndPlugin;
19+
import org.opensearch.plugins.Plugin;
20+
import org.opensearch.plugins.QueryEnginePlugin;
21+
import org.opensearch.plugins.QueryPlanExecutor;
22+
import org.opensearch.repositories.RepositoriesService;
23+
import org.opensearch.script.ScriptService;
24+
import org.opensearch.threadpool.ThreadPool;
25+
import org.opensearch.transport.client.Client;
26+
import org.opensearch.watcher.ResourceWatcherService;
27+
28+
import java.util.Collection;
29+
import java.util.List;
30+
import java.util.function.Supplier;
31+
32+
/**
33+
* Java-based Distributed Query Executor implementation.
34+
* Evolves from the former query-engine module plugin. Implements the {@link QueryEnginePlugin}
35+
* interface as the first concrete DQE implementation.
36+
*
37+
* <p>Responsibilities:
38+
* <ul>
39+
* <li>Create the {@link PlanExecutor} as the {@link QueryPlanExecutor} implementation</li>
40+
* <li>Expose via {@link #getExecutor()}
41+
* </ul>
42+
*/
43+
public class AnalyticsEnginePlugin extends Plugin implements QueryEnginePlugin, ActionPlugin {
44+
45+
private PlanExecutor queryExecutorService;
46+
47+
@Override
48+
public Collection<Object> createComponents(
49+
Client client,
50+
ClusterService clusterService,
51+
ThreadPool threadPool,
52+
ResourceWatcherService resourceWatcherService,
53+
ScriptService scriptService,
54+
NamedXContentRegistry xContentRegistry,
55+
Environment environment,
56+
NodeEnvironment nodeEnvironment,
57+
NamedWriteableRegistry namedWriteableRegistry,
58+
IndexNameExpressionResolver indexNameExpressionResolver,
59+
Supplier<RepositoriesService> repositoriesServiceSupplier,
60+
Collection<AnalyticsBackEndPlugin> backEnds
61+
) {
62+
this.queryExecutorService = new PlanExecutor();
63+
64+
// TODO - do something with back-ends/engine implementations
65+
return List.of(queryExecutorService);
66+
}
67+
68+
@Override
69+
public QueryPlanExecutor getExecutor() {
70+
return queryExecutorService;
71+
}
72+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.fe.ppl;
10+
11+
import org.apache.calcite.rel.RelNode;
12+
import org.apache.logging.log4j.LogManager;
13+
import org.apache.logging.log4j.Logger;
14+
import org.opensearch.plugins.QueryPlanExecutor;
15+
16+
import java.util.ArrayList;
17+
import java.util.List;
18+
19+
/**
20+
* {@link QueryPlanExecutor} implementation in the query-engine module.
21+
* Translates logical fragments (RelNode trees) into OpenSearch query
22+
* operations and returns result rows.
23+
*
24+
* <p>The parameters are typed as {@code Object} because the {@link QueryPlanExecutor}
25+
* interface lives in {@code server/plugins} which must not depend on Calcite.
26+
* This implementation casts them to the appropriate Calcite types internally.</p>
27+
*
28+
* <p>Currently a stub that logs the received logical plan and returns
29+
* placeholder rows matching the fragment's field count.</p>
30+
* // TODO: call this something.. better
31+
*/
32+
public class PlanExecutor implements QueryPlanExecutor {
33+
34+
private static final Logger logger = LogManager.getLogger(PlanExecutor.class);
35+
36+
@Override
37+
public Iterable<Object[]> execute(Object logicalFragment, Object context) {
38+
// TODO: This is a stub for now, just logs the RelNode fragment.
39+
RelNode fragment = (RelNode) logicalFragment;
40+
int fieldCount = fragment.getRowType().getFieldCount();
41+
42+
logger.info("[PlanExecutor] Executing fragment with {} fields: {}", fieldCount, fragment.explain());
43+
44+
// Stub: return an empty result set.
45+
// A real implementation would translate the fragment into OpenSearch
46+
// query operations, execute against OpenSearch shards, and return rows.
47+
List<Object[]> rows = new ArrayList<>();
48+
return rows;
49+
}
50+
}
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.fe.ppl;
10+
11+
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
12+
import org.apache.calcite.plan.RelOptCluster;
13+
import org.apache.calcite.plan.RelTraitSet;
14+
import org.apache.calcite.plan.hep.HepPlanner;
15+
import org.apache.calcite.plan.hep.HepProgramBuilder;
16+
import org.apache.calcite.rel.AbstractRelNode;
17+
import org.apache.calcite.rel.RelNode;
18+
import org.apache.calcite.rel.type.RelDataType;
19+
import org.apache.calcite.rel.type.RelDataTypeFactory;
20+
import org.apache.calcite.rex.RexBuilder;
21+
import org.apache.calcite.sql.type.SqlTypeName;
22+
import org.opensearch.test.OpenSearchTestCase;
23+
24+
/**
25+
* Tests for {@link PlanExecutor}.
26+
*/
27+
public class PlanExecutorTests extends OpenSearchTestCase {
28+
29+
private RelDataTypeFactory typeFactory;
30+
private RelOptCluster cluster;
31+
32+
@Override
33+
public void setUp() throws Exception {
34+
super.setUp();
35+
typeFactory = new JavaTypeFactoryImpl();
36+
RexBuilder rexBuilder = new RexBuilder(typeFactory);
37+
HepPlanner planner = new HepPlanner(new HepProgramBuilder().build());
38+
cluster = RelOptCluster.create(planner, rexBuilder);
39+
}
40+
41+
/**
42+
* Test that execute() returns an iterable and does not throw for a valid fragment.
43+
*/
44+
public void testExecuteReturnsIterableForValidFragment() {
45+
PlanExecutor service = new PlanExecutor();
46+
47+
RelNode fragment = createRelNodeWithFieldCount(3);
48+
Object context = new Object();
49+
50+
Iterable<Object[]> result = service.execute(fragment, context);
51+
assertNotNull("execute() should return a non-null Iterable", result);
52+
}
53+
54+
/**
55+
* Test that returned rows have correct length matching the fragment's field count.
56+
*/
57+
public void testReturnedRowsMatchFieldCount() {
58+
PlanExecutor service = new PlanExecutor();
59+
60+
int fieldCount = 5;
61+
RelNode fragment = createRelNodeWithFieldCount(fieldCount);
62+
Object context = new Object();
63+
64+
Iterable<Object[]> result = service.execute(fragment, context);
65+
for (Object[] row : result) {
66+
assertEquals("Each row should have length equal to fragment field count", fieldCount, row.length);
67+
}
68+
}
69+
70+
/**
71+
* Test that execute() works with a single-field fragment.
72+
*/
73+
public void testExecuteWithSingleFieldFragment() {
74+
PlanExecutor service = new PlanExecutor();
75+
76+
RelNode fragment = createRelNodeWithFieldCount(1);
77+
Object context = new Object();
78+
79+
Iterable<Object[]> result = service.execute(fragment, context);
80+
assertNotNull(result);
81+
for (Object[] row : result) {
82+
assertEquals(1, row.length);
83+
}
84+
}
85+
86+
/**
87+
* Test that execute() casts the logicalFragment to RelNode.
88+
* Passing a non-RelNode should throw ClassCastException.
89+
*/
90+
public void testExecuteThrowsOnInvalidFragmentType() {
91+
PlanExecutor service = new PlanExecutor();
92+
93+
expectThrows(ClassCastException.class, () -> { service.execute("not a RelNode", new Object()); });
94+
}
95+
96+
private RelNode createRelNodeWithFieldCount(int fieldCount) {
97+
RelDataType rowType = buildRowType(fieldCount);
98+
return new StubRelNode(cluster, cluster.traitSet(), rowType);
99+
}
100+
101+
private RelDataType buildRowType(int fieldCount) {
102+
RelDataTypeFactory.Builder builder = typeFactory.builder();
103+
for (int i = 0; i < fieldCount; i++) {
104+
builder.add("field_" + i, SqlTypeName.VARCHAR);
105+
}
106+
return builder.build();
107+
}
108+
109+
/**
110+
* Minimal concrete RelNode for testing. Extends AbstractRelNode
111+
* which provides default implementations for all RelNode methods.
112+
*/
113+
private static class StubRelNode extends AbstractRelNode {
114+
StubRelNode(RelOptCluster cluster, RelTraitSet traitSet, RelDataType rowType) {
115+
super(cluster, traitSet);
116+
this.rowType = rowType;
117+
}
118+
}
119+
}

0 commit comments

Comments
 (0)