Commit 7ddbf1e
Unify PPL execution in Spark via unified query API (#1313)
* Create unified query integration module with dependency
* Add unified query spark parser and spark schema impl
* Refactor type converter and unit test
* Refactor unit test to reuse mock table utility
* Fix nullability and wrong type mapping
* Migrate unit tests to BDD style
* Lazy initializing unified query context
* Update unit test, javadoc and readme
* Clean up sbt config and readme
* Update javadoc
* Remove PPL-specific logic and defer to later integration with ppl-spark-integration
* Fix unquoted table identifier issue
* Unwrap and propagate root cause in unified query parser

Signed-off-by: Chen Dai <daichen@amazon.com>
1 parent 47ce810 commit 7ddbf1e

7 files changed: +831 −1 lines changed

build.sbt

Lines changed: 38 additions & 1 deletion
```diff
@@ -100,7 +100,7 @@ lazy val commonSettings = Seq(
 
 // running `scalafmtAll` includes all subprojects under root
 lazy val root = (project in file("."))
-  .aggregate(flintCommons, flintCore, flintSparkIntegration, pplSparkIntegration, sparkSqlApplication, integtest)
+  .aggregate(flintCommons, flintCore, flintSparkIntegration, pplSparkIntegration, unifiedQuerySparkIntegration, sparkSqlApplication, integtest)
   .disablePlugins(AssemblyPlugin)
   .settings(name := "flint", publish / skip := true)
 
@@ -299,6 +299,43 @@ lazy val flintSparkIntegration = (project in file("flint-spark-integration"))
   assembly / test := (Test / test).value
 )
 
+lazy val unifiedQuerySparkIntegration = (project in file("unified-query-spark-integration"))
+  .disablePlugins(AssemblyPlugin)
+  .settings(
+    commonSettings,
+    name := "unified-query-spark-integration",
+    scalaVersion := scala212,
+    resolvers ++= Seq(
+      "OpenSearch Snapshots" at "https://ci.opensearch.org/ci/dbc/snapshots/maven/",
+    ),
+    // Force all Jackson dependencies to use Spark's version to avoid conflicts
+    dependencyOverrides ++= Seq(
+      "com.fasterxml.jackson.core" % "jackson-annotations" % jacksonVersion,
+      "com.fasterxml.jackson.dataformat" % "jackson-dataformat-yaml" % jacksonVersion
+    ),
+    libraryDependencies ++= Seq(
+      "org.scalactic" %% "scalactic" % "3.2.15" % "test",
+      "org.scalatest" %% "scalatest" % "3.2.15" % "test",
+      "org.scalatestplus" %% "mockito-4-6" % "3.2.15.0" % "test",
+      "org.mockito" %% "mockito-scala" % "1.16.42" % "test",
+      // unified-query-api dependency from OpenSearch SQL project
+      "org.opensearch.query" % "unified-query-api" % "2.19.4.0-SNAPSHOT"
+        excludeAll(
+          ExclusionRule(organization = "com.fasterxml.jackson.core"),
+          ExclusionRule(organization = "com.fasterxml.jackson.dataformat"),
+          ExclusionRule(organization = "com.fasterxml.jackson.module"),
+          ExclusionRule(organization = "com.fasterxml.jackson.jaxrs"),
+          ExclusionRule(organization = "org.opensearch"),
+          ExclusionRule(organization = "com.squareup.okhttp3"),
+          ExclusionRule(organization = "com.amazonaws"),
+          ExclusionRule(organization = "com.github.babbel"),
+          ExclusionRule(organization = "org.apache.logging.log4j"),
+          ExclusionRule(organization = "org.slf4j"))
+        exclude("org.opensearch.query", "unified-query-protocol")),
+    libraryDependencies ++= deps(sparkVersion),
+    publish / skip := true
+  )
+
 lazy val IntegrationTest = config("it") extend Test
 lazy val AwsIntegrationTest = config("aws-it") extend Test
```

unified-query-spark-integration/README.md

Lines changed: 114 additions & 0 deletions
# Unified Query Spark Integration

This module provides the integration layer between the Calcite-based unified query engine and Apache Spark. It enables consistent query behavior across OpenSearch and Spark by leveraging the unified query parser and transpiler.

## Overview

The unified-query-spark-integration module bridges the gap between OpenSearch's Calcite-based unified query engine and Spark's execution environment. It provides:

- **UnifiedQuerySparkParser**: A custom Spark SQL parser that routes queries through the Calcite engine for transpilation to Spark SQL
- **SparkSchema**: Implements Calcite's Schema interface by bridging Spark SQL catalogs and tables

## Architecture

```
┌─────────────────────────────────┬──────────────────────────────────────┬─────────────────────────────────┐
│ Spark SQL                       │ Unified Query Integration            │ Unified Query API               │
├─────────────────────────────────┼──────────────────────────────────────┼─────────────────────────────────┤
│ spark.sql("<query text>")       │                                      │                                 │
│             │                   │                                      │                                 │
│             ├──────────────────►│ UnifiedQuerySparkParser              │                                 │
│             │                   │           │                          │                                 │
│             │                   │           ├────── query text ───────►│ UnifiedQueryPlanner             │
│             │                   │           │                          │           │                     │
│             │                   │           │                          │           ▼                     │
│             │                   │           │                          │    Calcite RelNode              │
│             │                   │           │                          │           │                     │
│             │                   │           │                          │           ▼                     │
│             │                   │           │                          │ UnifiedQueryTranspiler          │
│             │                   │           │                          │           │                     │
│             ◄─────────────── Spark SQL text ┘──────────────────────────────────────┘                     │
│             ▼                   │                                      │                                 │
│ Spark built-in SQL parser       │                                      │                                 │
│             │                   │                                      │                                 │
│             ▼                   │                                      │                                 │
│        LogicalPlan              │                                      │                                 │
│             │                   │                                      │                                 │
│             ▼                   │                                      │                                 │
│          Execute                │                                      │                                 │
└─────────────────────────────────┴──────────────────────────────────────┴─────────────────────────────────┘
```

When a query is submitted:

1. `UnifiedQuerySparkParser.parsePlan()` receives the query
2. `UnifiedQueryPlanner` parses the query and creates a Calcite RelNode (unified logical plan)
3. `UnifiedQueryTranspiler` converts the RelNode to Spark SQL using `OpenSearchSparkSqlDialect`
4. The generated SQL is passed to Spark's native parser for execution
5. If parsing fails (unsupported query), the parser falls back to the underlying Spark parser (see the wiring sketch below)
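
The commit itself stops at the parser; session wiring is deferred to a later integration with ppl-spark-integration. For orientation only, here is a minimal sketch of how such a parser could be installed through Spark's standard extension hook. `UnifiedQuerySparkExtensions` and `buildContext` are hypothetical names, not part of this commit:

```scala
import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
import org.opensearch.sql.api.UnifiedQueryContext

// Hypothetical wiring sketch -- not part of this commit, which deliberately
// leaves context construction and session integration to the caller.
class UnifiedQuerySparkExtensions extends (SparkSessionExtensions => Unit) {

  override def apply(extensions: SparkSessionExtensions): Unit =
    extensions.injectParser { (session, delegate) =>
      // The context argument is by-name, so it is only evaluated once the
      // parser first needs the unified query planner
      new UnifiedQuerySparkParser(buildContext(session), delegate)
    }

  // Placeholder standing in for however the caller builds the context
  private def buildContext(session: SparkSession): UnifiedQueryContext = ???
}
```

With such an extension on the classpath, it would be activated by setting `spark.sql.extensions` to the extension class name.
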
## Type Mapping

The following table shows how Spark SQL types are mapped to Unified Query types for query processing:
| Spark SQL Type | Unified Query Type | Notes |
|----------------|--------------------|-------|
| `BooleanType` | `BOOLEAN` | |
| `ByteType` | `TINYINT` | |
| `ShortType` | `SMALLINT` | |
| `IntegerType` | `INTEGER` | |
| `LongType` | `BIGINT` | |
| `FloatType` | `REAL` | 4-byte single-precision |
| `DoubleType` | `DOUBLE` | |
| `DecimalType(p, s)` | `DECIMAL(p, s)` | Preserves precision and scale |
| `StringType` | `VARCHAR` | |
| `VarcharType(n)` | `VARCHAR(n)` | Preserves length |
| `CharType(n)` | `CHAR(n)` | Preserves length |
| `BinaryType` | `VARBINARY` | |
| `DateType` | `DATE` | |
| `TimestampType` | `TIMESTAMP_WITH_LOCAL_TIME_ZONE` | Timestamp with local time zone |
| `TimestampNTZType` | `TIMESTAMP` | Timestamp without time zone |
| `ArrayType(T, nullable)` | `ARRAY<T>` | Preserves element nullability |
| `MapType(K, V, nullable)` | `MAP<K, V>` | Preserves value nullability |
| `StructType` | `RecordType` | Preserves field names and nullability |
| `NullType` | `NULL` | |
| `DayTimeIntervalType` | `INTERVAL_DAY_*` | Maps to the appropriate day-time interval |
| `YearMonthIntervalType` | `INTERVAL_YEAR_*` | Maps to the appropriate year-month interval |
| `UserDefinedType` | Delegates to `sqlType` | Unwraps to the underlying SQL type |
| Unsupported types | `ANY` | Fallback for `CalendarIntervalType`, `ObjectType`, etc. |
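
As a rough illustration of what this mapping looks like against Calcite's type factory, here is a minimal sketch covering a few rows of the table. It is not the module's actual converter, which handles every row above:

```scala
import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
import org.apache.calcite.sql.type.SqlTypeName
import org.apache.spark.sql.types._

// Illustrative subset of the table above, not the module's converter itself
object TypeMappingSketch {
  def toCalcite(dt: DataType, nullable: Boolean, f: RelDataTypeFactory): RelDataType = {
    val base = dt match {
      case BooleanType    => f.createSqlType(SqlTypeName.BOOLEAN)
      case IntegerType    => f.createSqlType(SqlTypeName.INTEGER)
      case LongType       => f.createSqlType(SqlTypeName.BIGINT)
      case d: DecimalType => f.createSqlType(SqlTypeName.DECIMAL, d.precision, d.scale)
      case StringType     => f.createSqlType(SqlTypeName.VARCHAR)
      case TimestampType  => f.createSqlType(SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE)
      case _              => f.createSqlType(SqlTypeName.ANY) // fallback row above
    }
    // Nullability is carried over explicitly rather than defaulted
    f.createTypeWithNullability(base, nullable)
  }
}
```
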
## Dependencies

This module depends on:

- `unified-query-api`: OpenSearch's Calcite-based unified query engine (from the OpenSearch SQL project)
- Apache Spark SQL (provided)
- Apache Calcite (transitive via unified-query-api)

## Build

Build the module with:

```bash
sbt unifiedQuerySparkIntegration/compile
```

Run tests with:

```bash
sbt unifiedQuerySparkIntegration/test
```

## Related Documentation

- [PPL on Spark Architecture](../docs/ppl-lang/PPL-on-Spark.md)

## GitHub Reference

- [opensearch-spark#1136](https://github.com/opensearch-project/opensearch-spark/issues/1136) - Unified Query Spark Integration

## License

See the [LICENSE](../LICENSE) file for our project's licensing.

## Copyright

Copyright OpenSearch Contributors. See [NOTICE](../NOTICE) for details.
SparkSchema.scala

Lines changed: 58 additions & 0 deletions
```scala
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.flint.spark.query

import java.util

import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
import org.apache.calcite.schema.{Schema, Table}
import org.apache.calcite.schema.impl.{AbstractSchema, AbstractTable}

import org.apache.spark.sql.SparkSession

/**
 * Implements Calcite's Schema interface by bridging Spark SQL catalogs and tables.
 *
 * Schema hierarchy mirrors Spark's catalog structure:
 * {{{
 * SparkSchema (catalog)
 *   +-- SubSchema (database) - dynamic, no existence validation
 *         +-- Table
 * }}}
 *
 * @param spark
 *   The current SparkSession
 * @param catalogName
 *   The name of the catalog this schema represents
 */
class SparkSchema(spark: SparkSession, catalogName: String) extends AbstractSchema {

  override protected def getSubSchemaMap: util.Map[String, Schema] =
    new LazyMap[String, Schema](dbName => buildSubSchema(dbName))

  private def buildSubSchema(dbName: String): Schema = new AbstractSchema() {
    override def getTableMap: util.Map[String, Table] =
      new LazyMap[String, Table](tableName => buildCalciteTable(dbName, tableName))
  }

  private def buildCalciteTable(dbName: String, tableName: String): Table = {
    // Backtick-quote each identifier part so special characters in names survive
    val quotedTableName = s"`$catalogName`.`$dbName`.`$tableName`"
    val sparkTable = spark.table(quotedTableName)
    new AbstractTable {
      override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
        // Convert the Spark schema to a Calcite row type via the module's type converter
        sparkTable.schema.toCalcite(typeFactory)
      }
    }
  }

  /** A read-only map that computes values on demand using the provided function. */
  private class LazyMap[K, V](valueFn: K => V) extends util.AbstractMap[K, V] {
    override def get(key: Any): V = valueFn(key.asInstanceOf[K])

    // Returns empty set as iteration is not supported and Calcite only uses get() lookups
    override def entrySet(): util.Set[util.Map.Entry[K, V]] = util.Collections.emptySet()
  }
}
```
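
A minimal usage sketch, assuming the unified query context mounts one SparkSchema per Spark catalog; the actual registration happens wherever `UnifiedQueryContext` is constructed, which this commit leaves to the caller:

```scala
import org.apache.calcite.jdbc.CalciteSchema
import org.apache.spark.sql.SparkSession
import org.opensearch.flint.spark.query.SparkSchema

object SparkSchemaSketch extends App {
  val spark = SparkSession.builder().master("local[1]").getOrCreate()

  // Mount the Spark catalog under a Calcite root schema
  val rootSchema = CalciteSchema.createRootSchema(true).plus()
  rootSchema.add("spark_catalog", new SparkSchema(spark, "spark_catalog"))

  // Calcite can now resolve `spark_catalog`.`default`.`my_table`; databases
  // and tables are materialized lazily, only when LazyMap.get() is called
}
```
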
UnifiedQuerySparkParser.scala

Lines changed: 88 additions & 0 deletions
```scala
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.flint.spark.query

import org.opensearch.sql.api.{UnifiedQueryContext, UnifiedQueryPlanner}
import org.opensearch.sql.api.transpiler.UnifiedQueryTranspiler
import org.opensearch.sql.common.antlr.SyntaxCheckException
import org.opensearch.sql.ppl.calcite.OpenSearchSparkSqlDialect

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.types.{DataType, StructType}

/**
 * A custom Spark SQL parser that delegates query parsing and planning to the Unified Query
 * Planner. It converts unified queries into Spark SQL queries for execution, and falls back to
 * the default Spark parser when the input query is not supported.
 *
 * @param context
 *   The unified query context (by-name parameter to allow deferred construction by the caller)
 * @param sparkParser
 *   The underlying Spark SQL parser to delegate non-PPL queries to
 */
class UnifiedQuerySparkParser(context: => UnifiedQueryContext, sparkParser: ParserInterface)
    extends ParserInterface
    with Logging {

  /** Lazily initialized planner that converts PPL queries to unified logical plans. */
  private lazy val unifiedQueryPlanner = new UnifiedQueryPlanner(context)

  /** Transpiler that converts the unified plan to Spark SQL using the Spark SQL dialect. */
  private val sparkSqlTranspiler = UnifiedQueryTranspiler
    .builder()
    .dialect(OpenSearchSparkSqlDialect.DEFAULT)
    .build()

  /**
   * Parses a query string into a Spark LogicalPlan. Attempts to parse as PPL and transpile to
   * Spark SQL. If parsing fails with a SyntaxCheckException, delegates to the underlying Spark
   * parser.
   *
   * @param query
   *   The query string
   * @return
   *   Spark LogicalPlan
   */
  override def parsePlan(query: String): LogicalPlan = {
    try {
      val unifiedPlan = unifiedQueryPlanner.plan(query)
      val sqlText = sparkSqlTranspiler.toSql(unifiedPlan)
      sparkParser.parsePlan(sqlText)
    } catch {
      // Fall back to the Spark parser if the unified query planner cannot handle the query
      case _: SyntaxCheckException => sparkParser.parsePlan(query)
      // Unwrap IllegalStateException to propagate the root cause
      case e: IllegalStateException if e.getCause != null => throw e.getCause
    }
  }

  // Delegate all other ParserInterface methods to the underlying Spark parser

  override def parseExpression(sqlText: String): Expression =
    sparkParser.parseExpression(sqlText)

  override def parseTableIdentifier(sqlText: String): TableIdentifier =
    sparkParser.parseTableIdentifier(sqlText)

  override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier =
    sparkParser.parseFunctionIdentifier(sqlText)

  override def parseMultipartIdentifier(sqlText: String): Seq[String] =
    sparkParser.parseMultipartIdentifier(sqlText)

  override def parseTableSchema(sqlText: String): StructType =
    sparkParser.parseTableSchema(sqlText)

  override def parseDataType(sqlText: String): DataType =
    sparkParser.parseDataType(sqlText)

  override def parseQuery(sqlText: String): LogicalPlan =
    sparkParser.parseQuery(sqlText)
}
```
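
A minimal usage sketch, with `buildContext` as a hypothetical stand-in for whatever creates the `UnifiedQueryContext` (the by-name parameter keeps it unevaluated until the first query arrives):

```scala
import org.apache.spark.sql.SparkSession
import org.opensearch.flint.spark.query.UnifiedQuerySparkParser
import org.opensearch.sql.api.UnifiedQueryContext

object ParserSketch extends App {
  val spark = SparkSession.builder().master("local[1]").getOrCreate()

  // Placeholder standing in for however the caller builds the context
  def buildContext(session: SparkSession): UnifiedQueryContext = ???

  val parser =
    new UnifiedQuerySparkParser(buildContext(spark), spark.sessionState.sqlParser)

  // PPL is planned by Calcite, transpiled to Spark SQL, then parsed by Spark
  val pplPlan = parser.parsePlan("source = default.http_logs | head 10")

  // Plain SQL makes the planner throw SyntaxCheckException, so parsePlan
  // falls back to Spark's own parser with the original query text
  val sqlPlan = parser.parsePlan("SELECT * FROM default.http_logs LIMIT 10")
}
```
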
