Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ dependencies {
}
api 'org.apache.calcite:calcite-linq4j:1.41.0'
api project(':common')
compileOnly files("${rootDir}/libs/analytics-framework-3.6.0-SNAPSHOT.jar")
testImplementation files("${rootDir}/libs/analytics-framework-3.6.0-SNAPSHOT.jar")
implementation "com.github.seancfoley:ipaddress:5.4.2"
implementation "com.jayway.jsonpath:json-path:2.9.0"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,143 @@ public Type getElementType() {
}
};
}
return super.implement(root);
return implementEnumerable(root);
}

/**
 * Implements the Enumerable path with classloader fix for Janino compilation. Calcite's {@code
 * EnumerableInterpretable.getBindable()} hardcodes {@code
 * EnumerableInterpretable.class.getClassLoader()} as the parent classloader for Janino. When
 * analytics-engine is the parent classloader (via extendedPlugins), this returns the
 * analytics-engine classloader which cannot see SQL plugin classes. This method replicates the
 * Calcite implementation but uses this class's classloader (SQL plugin, child) which can see
 * both parent and child classes.
 *
 * @param root the planned relational root to implement; must already be an {@code EnumerableRel}
 * @return a {@code PreparedResult} whose {@code Bindable} was compiled with the SQL plugin
 *     classloader; {@code getCode()} is unsupported because the generated source is not retained
 * @throws RuntimeException if the {@code internalParameters} field cannot be accessed reflectively
 */
private PreparedResult implementEnumerable(RelRoot root) {
  Hook.PLAN_BEFORE_IMPLEMENTATION.run(root);
  RelDataType resultType = root.rel.getRowType();
  boolean isDml = root.kind.belongsTo(SqlKind.DML);
  EnumerableRel enumerable = (EnumerableRel) root.rel;

  // Non-trivial field mapping: project the root's fields onto the validated row type by
  // wrapping the plan in an EnumerableCalc, mirroring Calcite's implement().
  if (!root.isRefTrivial()) {
    List<RexNode> projects = new java.util.ArrayList<>();
    final RexBuilder rexBuilder = enumerable.getCluster().getRexBuilder();
    for (java.util.Map.Entry<Integer, String> field : root.fields) {
      projects.add(rexBuilder.makeInputRef(enumerable, field.getKey()));
    }
    org.apache.calcite.rex.RexProgram program =
        org.apache.calcite.rex.RexProgram.create(
            enumerable.getRowType(), projects, null, root.validatedRowType, rexBuilder);
    enumerable =
        org.apache.calcite.adapter.enumerable.EnumerableCalc.create(enumerable, program);
  }

  // Access the internalParameters map via reflection. This map is shared with the
  // DataContext so stashed values (e.g., table scan references) are available at execution.
  // Reflection is needed because the field is private in the Calcite superclass.
  java.util.Map<String, Object> parameters;
  try {
    java.lang.reflect.Field f =
        CalcitePrepareImpl.CalcitePreparingStmt.class.getDeclaredField("internalParameters");
    f.setAccessible(true);
    @SuppressWarnings("unchecked")
    java.util.Map<String, Object> p = (java.util.Map<String, Object>) f.get(this);
    parameters = p;
  } catch (ReflectiveOperationException e) {
    throw new RuntimeException("Failed to access internalParameters", e);
  }

  // Match original CalcitePreparingStmt.implement() which puts _conformance before toBindable
  parameters.put("_conformance", context.config().conformance());

  // THREAD_LOCAL must stay set for the duration of code generation/compilation; the
  // try/finally guarantees cleanup even when compilation throws.
  CatalogReader.THREAD_LOCAL.set(catalogReader);
  final Bindable bindable;
  try {
    bindable = compileWithPluginClassLoader(enumerable, parameters);
  } finally {
    CatalogReader.THREAD_LOCAL.remove();
  }

  return new PreparedResultImpl(
      resultType,
      requireNonNull(parameterRowType, "parameterRowType"),
      requireNonNull(fieldOrigins, "fieldOrigins"),
      root.collation.getFieldCollations().isEmpty()
          ? ImmutableList.of()
          : ImmutableList.of(root.collation),
      root.rel,
      mapTableModOp(isDml, root.kind),
      isDml) {
    @Override
    public String getCode() {
      // The generated Java source is discarded after compilation on this path.
      throw new UnsupportedOperationException();
    }

    @Override
    public Bindable getBindable(Meta.CursorFactory cursorFactory) {
      return bindable;
    }

    @Override
    public Type getElementType() {
      // Single-column results are delivered as scalars, multi-column as Object[] rows.
      return resultType.getFieldList().size() == 1 ? Object.class : Object[].class;
    }
  };
}

