Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 38 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ lazy val commonSettings = Seq(

// running `scalafmtAll` includes all subprojects under root
lazy val root = (project in file("."))
.aggregate(flintCommons, flintCore, flintSparkIntegration, pplSparkIntegration, sparkSqlApplication, integtest)
.aggregate(flintCommons, flintCore, flintSparkIntegration, pplSparkIntegration, unifiedQuerySparkIntegration, sparkSqlApplication, integtest)
.disablePlugins(AssemblyPlugin)
.settings(name := "flint", publish / skip := true)

Expand Down Expand Up @@ -299,6 +299,43 @@ lazy val flintSparkIntegration = (project in file("flint-spark-integration"))
assembly / test := (Test / test).value
)

// Integration layer between the Calcite-based unified query engine (OpenSearch SQL's
// unified-query-api) and Spark SQL. Consumed as a plain library, so the assembly
// (fat-jar) plugin is disabled for this module.
lazy val unifiedQuerySparkIntegration = (project in file("unified-query-spark-integration"))
  .disablePlugins(AssemblyPlugin)
  .settings(
    commonSettings,
    name := "unified-query-spark-integration",
    scalaVersion := scala212,
    // unified-query-api SNAPSHOT artifacts are only published to the OpenSearch
    // snapshot repository, not to Maven Central.
    resolvers ++= Seq(
      "OpenSearch Snapshots" at "https://ci.opensearch.org/ci/dbc/snapshots/maven/",
    ),
    // Force all Jackson dependencies to use Spark's version to avoid conflicts
    dependencyOverrides ++= Seq(
      "com.fasterxml.jackson.core" % "jackson-annotations" % jacksonVersion,
      "com.fasterxml.jackson.dataformat" % "jackson-dataformat-yaml" % jacksonVersion
    ),
    libraryDependencies ++= Seq(
      "org.scalactic" %% "scalactic" % "3.2.15" % "test",
      "org.scalatest" %% "scalatest" % "3.2.15" % "test",
      "org.scalatestplus" %% "mockito-4-6" % "3.2.15.0" % "test",
      "org.mockito" %% "mockito-scala" % "1.16.42" % "test",
      // unified-query-api dependency from OpenSearch SQL project
      // The exclusions strip transitive dependencies that would clash with the
      // versions Spark already ships (Jackson, logging, AWS SDK, OkHttp, etc.);
      // unified-query-protocol is excluded because only the planner/transpiler
      // API surface is needed here.
      "org.opensearch.query" % "unified-query-api" % "2.19.4.0-SNAPSHOT"
        excludeAll(
          ExclusionRule(organization = "com.fasterxml.jackson.core"),
          ExclusionRule(organization = "com.fasterxml.jackson.dataformat"),
          ExclusionRule(organization = "com.fasterxml.jackson.module"),
          ExclusionRule(organization = "com.fasterxml.jackson.jaxrs"),
          ExclusionRule(organization = "org.opensearch"),
          ExclusionRule(organization = "com.squareup.okhttp3"),
          ExclusionRule(organization = "com.amazonaws"),
          ExclusionRule(organization = "com.github.babbel"),
          ExclusionRule(organization = "org.apache.logging.log4j"),
          ExclusionRule(organization = "org.slf4j"))
        exclude("org.opensearch.query", "unified-query-protocol")),
    // Shared Spark/provided dependencies used by the other modules in this build.
    libraryDependencies ++= deps(sparkVersion),
    // Library is not published as a standalone artifact.
    publish / skip := true
  )

// Custom test configurations: `it` for general integration tests and `aws-it`
// for integration tests that require AWS resources/credentials. Both inherit
// the classpath and settings of the standard Test configuration.
lazy val IntegrationTest = config("it") extend Test
lazy val AwsIntegrationTest = config("aws-it") extend Test

Expand Down
114 changes: 114 additions & 0 deletions unified-query-spark-integration/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
# Unified Query Spark Integration

This module provides the integration layer between the Calcite-based unified query engine and Apache Spark. It enables consistent query behavior across OpenSearch and Spark by leveraging the unified query parser and transpiler.

## Overview

The unified-query-spark-integration module bridges the gap between OpenSearch's Calcite-based unified query engine and Spark's execution environment. It provides:

- **UnifiedQuerySparkParser**: A custom Spark SQL parser that routes queries through the Calcite engine for transpilation to Spark SQL
- **SparkSchema**: Implements Calcite's Schema interface by bridging Spark SQL catalogs and tables

## Architecture

