diff --git a/core/build.gradle b/core/build.gradle index 23f9b37e317..aad19d6400b 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -63,6 +63,8 @@ dependencies { } api 'org.apache.calcite:calcite-linq4j:1.41.0' api project(':common') + compileOnly files("${rootDir}/libs/analytics-framework-3.6.0-SNAPSHOT.jar") + testImplementation files("${rootDir}/libs/analytics-framework-3.6.0-SNAPSHOT.jar") implementation "com.github.seancfoley:ipaddress:5.4.2" implementation "com.jayway.jsonpath:json-path:2.9.0" diff --git a/core/src/main/java/org/opensearch/sql/calcite/utils/CalciteToolsHelper.java b/core/src/main/java/org/opensearch/sql/calcite/utils/CalciteToolsHelper.java index a6d57ea01f6..a013af885d6 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/utils/CalciteToolsHelper.java +++ b/core/src/main/java/org/opensearch/sql/calcite/utils/CalciteToolsHelper.java @@ -358,7 +358,143 @@ public Type getElementType() { } }; } - return super.implement(root); + return implementEnumerable(root); + } + + /** + * Implements the Enumerable path with classloader fix for Janino compilation. Calcite's {@code + * EnumerableInterpretable.getBindable()} hardcodes {@code + * EnumerableInterpretable.class.getClassLoader()} as the parent classloader for Janino. When + * analytics-engine is the parent classloader (via extendedPlugins), this returns the + * analytics-engine classloader which cannot see SQL plugin classes. This method replicates the + * Calcite implementation but uses this class's classloader (SQL plugin, child) which can see + * both parent and child classes. + */ + private PreparedResult implementEnumerable(RelRoot root) { + Hook.PLAN_BEFORE_IMPLEMENTATION.run(root); + RelDataType resultType = root.rel.getRowType(); + boolean isDml = root.kind.belongsTo(SqlKind.DML); + EnumerableRel enumerable = (EnumerableRel) root.rel; + + if (!root.isRefTrivial()) { + List projects = new java.util.ArrayList<>(); + final RexBuilder rexBuilder = enumerable.getCluster().getRexBuilder(); + for (java.util.Map.Entry field : root.fields) { + projects.add(rexBuilder.makeInputRef(enumerable, field.getKey())); + } + org.apache.calcite.rex.RexProgram program = + org.apache.calcite.rex.RexProgram.create( + enumerable.getRowType(), projects, null, root.validatedRowType, rexBuilder); + enumerable = + org.apache.calcite.adapter.enumerable.EnumerableCalc.create(enumerable, program); + } + + // Access the internalParameters map via reflection. This map is shared with the + // DataContext so stashed values (e.g., table scan references) are available at execution. + java.util.Map parameters; + try { + java.lang.reflect.Field f = + CalcitePrepareImpl.CalcitePreparingStmt.class.getDeclaredField("internalParameters"); + f.setAccessible(true); + @SuppressWarnings("unchecked") + java.util.Map p = (java.util.Map) f.get(this); + parameters = p; + } catch (ReflectiveOperationException e) { + throw new RuntimeException("Failed to access internalParameters", e); + } + + // Match original CalcitePreparingStmt.implement() which puts _conformance before toBindable + parameters.put("_conformance", context.config().conformance()); + + CatalogReader.THREAD_LOCAL.set(catalogReader); + final Bindable bindable; + try { + bindable = compileWithPluginClassLoader(enumerable, parameters); + } finally { + CatalogReader.THREAD_LOCAL.remove(); + } + + return new PreparedResultImpl( + resultType, + requireNonNull(parameterRowType, "parameterRowType"), + requireNonNull(fieldOrigins, "fieldOrigins"), + root.collation.getFieldCollations().isEmpty() + ? ImmutableList.of() + : ImmutableList.of(root.collation), + root.rel, + mapTableModOp(isDml, root.kind), + isDml) { + @Override + public String getCode() { + throw new UnsupportedOperationException(); + } + + @Override + public Bindable getBindable(Meta.CursorFactory cursorFactory) { + return bindable; + } + + @Override + public Type getElementType() { + return resultType.getFieldList().size() == 1 ? Object.class : Object[].class; + } + }; + } + + /** + * Compiles an EnumerableRel to a Bindable using the SQL plugin's classloader. This is + * equivalent to {@code EnumerableInterpretable.toBindable()} + {@code getBindable()} but uses + * this class's classloader instead of {@code EnumerableInterpretable.class.getClassLoader()}. + */ + private static Bindable compileWithPluginClassLoader( + EnumerableRel rel, java.util.Map parameters) { + try { + org.apache.calcite.adapter.enumerable.EnumerableRelImplementor relImplementor = + new org.apache.calcite.adapter.enumerable.EnumerableRelImplementor( + rel.getCluster().getRexBuilder(), parameters); + org.apache.calcite.linq4j.tree.ClassDeclaration expr = + relImplementor.implementRoot(rel, EnumerableRel.Prefer.ARRAY); + String s = + org.apache.calcite.linq4j.tree.Expressions.toString( + expr.memberDeclarations, "\n", false); + Hook.JAVA_PLAN.run(s); + + // Use this class's classloader (SQL plugin) instead of + // EnumerableInterpretable.class.getClassLoader() (analytics-engine parent). + // commons-compiler is in the parent classloader at runtime, so we use reflection. + ClassLoader classLoader = CalciteToolsHelper.class.getClassLoader(); + Class factoryFactoryClass = + classLoader.loadClass("org.codehaus.commons.compiler.CompilerFactoryFactory"); + Object compilerFactory = + factoryFactoryClass + .getMethod("getDefaultCompilerFactory", ClassLoader.class) + .invoke(null, classLoader); + Object compiler = + compilerFactory.getClass().getMethod("newSimpleCompiler").invoke(compilerFactory); + compiler + .getClass() + .getMethod("setParentClassLoader", ClassLoader.class) + .invoke(compiler, classLoader); + + String fullCode = + "public final class " + + expr.name + + " implements " + + Bindable.class.getName() + + ", " + + org.apache.calcite.runtime.Typed.class.getName() + + " {\n" + + s + + "\n}\n"; + compiler.getClass().getMethod("cook", String.class).invoke(compiler, fullCode); + ClassLoader compiledClassLoader = + (ClassLoader) compiler.getClass().getMethod("getClassLoader").invoke(compiler); + @SuppressWarnings("unchecked") + Class clazz = (Class) compiledClassLoader.loadClass(expr.name); + return clazz.getDeclaredConstructors()[0].newInstance() instanceof Bindable b ? b : null; + } catch (Exception e) { + throw org.apache.calcite.util.Util.throwAsRuntime(e); + } } @Override diff --git a/core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java b/core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java index 209838971ca..07176586004 100644 --- a/core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java +++ b/core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java @@ -13,6 +13,7 @@ import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeField; +import org.opensearch.analytics.exec.QueryPlanExecutor; import org.opensearch.sql.ast.statement.ExplainMode; import org.opensearch.sql.calcite.CalcitePlanContext; import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory; @@ -38,9 +39,9 @@ */ public class AnalyticsExecutionEngine implements ExecutionEngine { - private final QueryPlanExecutor planExecutor; + private final QueryPlanExecutor> planExecutor; - public AnalyticsExecutionEngine(QueryPlanExecutor planExecutor) { + public AnalyticsExecutionEngine(QueryPlanExecutor> planExecutor) { this.planExecutor = planExecutor; } diff --git a/core/src/main/java/org/opensearch/sql/executor/analytics/QueryPlanExecutor.java b/core/src/main/java/org/opensearch/sql/executor/analytics/QueryPlanExecutor.java deleted file mode 100644 index fd322ca432a..00000000000 --- a/core/src/main/java/org/opensearch/sql/executor/analytics/QueryPlanExecutor.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.executor.analytics; - -import org.apache.calcite.rel.RelNode; - -/** - * Executes a Calcite {@link RelNode} logical plan against the analytics engine. - * - *

This is a local equivalent of {@code org.opensearch.analytics.exec.QueryPlanExecutor} from the - * analytics-framework library. It will be replaced by the upstream interface once the - * analytics-framework JAR is published. - * - * @see Upstream - * QueryPlanExecutor - */ -@FunctionalInterface -public interface QueryPlanExecutor { - - /** - * Executes the given logical plan and returns result rows. - * - * @param plan the Calcite RelNode subtree to execute - * @param context execution context (opaque to avoid server dependency) - * @return rows produced by the engine - */ - Iterable execute(RelNode plan, Object context); -} diff --git a/core/src/test/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngineTest.java b/core/src/test/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngineTest.java index 70ada76c54a..c8e4cffae8b 100644 --- a/core/src/test/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngineTest.java +++ b/core/src/test/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngineTest.java @@ -24,6 +24,7 @@ import org.apache.calcite.sql.type.SqlTypeName; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.opensearch.analytics.exec.QueryPlanExecutor; import org.opensearch.sql.calcite.CalcitePlanContext; import org.opensearch.sql.calcite.SysLimit; import org.opensearch.sql.common.response.ResponseListener; @@ -35,12 +36,15 @@ class AnalyticsExecutionEngineTest { private AnalyticsExecutionEngine engine; - private QueryPlanExecutor mockExecutor; + + @SuppressWarnings("unchecked") + private QueryPlanExecutor> mockExecutor; + private CalcitePlanContext mockContext; @BeforeEach void setUp() throws Exception { - mockExecutor = mock(QueryPlanExecutor.class); + mockExecutor = (QueryPlanExecutor>) mock(QueryPlanExecutor.class); engine = new AnalyticsExecutionEngine(mockExecutor); mockContext = mock(CalcitePlanContext.class); setSysLimit(mockContext, SysLimit.DEFAULT); diff --git a/doctest/build.gradle b/doctest/build.gradle index d758a6de4b8..9bf5b0233fe 100644 --- a/doctest/build.gradle +++ b/doctest/build.gradle @@ -195,6 +195,7 @@ testClusters { })) */ plugin(getJobSchedulerPlugin(jsPlugin, bwcOpenSearchJSDownload)) + plugin provider { (RegularFile) (() -> file("${rootDir}/libs/analytics-engine-3.6.0-SNAPSHOT.zip")) } plugin ':opensearch-sql-plugin' testDistribution = 'archive' } diff --git a/integ-test/build.gradle b/integ-test/build.gradle index b914cb0cbe0..7a243028dd5 100644 --- a/integ-test/build.gradle +++ b/integ-test/build.gradle @@ -270,11 +270,16 @@ def getGeoSpatialPlugin() { } } +def getAnalyticsEnginePlugin() { + provider { (RegularFile) (() -> file("${rootDir}/libs/analytics-engine-3.6.0-SNAPSHOT.zip")) } +} + testClusters { integTest { testDistribution = 'archive' plugin(getJobSchedulerPlugin()) plugin(getGeoSpatialPlugin()) + plugin(getAnalyticsEnginePlugin()) plugin ":opensearch-sql-plugin" setting "plugins.query.datasources.encryption.masterkey", "1234567812345678" } @@ -282,6 +287,7 @@ testClusters { testDistribution = 'archive' plugin(getJobSchedulerPlugin()) plugin(getGeoSpatialPlugin()) + plugin(getAnalyticsEnginePlugin()) plugin ":opensearch-sql-plugin" setting "plugins.query.datasources.encryption.masterkey", "1234567812345678" } @@ -289,16 +295,19 @@ testClusters { testDistribution = 'archive' plugin(getJobSchedulerPlugin()) plugin(getGeoSpatialPlugin()) + plugin(getAnalyticsEnginePlugin()) plugin ":opensearch-sql-plugin" } integTestWithSecurity { testDistribution = 'archive' plugin(getJobSchedulerPlugin()) + plugin(getAnalyticsEnginePlugin()) plugin ":opensearch-sql-plugin" } remoteIntegTestWithSecurity { testDistribution = 'archive' plugin(getJobSchedulerPlugin()) + plugin(getAnalyticsEnginePlugin()) plugin ":opensearch-sql-plugin" } } @@ -352,8 +361,10 @@ task stopPrometheus(type: KillProcessTask) { stopPrometheus.mustRunAfter startPrometheus task integJdbcTest(type: RestIntegTestTask) { - testClusters.findAll {c -> c.clusterName == "integJdbcTest"}.first(). + testClusters.findAll {c -> c.clusterName == "integJdbcTest"}.first().with { + plugin(getAnalyticsEnginePlugin()) plugin ":opensearch-sql-plugin" + } useJUnitPlatform() dependsOn ':opensearch-sql-plugin:bundlePlugin' diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/AnalyticsExplainIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/AnalyticsExplainIT.java index c408f6582e1..bd276c36aa5 100644 --- a/integ-test/src/test/java/org/opensearch/sql/ppl/AnalyticsExplainIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/AnalyticsExplainIT.java @@ -5,10 +5,12 @@ package org.opensearch.sql.ppl; +import static org.opensearch.sql.legacy.TestUtils.isIndexExist; import static org.opensearch.sql.util.MatcherUtils.assertYamlEqualsIgnoreId; import java.io.IOException; import org.junit.Test; +import org.opensearch.client.Request; /** * Explain integration tests for queries routed through the analytics engine path (Project Mustang). @@ -25,7 +27,22 @@ public class AnalyticsExplainIT extends PPLIntegTestCase { @Override protected void init() throws Exception { - // No index loading needed -- stub schema and data are hardcoded + // Create parquet_logs index so OpenSearchSchemaBuilder can build the schema for explain tests. + if (!isIndexExist(client(), "parquet_logs")) { + Request request = new Request("PUT", "/parquet_logs"); + request.setJsonEntity( + "{" + + "\"mappings\": {" + + " \"properties\": {" + + " \"ts\": {\"type\": \"date\"}," + + " \"status\": {\"type\": \"integer\"}," + + " \"message\": {\"type\": \"keyword\"}," + + " \"ip_addr\": {\"type\": \"keyword\"}" + + " }" + + "}" + + "}"); + client().performRequest(request); + } } private String loadAnalyticsExpectedPlan(String fileName) { diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/AnalyticsPPLIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/AnalyticsPPLIT.java index de0fda93c6f..e522fbfef2c 100644 --- a/integ-test/src/test/java/org/opensearch/sql/ppl/AnalyticsPPLIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/AnalyticsPPLIT.java @@ -6,6 +6,7 @@ package org.opensearch.sql.ppl; import static org.opensearch.sql.legacy.TestUtils.getResponseBody; +import static org.opensearch.sql.legacy.TestUtils.isIndexExist; import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_ACCOUNT; import static org.opensearch.sql.plugin.rest.RestPPLQueryAction.QUERY_API_ENDPOINT; import static org.opensearch.sql.util.MatcherUtils.rows; @@ -40,8 +41,48 @@ public class AnalyticsPPLIT extends PPLIntegTestCase { @Override protected void init() throws Exception { - // No index loading needed -- stub schema and data are hardcoded - // in RestUnifiedQueryAction and StubQueryPlanExecutor + // Create parquet indices with mappings so OpenSearchSchemaBuilder can build the schema. + // The stub executor still returns canned data regardless of the index contents. + createParquetLogsIndex(); + createParquetMetricsIndex(); + } + + private void createParquetLogsIndex() throws IOException { + if (isIndexExist(client(), "parquet_logs")) { + return; + } + Request request = new Request("PUT", "/parquet_logs"); + request.setJsonEntity( + "{" + + "\"mappings\": {" + + " \"properties\": {" + + " \"ts\": {\"type\": \"date\"}," + + " \"status\": {\"type\": \"integer\"}," + + " \"message\": {\"type\": \"keyword\"}," + + " \"ip_addr\": {\"type\": \"keyword\"}" + + " }" + + "}" + + "}"); + client().performRequest(request); + } + + private void createParquetMetricsIndex() throws IOException { + if (isIndexExist(client(), "parquet_metrics")) { + return; + } + Request request = new Request("PUT", "/parquet_metrics"); + request.setJsonEntity( + "{" + + "\"mappings\": {" + + " \"properties\": {" + + " \"ts\": {\"type\": \"date\"}," + + " \"cpu\": {\"type\": \"double\"}," + + " \"memory\": {\"type\": \"double\"}," + + " \"host\": {\"type\": \"keyword\"}" + + " }" + + "}" + + "}"); + client().performRequest(request); } // --- Full table scan tests with schema + data verification --- diff --git a/integ-test/src/test/resources/expectedOutput/analytics/explain_aggregation.yaml b/integ-test/src/test/resources/expectedOutput/analytics/explain_aggregation.yaml index 01a08fd02ab..e7ec8c781fe 100644 --- a/integ-test/src/test/resources/expectedOutput/analytics/explain_aggregation.yaml +++ b/integ-test/src/test/resources/expectedOutput/analytics/explain_aggregation.yaml @@ -3,5 +3,5 @@ calcite: LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) LogicalProject(count()=[$1], status=[$0]) LogicalAggregate(group=[{0}], count()=[COUNT()]) - LogicalProject(status=[$1]) + LogicalProject(status=[$2]) LogicalTableScan(table=[[opensearch, parquet_logs]]) diff --git a/integ-test/src/test/resources/expectedOutput/analytics/explain_eval.yaml b/integ-test/src/test/resources/expectedOutput/analytics/explain_eval.yaml index 4009b8e6193..647ef80bd84 100644 --- a/integ-test/src/test/resources/expectedOutput/analytics/explain_eval.yaml +++ b/integ-test/src/test/resources/expectedOutput/analytics/explain_eval.yaml @@ -1,5 +1,5 @@ calcite: logical: | LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) - LogicalProject(ts=[$0], status=[$1], message=[$2], ip_addr=[$3], error=[=($1, 500)]) + LogicalProject(ip_addr=[$0], message=[$1], status=[$2], ts=[$3], error=[=($2, 500)]) LogicalTableScan(table=[[opensearch, parquet_logs]]) diff --git a/integ-test/src/test/resources/expectedOutput/analytics/explain_filter_project.yaml b/integ-test/src/test/resources/expectedOutput/analytics/explain_filter_project.yaml index 68fd28ec9b3..a4208266ce8 100644 --- a/integ-test/src/test/resources/expectedOutput/analytics/explain_filter_project.yaml +++ b/integ-test/src/test/resources/expectedOutput/analytics/explain_filter_project.yaml @@ -1,6 +1,6 @@ calcite: logical: | LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) - LogicalProject(ts=[$0], message=[$2]) - LogicalFilter(condition=[=($1, 200)]) + LogicalProject(ts=[$3], message=[$1]) + LogicalFilter(condition=[=($2, 200)]) LogicalTableScan(table=[[opensearch, parquet_logs]]) diff --git a/integ-test/src/test/resources/expectedOutput/analytics/explain_project.yaml b/integ-test/src/test/resources/expectedOutput/analytics/explain_project.yaml index 225faa4616a..acc411753d6 100644 --- a/integ-test/src/test/resources/expectedOutput/analytics/explain_project.yaml +++ b/integ-test/src/test/resources/expectedOutput/analytics/explain_project.yaml @@ -1,5 +1,5 @@ calcite: logical: | LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) - LogicalProject(ts=[$0], message=[$2]) + LogicalProject(ts=[$3], message=[$1]) LogicalTableScan(table=[[opensearch, parquet_logs]]) diff --git a/integ-test/src/test/resources/expectedOutput/analytics/explain_sort.yaml b/integ-test/src/test/resources/expectedOutput/analytics/explain_sort.yaml index 7bc65162101..771ea96309a 100644 --- a/integ-test/src/test/resources/expectedOutput/analytics/explain_sort.yaml +++ b/integ-test/src/test/resources/expectedOutput/analytics/explain_sort.yaml @@ -1,5 +1,5 @@ calcite: logical: | - LogicalSystemLimit(sort0=[$0], dir0=[ASC-nulls-first], fetch=[10000], type=[QUERY_SIZE_LIMIT]) - LogicalSort(sort0=[$0], dir0=[ASC-nulls-first]) + LogicalSystemLimit(sort0=[$3], dir0=[ASC-nulls-first], fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalSort(sort0=[$3], dir0=[ASC-nulls-first]) LogicalTableScan(table=[[opensearch, parquet_logs]]) diff --git a/libs/analytics-engine-3.6.0-SNAPSHOT.jar b/libs/analytics-engine-3.6.0-SNAPSHOT.jar new file mode 100644 index 00000000000..ffba4657e2c Binary files /dev/null and b/libs/analytics-engine-3.6.0-SNAPSHOT.jar differ diff --git a/libs/analytics-engine-3.6.0-SNAPSHOT.zip b/libs/analytics-engine-3.6.0-SNAPSHOT.zip new file mode 100644 index 00000000000..62220069a28 Binary files /dev/null and b/libs/analytics-engine-3.6.0-SNAPSHOT.zip differ diff --git a/libs/analytics-framework-3.6.0-SNAPSHOT.jar b/libs/analytics-framework-3.6.0-SNAPSHOT.jar new file mode 100644 index 00000000000..0e137a5bf01 Binary files /dev/null and b/libs/analytics-framework-3.6.0-SNAPSHOT.jar differ diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/CalciteScriptEngine.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/CalciteScriptEngine.java index 224d7019ec2..d7478c4ee67 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/CalciteScriptEngine.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/CalciteScriptEngine.java @@ -137,8 +137,7 @@ public T compile( getter, new RelRecordType(List.of())); - Function1 function = - new RexExecutable(code, "generated Rex code").getFunction(); + Function1 function = compileRexCode(code); if (CONTEXTS.containsKey(context)) { return context.factoryClazz.cast(CONTEXTS.get(context).apply(function, rexNode.getType())); @@ -150,6 +149,39 @@ public T compile( CONTEXTS, context)); } + /** + * Compile Rex code using the SQL plugin's classloader. Calcite's {@link RexExecutable} hardcodes + * {@code RexExecutable.class.getClassLoader()} for Janino, which returns the analytics-engine + * parent classloader when extendedPlugins is used. This method uses this class's classloader (SQL + * plugin) so Janino can resolve both parent and child classes. + */ + @SuppressWarnings("unchecked") + private static Function1 compileRexCode(String code) { + try { + // Use reflection for Janino classes (available at runtime via parent classloader, + // not on opensearch module compile classpath). + ClassLoader classLoader = CalciteScriptEngine.class.getClassLoader(); + Class cbeClass = classLoader.loadClass("org.codehaus.janino.ClassBodyEvaluator"); + Object cbe = cbeClass.getDeclaredConstructor().newInstance(); + cbeClass.getMethod("setClassName", String.class).invoke(cbe, "Reducer"); + cbeClass + .getMethod("setExtendedClass", Class.class) + .invoke(cbe, org.apache.calcite.runtime.Utilities.class); + cbeClass + .getMethod("setImplementedInterfaces", Class[].class) + .invoke(cbe, (Object) new Class[] {Function1.class, java.io.Serializable.class}); + cbeClass.getMethod("setParentClassLoader", ClassLoader.class).invoke(cbe, classLoader); + + // ClassBodyEvaluator.cook(String) compiles the source code + cbeClass.getMethod("cook", String.class).invoke(cbe, code); + + Class clazz = (Class) cbeClass.getMethod("getClazz").invoke(cbe); + return (Function1) clazz.getDeclaredConstructor().newInstance(); + } catch (Exception e) { + throw Util.throwAsRuntime(e); + } + } + @Override public Set> getSupportedContexts() { return CONTEXTS.keySet(); diff --git a/plugin/build.gradle b/plugin/build.gradle index 7b757759e13..af190dd7122 100644 --- a/plugin/build.gradle +++ b/plugin/build.gradle @@ -55,11 +55,51 @@ opensearchplugin { name 'opensearch-sql' description 'OpenSearch SQL' classname 'org.opensearch.sql.plugin.SQLPlugin' - extendedPlugins = ['opensearch-job-scheduler'] + extendedPlugins = ['opensearch-job-scheduler', 'analytics-engine'] licenseFile rootProject.file("LICENSE.txt") noticeFile rootProject.file("NOTICE") } +// Exclude jars provided by analytics-engine plugin (shared via extendedPlugins classloader). +// Must match exactly what's in the analytics-engine ZIP to avoid both jar hell and missing classes. +bundlePlugin { + exclude 'calcite-core-*.jar' + exclude 'calcite-linq4j-*.jar' + exclude 'avatica-core-*.jar' + exclude 'avatica-metrics-*.jar' + exclude 'guava-*.jar' + exclude 'failureaccess-*.jar' + exclude 'slf4j-api-*.jar' + exclude 'commons-codec-*.jar' + exclude 'commons-compiler-*.jar' + exclude 'janino-*.jar' + exclude 'joou-java-6-*.jar' + exclude 'memory-0*.jar' + exclude 'sketches-core-*.jar' + exclude 'commons-lang3-*.jar' + exclude 'commons-text-*.jar' + exclude 'commons-math3-*.jar' + exclude 'commons-dbcp2-*.jar' + exclude 'commons-io-*.jar' + exclude 'commons-pool2-*.jar' + exclude 'uzaygezen-core-*.jar' + exclude 'icu4j-*.jar' + exclude 'error_prone_annotations-*.jar' + exclude 'jts-io-common-*.jar' + exclude 'proj4j-*.jar' + exclude 'json-path-*.jar' + exclude 'json-smart-*.jar' + exclude 'accessors-smart-*.jar' + exclude 'asm-9*.jar' + exclude 'httpclient5-*.jar' + exclude 'httpcore5-5*.jar' + exclude 'httpcore5-h2-*.jar' + exclude 'httpcore5-reactive-*.jar' + exclude 'antlr4-runtime-4.13.2.jar' + exclude 'jackson-annotations-*.jar' + exclude 'jackson-databind-*.jar' +} + publishing { publications { pluginZip(MavenPublication) { publication -> @@ -161,6 +201,8 @@ dependencies { api project(":ppl") api project(':api') + compileOnly files("${rootDir}/libs/analytics-framework-3.6.0-SNAPSHOT.jar") + compileOnly files("${rootDir}/libs/analytics-engine-3.6.0-SNAPSHOT.jar") api project(':legacy') api project(':opensearch') api project(':prometheus') diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java index c8b6ffcb55b..6fc4b2bd058 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java @@ -12,10 +12,12 @@ import java.util.Map; import org.apache.calcite.rel.RelNode; -import org.apache.calcite.schema.impl.AbstractSchema; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.ThreadContext; +import org.opensearch.analytics.exec.QueryPlanExecutor; +import org.opensearch.analytics.schema.OpenSearchSchemaBuilder; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.action.ActionListener; import org.opensearch.sql.api.UnifiedQueryContext; @@ -29,9 +31,7 @@ import org.opensearch.sql.executor.ExecutionEngine.QueryResponse; import org.opensearch.sql.executor.QueryType; import org.opensearch.sql.executor.analytics.AnalyticsExecutionEngine; -import org.opensearch.sql.executor.analytics.QueryPlanExecutor; import org.opensearch.sql.lang.LangSpec; -import org.opensearch.sql.plugin.rest.analytics.stub.StubSchemaProvider; import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse; import org.opensearch.sql.ppl.domain.PPLQueryRequest; import org.opensearch.sql.protocol.response.QueryResult; @@ -51,9 +51,14 @@ public class RestUnifiedQueryAction { private final AnalyticsExecutionEngine analyticsEngine; private final NodeClient client; + private final ClusterService clusterService; - public RestUnifiedQueryAction(NodeClient client, QueryPlanExecutor planExecutor) { + public RestUnifiedQueryAction( + NodeClient client, + ClusterService clusterService, + QueryPlanExecutor> planExecutor) { this.client = client; + this.clusterService = clusterService; this.analyticsEngine = new AnalyticsExecutionEngine(planExecutor); } @@ -70,7 +75,7 @@ public boolean isAnalyticsIndex(String query, QueryType queryType) { if (query == null || query.isEmpty()) { return false; } - try (UnifiedQueryContext context = buildContext(queryType, false)) { + try (UnifiedQueryContext context = buildParsingContext(queryType)) { String indexName = extractIndexName(query, context); if (indexName == null) { return false; @@ -151,11 +156,18 @@ private void doExplain( } } + /** + * Build a lightweight context for parsing only (index name extraction). Does not require cluster + * state or catalog schema. + */ + private static UnifiedQueryContext buildParsingContext(QueryType queryType) { + return UnifiedQueryContext.builder().language(queryType).build(); + } + private UnifiedQueryContext buildContext(QueryType queryType, boolean profiling) { - AbstractSchema schema = StubSchemaProvider.buildSchema(); return UnifiedQueryContext.builder() .language(queryType) - .catalog(SCHEMA_NAME, schema) + .catalog(SCHEMA_NAME, OpenSearchSchemaBuilder.buildSchema(clusterService.state())) .defaultNamespace(SCHEMA_NAME) .profiling(profiling) .build(); diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/rest/analytics/stub/StubQueryPlanExecutor.java b/plugin/src/main/java/org/opensearch/sql/plugin/rest/analytics/stub/StubQueryPlanExecutor.java index 598d0e13699..40bc6f57667 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/rest/analytics/stub/StubQueryPlanExecutor.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/rest/analytics/stub/StubQueryPlanExecutor.java @@ -9,7 +9,7 @@ import java.util.List; import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.rel.RelNode; -import org.opensearch.sql.executor.analytics.QueryPlanExecutor; +import org.opensearch.analytics.exec.QueryPlanExecutor; /** * Stub implementation of {@link QueryPlanExecutor} for development and testing. Returns canned data @@ -18,7 +18,7 @@ * *

Will be replaced by the real analytics engine implementation when available. */ -public class StubQueryPlanExecutor implements QueryPlanExecutor { +public class StubQueryPlanExecutor implements QueryPlanExecutor> { @Override public Iterable execute(RelNode plan, Object context) { diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/rest/analytics/stub/StubSchemaProvider.java b/plugin/src/main/java/org/opensearch/sql/plugin/rest/analytics/stub/StubSchemaProvider.java deleted file mode 100644 index d45a97f861f..00000000000 --- a/plugin/src/main/java/org/opensearch/sql/plugin/rest/analytics/stub/StubSchemaProvider.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.plugin.rest.analytics.stub; - -import java.util.Map; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.schema.Table; -import org.apache.calcite.schema.impl.AbstractSchema; -import org.apache.calcite.schema.impl.AbstractTable; -import org.apache.calcite.sql.type.SqlTypeName; - -/** - * Stub schema provider for development and testing. Returns hardcoded Calcite table definitions - * with standard types. Will be replaced by {@code EngineContext.getSchema()} when the analytics - * engine is ready. - */ -public class StubSchemaProvider { - - /** Build a stub Calcite schema with hardcoded parquet tables. */ - public static AbstractSchema buildSchema() { - return new AbstractSchema() { - @Override - protected Map getTableMap() { - return Map.of( - "parquet_logs", buildLogsTable(), - "parquet_metrics", buildMetricsTable()); - } - }; - } - - private static Table buildLogsTable() { - return new AbstractTable() { - @Override - public RelDataType getRowType(RelDataTypeFactory typeFactory) { - return typeFactory - .builder() - .add("ts", SqlTypeName.TIMESTAMP) - .add("status", SqlTypeName.INTEGER) - .add("message", SqlTypeName.VARCHAR) - .add("ip_addr", SqlTypeName.VARCHAR) - .build(); - } - }; - } - - private static Table buildMetricsTable() { - return new AbstractTable() { - @Override - public RelDataType getRowType(RelDataTypeFactory typeFactory) { - return typeFactory - .builder() - .add("ts", SqlTypeName.TIMESTAMP) - .add("cpu", SqlTypeName.DOUBLE) - .add("memory", SqlTypeName.DOUBLE) - .add("host", SqlTypeName.VARCHAR) - .build(); - } - }; - } -} diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java index ed24dd3051f..18b9e89cbb2 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java @@ -83,7 +83,8 @@ public TransportPPLQueryAction( b.bind(DataSourceService.class).toInstance(dataSourceService); }); this.injector = Guice.createInjector(modules); - this.unifiedQueryHandler = new RestUnifiedQueryAction(client, new StubQueryPlanExecutor()); + this.unifiedQueryHandler = + new RestUnifiedQueryAction(client, clusterService, new StubQueryPlanExecutor()); this.pplEnabled = () -> MULTI_ALLOW_EXPLICIT_INDEX.get(clusterSettings) diff --git a/plugin/src/test/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryActionTest.java b/plugin/src/test/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryActionTest.java index 1a3b6fb64c2..a8e594b1816 100644 --- a/plugin/src/test/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryActionTest.java +++ b/plugin/src/test/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryActionTest.java @@ -11,8 +11,9 @@ import org.junit.Before; import org.junit.Test; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.sql.executor.QueryType; -import org.opensearch.sql.executor.analytics.QueryPlanExecutor; +import org.opensearch.sql.plugin.rest.analytics.stub.StubQueryPlanExecutor; import org.opensearch.transport.client.node.NodeClient; /** @@ -25,7 +26,9 @@ public class RestUnifiedQueryActionTest { @Before public void setUp() { - action = new RestUnifiedQueryAction(mock(NodeClient.class), mock(QueryPlanExecutor.class)); + action = + new RestUnifiedQueryAction( + mock(NodeClient.class), mock(ClusterService.class), new StubQueryPlanExecutor()); } @Test