/**
 * Compiles an EnumerableRel to a Bindable using the SQL plugin's classloader. This is
 * equivalent to {@code EnumerableInterpretable.toBindable()} + {@code getBindable()} but uses
 * this class's classloader instead of {@code EnumerableInterpretable.class.getClassLoader()}.
 *
 * @param rel the physical plan to compile
 * @param parameters internal parameters map shared with the DataContext; code generation may
 *     stash values into it for use at execution time
 * @return the Janino-compiled {@code Bindable} for {@code rel}
 * @throws RuntimeException if code generation or compilation fails (original cause preserved)
 * @throws IllegalStateException if the compiled class does not implement {@code Bindable}
 */
private static Bindable compileWithPluginClassLoader(
    EnumerableRel rel, java.util.Map<String, Object> parameters) {
  try {
    // Translate the physical plan into Java source, as EnumerableInterpretable does.
    org.apache.calcite.adapter.enumerable.EnumerableRelImplementor relImplementor =
        new org.apache.calcite.adapter.enumerable.EnumerableRelImplementor(
            rel.getCluster().getRexBuilder(), parameters);
    org.apache.calcite.linq4j.tree.ClassDeclaration expr =
        relImplementor.implementRoot(rel, EnumerableRel.Prefer.ARRAY);
    String s =
        org.apache.calcite.linq4j.tree.Expressions.toString(
            expr.memberDeclarations, "\n", false);
    Hook.JAVA_PLAN.run(s);

    // Use this class's classloader (SQL plugin) instead of
    // EnumerableInterpretable.class.getClassLoader() (analytics-engine parent).
    // commons-compiler is in the parent classloader at runtime, so we use reflection.
    ClassLoader classLoader = CalciteToolsHelper.class.getClassLoader();
    Class<?> factoryFactoryClass =
        classLoader.loadClass("org.codehaus.commons.compiler.CompilerFactoryFactory");
    Object compilerFactory =
        factoryFactoryClass
            .getMethod("getDefaultCompilerFactory", ClassLoader.class)
            .invoke(null, classLoader);
    Object compiler =
        compilerFactory.getClass().getMethod("newSimpleCompiler").invoke(compilerFactory);
    compiler
        .getClass()
        .getMethod("setParentClassLoader", ClassLoader.class)
        .invoke(compiler, classLoader);

    // Wrap the generated member declarations in a class implementing Bindable + Typed,
    // matching the shape Calcite's getBindable() produces.
    String fullCode =
        "public final class "
            + expr.name
            + " implements "
            + Bindable.class.getName()
            + ", "
            + org.apache.calcite.runtime.Typed.class.getName()
            + " {\n"
            + s
            + "\n}\n";
    compiler.getClass().getMethod("cook", String.class).invoke(compiler, fullCode);
    ClassLoader compiledClassLoader =
        (ClassLoader) compiler.getClass().getMethod("getClassLoader").invoke(compiler);
    Class<?> clazz = compiledClassLoader.loadClass(expr.name);
    // BUGFIX: the previous version returned null when the instance was not a Bindable,
    // deferring the failure to an opaque NPE at execution time. Fail fast instead; the
    // generated class always implements Bindable (see fullCode above), so a miss here
    // indicates a classloader/codegen bug worth surfacing immediately.
    if (clazz.getDeclaredConstructors()[0].newInstance() instanceof Bindable bindable) {
      return bindable;
    }
    throw new IllegalStateException(
        "Compiled class " + expr.name + " does not implement " + Bindable.class.getName());
  } catch (Exception e) {
    throw org.apache.calcite.util.Util.throwAsRuntime(e);
  }
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.opensearch.analytics.exec.QueryPlanExecutor;
import org.opensearch.sql.ast.statement.ExplainMode;
import org.opensearch.sql.calcite.CalcitePlanContext;
import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory;
Expand All @@ -38,9 +39,9 @@
*/
public class AnalyticsExecutionEngine implements ExecutionEngine {

private final QueryPlanExecutor planExecutor;
private final QueryPlanExecutor<RelNode, Iterable<Object[]>> planExecutor;

public AnalyticsExecutionEngine(QueryPlanExecutor planExecutor) {
public AnalyticsExecutionEngine(QueryPlanExecutor<RelNode, Iterable<Object[]>> planExecutor) {
this.planExecutor = planExecutor;
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.calcite.sql.type.SqlTypeName;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.opensearch.analytics.exec.QueryPlanExecutor;
import org.opensearch.sql.calcite.CalcitePlanContext;
import org.opensearch.sql.calcite.SysLimit;
import org.opensearch.sql.common.response.ResponseListener;
Expand All @@ -35,12 +36,15 @@
class AnalyticsExecutionEngineTest {

private AnalyticsExecutionEngine engine;
private QueryPlanExecutor mockExecutor;

@SuppressWarnings("unchecked")
private QueryPlanExecutor<RelNode, Iterable<Object[]>> mockExecutor;

private CalcitePlanContext mockContext;

@BeforeEach
void setUp() throws Exception {
mockExecutor = mock(QueryPlanExecutor.class);
mockExecutor = (QueryPlanExecutor<RelNode, Iterable<Object[]>>) mock(QueryPlanExecutor.class);
engine = new AnalyticsExecutionEngine(mockExecutor);
mockContext = mock(CalcitePlanContext.class);
setSysLimit(mockContext, SysLimit.DEFAULT);
Expand Down
1 change: 1 addition & 0 deletions doctest/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ testClusters {
}))
*/
plugin(getJobSchedulerPlugin(jsPlugin, bwcOpenSearchJSDownload))
plugin provider { (RegularFile) (() -> file("${rootDir}/libs/analytics-engine-3.6.0-SNAPSHOT.zip")) }
plugin ':opensearch-sql-plugin'
testDistribution = 'archive'
}
Expand Down
13 changes: 12 additions & 1 deletion integ-test/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -270,35 +270,44 @@ def getGeoSpatialPlugin() {
}
}

// Supplies the locally-built analytics-engine plugin zip from libs/ as a lazy
// Provider<RegularFile>: the closure is cast to RegularFile so the path is not
// resolved until a test cluster actually installs the plugin.
// NOTE(review): the version is pinned to 3.6.0-SNAPSHOT — the zip must be placed
// in ${rootDir}/libs before running integ tests; confirm how it gets there (CI step?).
def getAnalyticsEnginePlugin() {
    provider { (RegularFile) (() -> file("${rootDir}/libs/analytics-engine-3.6.0-SNAPSHOT.zip")) }
}

// Cluster definitions for the integ-test variants. Each cluster installs the
// analytics-engine plugin alongside the SQL plugin.
// NOTE(review): analytics-engine is listed before the SQL plugin in every cluster —
// presumably install order matters because the SQL plugin extends it; confirm.
testClusters {
  integTest {
    testDistribution = 'archive'
    plugin(getJobSchedulerPlugin())
    plugin(getGeoSpatialPlugin())
    plugin(getAnalyticsEnginePlugin())
    plugin ":opensearch-sql-plugin"
    // Master key is a fixed test-only value for datasource credential encryption.
    setting "plugins.query.datasources.encryption.masterkey", "1234567812345678"
  }
  yamlRestTest {
    testDistribution = 'archive'
    plugin(getJobSchedulerPlugin())
    plugin(getGeoSpatialPlugin())
    plugin(getAnalyticsEnginePlugin())
    plugin ":opensearch-sql-plugin"
    setting "plugins.query.datasources.encryption.masterkey", "1234567812345678"
  }
  remoteCluster {
    testDistribution = 'archive'
    plugin(getJobSchedulerPlugin())
    plugin(getGeoSpatialPlugin())
    plugin(getAnalyticsEnginePlugin())
    plugin ":opensearch-sql-plugin"
  }
  // Security-enabled clusters omit the geospatial plugin but still need analytics-engine.
  integTestWithSecurity {
    testDistribution = 'archive'
    plugin(getJobSchedulerPlugin())
    plugin(getAnalyticsEnginePlugin())
    plugin ":opensearch-sql-plugin"
  }
  remoteIntegTestWithSecurity {
    testDistribution = 'archive'
    plugin(getJobSchedulerPlugin())
    plugin(getAnalyticsEnginePlugin())
    plugin ":opensearch-sql-plugin"
  }
}
Expand Down Expand Up @@ -352,8 +361,10 @@ task stopPrometheus(type: KillProcessTask) {
stopPrometheus.mustRunAfter startPrometheus

task integJdbcTest(type: RestIntegTestTask) {
testClusters.findAll {c -> c.clusterName == "integJdbcTest"}.first().
testClusters.findAll {c -> c.clusterName == "integJdbcTest"}.first().with {
plugin(getAnalyticsEnginePlugin())
plugin ":opensearch-sql-plugin"
}

useJUnitPlatform()
dependsOn ':opensearch-sql-plugin:bundlePlugin'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@

package org.opensearch.sql.ppl;

import static org.opensearch.sql.legacy.TestUtils.isIndexExist;
import static org.opensearch.sql.util.MatcherUtils.assertYamlEqualsIgnoreId;

import java.io.IOException;
import org.junit.Test;
import org.opensearch.client.Request;

/**
* Explain integration tests for queries routed through the analytics engine path (Project Mustang).
Expand All @@ -25,7 +27,22 @@ public class AnalyticsExplainIT extends PPLIntegTestCase {

@Override
protected void init() throws Exception {
  // Explain tests read only the schema: an empty parquet_logs index with the expected
  // mapping is enough for OpenSearchSchemaBuilder. Skip creation if it already exists.
  if (isIndexExist(client(), "parquet_logs")) {
    return;
  }
  Request createIndex = new Request("PUT", "/parquet_logs");
  createIndex.setJsonEntity(
      "{"
          + "\"mappings\": {"
          + " \"properties\": {"
          + " \"ts\": {\"type\": \"date\"},"
          + " \"status\": {\"type\": \"integer\"},"
          + " \"message\": {\"type\": \"keyword\"},"
          + " \"ip_addr\": {\"type\": \"keyword\"}"
          + " }"
          + "}"
          + "}");
  client().performRequest(createIndex);
}

private String loadAnalyticsExpectedPlan(String fileName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.sql.ppl;

import static org.opensearch.sql.legacy.TestUtils.getResponseBody;
import static org.opensearch.sql.legacy.TestUtils.isIndexExist;
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_ACCOUNT;
import static org.opensearch.sql.plugin.rest.RestPPLQueryAction.QUERY_API_ENDPOINT;
import static org.opensearch.sql.util.MatcherUtils.rows;
Expand Down Expand Up @@ -40,8 +41,48 @@ public class AnalyticsPPLIT extends PPLIntegTestCase {

@Override
protected void init() throws Exception {
  // Both parquet indices are created up front with explicit mappings so that
  // OpenSearchSchemaBuilder can derive a schema; the stub executor ignores their
  // contents and serves canned rows regardless.
  createParquetMetricsIndex();
  createParquetLogsIndex();
}

/**
 * Creates the {@code parquet_logs} index with the mapping the schema builder expects.
 * Idempotent: an already-existing index is left untouched.
 *
 * @throws IOException if the index-creation request fails
 */
private void createParquetLogsIndex() throws IOException {
  createParquetIndexIfAbsent(
      "parquet_logs",
      "{"
          + "\"mappings\": {"
          + " \"properties\": {"
          + " \"ts\": {\"type\": \"date\"},"
          + " \"status\": {\"type\": \"integer\"},"
          + " \"message\": {\"type\": \"keyword\"},"
          + " \"ip_addr\": {\"type\": \"keyword\"}"
          + " }"
          + "}"
          + "}");
}

/**
 * Creates the {@code parquet_metrics} index with the mapping the schema builder expects.
 * Idempotent: an already-existing index is left untouched.
 *
 * @throws IOException if the index-creation request fails
 */
private void createParquetMetricsIndex() throws IOException {
  createParquetIndexIfAbsent(
      "parquet_metrics",
      "{"
          + "\"mappings\": {"
          + " \"properties\": {"
          + " \"ts\": {\"type\": \"date\"},"
          + " \"cpu\": {\"type\": \"double\"},"
          + " \"memory\": {\"type\": \"double\"},"
          + " \"host\": {\"type\": \"keyword\"}"
          + " }"
          + "}"
          + "}");
}

/**
 * Shared helper deduplicating index creation: issues {@code PUT /<indexName>} with the given
 * body unless the index already exists.
 *
 * @param indexName name of the index to create
 * @param mappingJson full JSON request body (including the {@code mappings} wrapper)
 * @throws IOException if the index-creation request fails
 */
private void createParquetIndexIfAbsent(String indexName, String mappingJson)
    throws IOException {
  if (isIndexExist(client(), indexName)) {
    return;
  }
  Request request = new Request("PUT", "/" + indexName);
  request.setJsonEntity(mappingJson);
  client().performRequest(request);
}

// --- Full table scan tests with schema + data verification ---
Expand Down
Loading
Loading