```
┌─────────────────────────────────┬──────────────────────────────────────┬─────────────────────────────────┐
│ Spark SQL │ Unified Query Integration │ Unified Query API │
├─────────────────────────────────┼──────────────────────────────────────┼─────────────────────────────────┤
│ spark.sql("<query text>") │ │ │
│ │ │ │ │
│ ├───────────────────►│ UnifiedQuerySparkParser │ │
│ │ │ │ │ │
│ │ │ ├────── query text ───────►│ UnifiedQueryPlanner │
│ │ │ │ │ │ │
│ │ │ │ │ ▼ │
│ │ │ │ │ Calcite RelNode │
│ │ │ │ │ │ │
│ │ │ │ │ ▼ │
│ │ │ │ │ UnifiedQueryTranspiler │
│ │ │ │ │ │ │
│ │◄──────────────────────────────────── Spark SQL text ──────┘───────────┘ │
│ ▼ │ │ │
│ Spark built-in SQL parser │ │ │
│ │ │ │ │
│ ▼ │ │ │
│ LogicalPlan │ │ │
│ │ │ │ │
│ ▼ │ │ │
│ Execute │ │ │
└─────────────────────────────────┴──────────────────────────────────────┴─────────────────────────────────┘
```

When a query is submitted:
1. `UnifiedQuerySparkParser.parsePlan()` receives the query
2. `UnifiedQueryPlanner` parses the query and creates a Calcite RelNode (unified logical plan)
3. `UnifiedQueryTranspiler` converts the RelNode to Spark SQL using `OpenSearchSparkSqlDialect`
4. The generated SQL is passed to Spark's native parser for execution
5. If the unified planner rejects the query with a syntax error (i.e. an unsupported query), the parser falls back to the underlying Spark parser

## Type Mapping

The following table shows how Spark SQL types are mapped to Unified Query types for query processing:

| Spark SQL Type | Unified Query Type | Notes |
|----------------|--------------|-------|
| `BooleanType` | `BOOLEAN` | |
| `ByteType` | `TINYINT` | |
| `ShortType` | `SMALLINT` | |
| `IntegerType` | `INTEGER` | |
| `LongType` | `BIGINT` | |
| `FloatType` | `REAL` | 4-byte single-precision |
| `DoubleType` | `DOUBLE` | |
| `DecimalType(p, s)` | `DECIMAL(p, s)` | Preserves precision and scale |
| `StringType` | `VARCHAR` | |
| `VarcharType(n)` | `VARCHAR(n)` | Preserves length |
| `CharType(n)` | `CHAR(n)` | Preserves length |
| `BinaryType` | `VARBINARY` | |
| `DateType` | `DATE` | |
| `TimestampType` | `TIMESTAMP_WITH_LOCAL_TIME_ZONE` | Timestamp with timezone |
| `TimestampNTZType` | `TIMESTAMP` | Timestamp without timezone |
| `ArrayType(T, nullable)` | `ARRAY<T>` | Preserves element nullability |
| `MapType(K, V, nullable)` | `MAP<K, V>` | Preserves value nullability |
| `StructType` | `RecordType` | Preserves field names and nullability |
| `NullType` | `NULL` | |
| `DayTimeIntervalType` | `INTERVAL_DAY_*` | Maps to appropriate day-time interval |
| `YearMonthIntervalType` | `INTERVAL_YEAR_*` | Maps to appropriate year-month interval |
| `UserDefinedType` | Delegates to `sqlType` | Unwraps to underlying SQL type |
| Unsupported types | `ANY` | Fallback for CalendarIntervalType, ObjectType, etc. |

## Dependencies

This module depends on:
- `unified-query-api`: OpenSearch's Calcite-based unified query engine (from OpenSearch SQL project)
- Apache Spark SQL (provided)
- Apache Calcite (transitive via unified-query-api)

## Build

Build the module with:

```bash
sbt unifiedQuerySparkIntegration/compile
```

Run tests with:

```bash
sbt unifiedQuerySparkIntegration/test
```

## Related Documentation

- [PPL on Spark Architecture](../docs/ppl-lang/PPL-on-Spark.md)

## GitHub Reference

- [opensearch-spark#1136](https://github.com/opensearch-project/opensearch-spark/issues/1136) - Unified Query Spark Integration

## License

See the [LICENSE](../LICENSE) file for our project's licensing.

## Copyright

Copyright OpenSearch Contributors. See [NOTICE](../NOTICE) for details.
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark.query

import java.util

import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
import org.apache.calcite.schema.{Schema, Table}
import org.apache.calcite.schema.impl.{AbstractSchema, AbstractTable}

import org.apache.spark.sql.SparkSession

/**
* Implements Calcite's Schema interface by bridging Spark SQL catalogs and tables.
*
* Schema hierarchy mirrors Spark's catalog structure:
* {{{
* SparkSchema (catalog)
* +-- SubSchema (database) - dynamic, no existence validation
* +-- Table
* }}}
*
* @param spark
* The current SparkSession
* @param catalogName
* The name of the catalog this schema represents
*/
/**
 * Calcite [[Schema]] implementation backed by Spark SQL's catalog.
 *
 * The hierarchy mirrors Spark's own catalog structure:
 * {{{
 *   SparkSchema (catalog)
 *     +-- sub-schema (database) - resolved lazily, no existence validation
 *           +-- table
 * }}}
 *
 * @param spark
 *   The current SparkSession
 * @param catalogName
 *   The name of the catalog this schema represents
 */
class SparkSchema(spark: SparkSession, catalogName: String) extends AbstractSchema {

  /** Databases are never enumerated up front; each lookup builds its sub-schema on demand. */
  override protected def getSubSchemaMap: util.Map[String, Schema] =
    new ComputeOnAccessMap[String, Schema](databaseSchema)

  /** Sub-schema for one Spark database; its tables are likewise resolved lazily. */
  private def databaseSchema(database: String): Schema = new AbstractSchema() {
    override def getTableMap: util.Map[String, Table] =
      new ComputeOnAccessMap[String, Table](table => calciteTable(database, table))
  }

  /** Wraps a Spark table so Calcite can ask for its row type when needed. */
  private def calciteTable(database: String, table: String): Table = {
    // Backtick-quote each part so names with special characters resolve correctly.
    val resolved = spark.table(s"`$catalogName`.`$database`.`$table`")
    new AbstractTable {
      override def getRowType(typeFactory: RelDataTypeFactory): RelDataType =
        resolved.schema.toCalcite(typeFactory)
    }
  }

  /** A read-only map that computes values on demand using the provided function. */
  private class ComputeOnAccessMap[K, V](compute: K => V) extends util.AbstractMap[K, V] {
    override def get(key: Any): V = compute(key.asInstanceOf[K])

    // Returns empty set as iteration is not supported and Calcite only uses get() lookups
    override def entrySet(): util.Set[util.Map.Entry[K, V]] = util.Collections.emptySet()
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark.query

import org.opensearch.sql.api.{UnifiedQueryContext, UnifiedQueryPlanner}
import org.opensearch.sql.api.transpiler.UnifiedQueryTranspiler
import org.opensearch.sql.common.antlr.SyntaxCheckException
import org.opensearch.sql.ppl.calcite.OpenSearchSparkSqlDialect

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.types.{DataType, StructType}

/**
* A custom Spark SQL parser that delegates query parsing and planning to the Unified Query
* Planner. It converts unified queries into Spark SQL queries for execution, and falls back to
* the default Spark parser when the input query is not supported.
*
* @param context
* The unified query context (by-name parameter to allow deferred construction by the caller)
* @param sparkParser
* The underlying Spark SQL parser to delegate non-PPL queries
*/
/**
 * Spark SQL parser that routes queries through the Unified Query Planner first,
 * converting recognized queries into Spark SQL text for Spark's own parser, and
 * falling back to the default Spark parser when the input is not supported.
 *
 * @param context
 *   The unified query context (by-name parameter to allow deferred construction by the caller)
 * @param sparkParser
 *   The underlying Spark SQL parser to delegate non-PPL queries
 */
class UnifiedQuerySparkParser(context: => UnifiedQueryContext, sparkParser: ParserInterface)
    extends ParserInterface
    with Logging {

  /** Built on first use so that `context` construction can remain deferred. */
  private lazy val unifiedQueryPlanner = new UnifiedQueryPlanner(context)

  /** Renders unified (Calcite) plans as Spark-dialect SQL text. */
  private val sparkSqlTranspiler =
    UnifiedQueryTranspiler.builder().dialect(OpenSearchSparkSqlDialect.DEFAULT).build()

  /**
   * Parses a query string into a Spark LogicalPlan. The query is first planned by the
   * unified engine and transpiled to Spark SQL; a SyntaxCheckException instead routes
   * the original text to the underlying Spark parser.
   *
   * @param query
   *   The query string
   * @return
   *   Spark LogicalPlan
   */
  override def parsePlan(query: String): LogicalPlan =
    try {
      sparkParser.parsePlan(transpileToSparkSql(query))
    } catch {
      case _: SyntaxCheckException =>
        // The unified planner cannot handle this query - let Spark parse it natively.
        sparkParser.parsePlan(query)
      case e: IllegalStateException if e.getCause != null =>
        // Surface the root cause instead of the IllegalStateException wrapper.
        throw e.getCause
    }

  /** Plans the query with the unified engine and converts the result to Spark SQL text. */
  private def transpileToSparkSql(query: String): String =
    sparkSqlTranspiler.toSql(unifiedQueryPlanner.plan(query))

  // Every other ParserInterface operation is delegated unchanged to the Spark parser.

  override def parseExpression(sqlText: String): Expression =
    sparkParser.parseExpression(sqlText)

  override def parseTableIdentifier(sqlText: String): TableIdentifier =
    sparkParser.parseTableIdentifier(sqlText)

  override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier =
    sparkParser.parseFunctionIdentifier(sqlText)

  override def parseMultipartIdentifier(sqlText: String): Seq[String] =
    sparkParser.parseMultipartIdentifier(sqlText)

  override def parseTableSchema(sqlText: String): StructType =
    sparkParser.parseTableSchema(sqlText)

  override def parseDataType(sqlText: String): DataType =
    sparkParser.parseDataType(sqlText)

  override def parseQuery(sqlText: String): LogicalPlan =
    sparkParser.parseQuery(sqlText)
}
Loading
Loading