Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/license.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ jobs:
- run: |
build/mvn org.apache.rat:apache-rat-plugin:check \
-Ptpcds -Pkubernetes-it \
-Pspark-3.3 -Pspark-3.4 -Pspark-3.5 -Pspark-4.0
-Pspark-3.3 -Pspark-3.4 -Pspark-3.5 -Pspark-4.0 -Pspark-4.1
- name: Upload rat report
if: failure()
uses: actions/upload-artifact@v4
Expand Down
20 changes: 19 additions & 1 deletion .github/workflows/master.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ jobs:
java:
- 8
- 17
python:
- '3.9'
spark:
- '3.3'
- '3.4'
Expand All @@ -60,25 +62,41 @@ jobs:
comment: ["normal"]
include:
- java: 21
python: '3.11'
spark: '4.0'
spark-archive: '-Pscala-2.13'
exclude-tags: ''
comment: 'normal'
- java: 21
python: '3.11'
spark: '4.1'
spark-archive: '-Pscala-2.13'
exclude-tags: ''
comment: 'normal'
- java: 8
python: '3.9'
spark: '3.5'
spark-archive: '-Dspark.archive.mirror=https://www.apache.org/dyn/closer.lua/spark/spark-3.3.3 -Dspark.archive.name=spark-3.3.3-bin-hadoop3.tgz -Pzookeeper-3.6'
exclude-tags: '-Dmaven.plugin.scalatest.exclude.tags=org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest,org.apache.kyuubi.tags.PaimonTest,org.apache.kyuubi.tags.HudiTest,org.apache.kyuubi.tags.SparkLocalClusterTest'
comment: 'verify-on-spark-3.3-binary'
- java: 8
python: '3.9'
spark: '3.5'
spark-archive: '-Dspark.archive.mirror=https://www.apache.org/dyn/closer.lua/spark/spark-3.4.3 -Dspark.archive.name=spark-3.4.3-bin-hadoop3.tgz -Pzookeeper-3.6'
exclude-tags: '-Dmaven.plugin.scalatest.exclude.tags=org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest,org.apache.kyuubi.tags.PaimonTest,org.apache.kyuubi.tags.SparkLocalClusterTest'
comment: 'verify-on-spark-3.4-binary'
- java: 17
python: '3.11'
spark: '3.5'
spark-archive: '-Pscala-2.13 -Dspark.archive.mirror=https://www.apache.org/dyn/closer.lua/spark/spark-4.0.1 -Dspark.archive.name=spark-4.0.1-bin-hadoop3.tgz'
exclude-tags: '-Dmaven.plugin.scalatest.exclude.tags=org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest,org.apache.kyuubi.tags.PaimonTest,org.apache.kyuubi.tags.SparkLocalClusterTest'
comment: 'verify-on-spark-4.0-binary'
- java: 17
python: '3.11'
spark: '3.5'
spark-archive: '-Pscala-2.13 -Dspark.archive.mirror=https://www.apache.org/dyn/closer.lua/spark/spark-4.1.0 -Dspark.archive.name=spark-4.1.0-bin-hadoop3.tgz'
exclude-tags: '-Dmaven.plugin.scalatest.exclude.tags=org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest,org.apache.kyuubi.tags.PaimonTest,org.apache.kyuubi.tags.HudiTest,org.apache.kyuubi.tags.SparkLocalClusterTest'
comment: 'verify-on-spark-4.1-binary'
env:
SPARK_LOCAL_IP: localhost
steps:
Expand All @@ -100,7 +118,7 @@ jobs:
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: '3.9'
python-version: ${{ matrix.python }}
- name: Build and test Kyuubi and Spark with maven w/o linters
run: |
if [[ "${{ matrix.java }}" == "8" && "${{ matrix.spark }}" == "3.5" && "${{ matrix.spark-archive }}" == "" ]]; then
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/style.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ jobs:
build/mvn clean install -pl extensions/spark/kyuubi-extension-spark-3-4 -Pspark-3.4
build/mvn clean install -pl extensions/spark/kyuubi-extension-spark-3-5,extensions/spark/kyuubi-spark-connector-hive -Pspark-3.5
build/mvn clean install -pl extensions/spark/kyuubi-extension-spark-4-0 -Pspark-4.0 -Pscala-2.13
build/mvn clean install -pl extensions/spark/kyuubi-extension-spark-4-1 -Pspark-4.1 -Pscala-2.13

- name: Scalastyle with maven
id: scalastyle-check
Expand Down
14 changes: 12 additions & 2 deletions build/release/release.sh
Original file line number Diff line number Diff line change
Expand Up @@ -110,16 +110,26 @@ upload_svn_staging() {
}

upload_nexus_staging() {
# Spark Extension Plugin for Spark 3.3
# Spark Extension Plugin for Spark 3.3 and Scala 2.12
${KYUUBI_DIR}/build/mvn clean deploy -DskipTests -Papache-release,flink-provided,spark-provided,hive-provided,spark-3.3 \
-s "${KYUUBI_DIR}/build/release/asf-settings.xml" \
-pl extensions/spark/kyuubi-extension-spark-3-3 -am

# Spark Extension Plugin for Spark 3.4
# Spark Extension Plugin for Spark 3.4 and Scala 2.12
${KYUUBI_DIR}/build/mvn clean deploy -DskipTests -Papache-release,flink-provided,spark-provided,hive-provided,spark-3.4 \
-s "${KYUUBI_DIR}/build/release/asf-settings.xml" \
-pl extensions/spark/kyuubi-extension-spark-3-4 -am

# Spark Extension Plugin for Spark 4.0 and Scala 2.13
${KYUUBI_DIR}/build/mvn clean deploy -DskipTests -Papache-release,flink-provided,spark-provided,hive-provided,spark-4.0,scala-2.13 \
-s "${KYUUBI_DIR}/build/release/asf-settings.xml" \
-pl extensions/spark/kyuubi-extension-spark-4-0 -am

# Spark Extension Plugin for Spark 4.1 and Scala 2.13
${KYUUBI_DIR}/build/mvn clean deploy -DskipTests -Papache-release,flink-provided,spark-provided,hive-provided,spark-4.1,scala-2.13 \
-s "${KYUUBI_DIR}/build/release/asf-settings.xml" \
-pl extensions/spark/kyuubi-extension-spark-4-1 -am

# Spark Hive/TPC-DS/TPC-H Connector built with default Spark version (3.5) and Scala 2.13
${KYUUBI_DIR}/build/mvn clean deploy -DskipTests -Papache-release,flink-provided,spark-provided,hive-provided,spark-3.5,scala-2.13 \
-s "${KYUUBI_DIR}/build/release/asf-settings.xml" \
Expand Down
13 changes: 13 additions & 0 deletions dev/kyuubi-codecov/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,19 @@
</dependency>
</dependencies>
</profile>
<profile>
<id>spark-4.1</id>
<dependencies>
<!-- TODO: support Spark extension -->
<!-- TODO: support KSHC -->
<!-- TODO: support authz -->
<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-spark-lineage_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>codecov</id>
<build>
Expand Down
2 changes: 1 addition & 1 deletion dev/reformat
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ set -x

KYUUBI_HOME="$(cd "`dirname "$0"`/.."; pwd)"

PROFILES="-Pflink-provided,hive-provided,spark-provided,spark-4.0,spark-3.5,spark-3.4,spark-3.3,tpcds,kubernetes-it"
PROFILES="-Pflink-provided,hive-provided,spark-provided,spark-4.1,spark-4.0,spark-3.5,spark-3.4,spark-3.3,tpcds,kubernetes-it"

# python style checks rely on `black` in path
if ! command -v black &> /dev/null
Expand Down
32 changes: 16 additions & 16 deletions docs/quick_start/quick_start.rst
Original file line number Diff line number Diff line change
Expand Up @@ -36,23 +36,23 @@ For quick start deployment, we need to prepare the following stuffs:
These essential components are JVM-based applications. So, the JRE needs to be
pre-installed and the ``JAVA_HOME`` is correctly set to each component.

================ ============ ==================== =======================================================
Component Role Version Remarks
================ ============ ==================== =======================================================
**Java** JRE 8/11/17 Officially released against JDK8
**Kyuubi** Gateway \ |release| \ - Kyuubi Server
Engine lib - Kyuubi Engine
Beeline - Kyuubi Beeline
**Spark** Engine 3.3 to 3.5, 4.0 A Spark distribution
**Flink** Engine 1.17 to 1.20 A Flink distribution
**Trino** Engine N/A A Trino cluster allows to access via trino-client v411
**Doris** Engine N/A A Doris cluster
**Hive** Engine - 2.1-cdh6/2.3/3.1 - A Hive distribution
Metastore - N/A - An optional and external metadata store,
whose version is decided by engines
================ ============ ======================== =======================================================
Component Role Version Remarks
================ ============ ======================== =======================================================
**Java** JRE 8, 11, 17 Officially released against JDK8
**Kyuubi** Gateway \ |release| \ - Kyuubi Server
Engine lib - Kyuubi Engine
Beeline - Kyuubi Beeline
**Spark** Engine 3.3 to 3.5, 4.0 to 4.1 A Spark distribution
**Flink** Engine 1.17 to 1.20 A Flink distribution
**Trino** Engine N/A A Trino cluster allows to access via trino-client v411
**Doris** Engine N/A A Doris cluster
**Hive** Engine - 2.1-cdh6, 2.3, 3.1 - A Hive distribution
Metastore - N/A - An optional and external metadata store,
whose version is decided by engines
**Zookeeper** HA >=3.4.x
**Disk** Storage N/A N/A
================ ============ ==================== =======================================================
**Disk** Storage N/A N/A
================ ============ ======================== =======================================================

The other internal or external parts listed in the above sheet can be used individually
or all together. For example, you can use Kyuubi, Spark and Flink to build a streaming
Expand Down
4 changes: 3 additions & 1 deletion extensions/spark/kyuubi-spark-authz/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ build/mvn clean package -DskipTests -pl :kyuubi-spark-authz_2.12 -am -Dspark.ver

`-Dspark.version=`

- [x] master
- [ ] 4.1.x
- [ ] 4.0.x
- [x] 3.5.x (default)
- [x] 3.4.x
- [x] 3.3.x
Expand All @@ -46,6 +47,7 @@ build/mvn clean package -DskipTests -pl :kyuubi-spark-authz_2.12 -am -Dspark.ver

`-Dranger.version=`

- [ ] 2.7.x
- [x] 2.6.x (default)
- [x] 2.5.x
- [x] 2.4.x
Expand Down
1 change: 1 addition & 0 deletions extensions/spark/kyuubi-spark-lineage/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ build/mvn clean package -DskipTests -pl :kyuubi-spark-lineage_2.12 -am -Dspark.v

`-Dspark.version=`

- [x] 4.1.x
- [x] 4.0.x
- [x] 3.5.x (default)
- [x] 3.4.x
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,10 +367,10 @@ trait LineageParser {
val matchedActions = getField[Seq[MergeAction]](plan, "matchedActions")
val notMatchedActions = getField[Seq[MergeAction]](plan, "notMatchedActions")
val allAssignments = (matchedActions ++ notMatchedActions).collect {
case UpdateAction(_, assignments) => assignments
case InsertAction(_, assignments) => assignments
case ua: UpdateAction => ua.assignments
case ia: InsertAction => ia.assignments
}.flatten
val nextColumnsLlineage = ListMap(allAssignments.map { assignment =>
val nextColumnsLineage = ListMap(allAssignments.map { assignment =>
(
assignment.key.asInstanceOf[Attribute],
assignment.value.references)
Expand All @@ -379,11 +379,11 @@ trait LineageParser {
val sourceTable = getField[LogicalPlan](plan, "sourceTable")
val targetColumnsLineage = extractColumnsLineage(
targetTable,
nextColumnsLlineage.map { case (k, _) => (k, AttributeSet(k)) },
nextColumnsLineage.map { case (k, _) => (k, AttributeSet(k)) },
inputTablesByPlan)
val sourceColumnsLineage = extractColumnsLineage(
sourceTable,
nextColumnsLlineage,
nextColumnsLineage,
inputTablesByPlan)
val targetColumnsWithTargetTable = targetColumnsLineage.values.flatten.map { column =>
val unquotedQualifiedName = (column.qualifier :+ column.name).mkString(".")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,28 @@

package org.apache.kyuubi.engine.spark.operation

import java.lang.{Boolean => JBoolean}

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.spark.kyuubi.SparkUtilsHelper
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.execution.CommandExecutionMode
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType

import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf.{LINEAGE_PARSER_PLUGIN_PROVIDER, OPERATION_PLAN_ONLY_EXCLUDES, OPERATION_PLAN_ONLY_OUT_STYLE}
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.getSessionConf
import org.apache.kyuubi.engine.spark.operation.PlanOnlyStatement._
import org.apache.kyuubi.operation.{AnalyzeMode, ArrayFetchIterator, ExecutionMode, IterableFetchIterator, JsonStyle, LineageMode, OperationHandle, OptimizeMode, OptimizeWithStatsMode, ParseMode, PhysicalMode, PlainStyle, PlanOnlyMode, PlanOnlyStyle, UnknownMode, UnknownStyle}
import org.apache.kyuubi.operation.PlanOnlyMode.{notSupportedModeError, unknownModeError}
import org.apache.kyuubi.operation.PlanOnlyStyle.{notSupportedStyleError, unknownStyleError}
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
import org.apache.kyuubi.util.reflect.DynMethods

/**
* Perform the statement parsing, analyzing or optimizing only without executing it
Expand Down Expand Up @@ -110,11 +115,8 @@ class PlanOnlyStatement(
spark.sessionState.analyzer.checkAnalysis(analyzed)
val optimized = spark.sessionState.optimizer.execute(analyzed)
optimized.stats
iter = new IterableFetchIterator(Seq(Row(optimized.treeString(
verbose = true,
addSuffix = true,
SQLConf.get.maxToStringFields,
printOperatorId = false))))
iter = new IterableFetchIterator(
Seq(Row(treeString(optimized, verbose = true, addSuffix = true))))
case PhysicalMode =>
val physical = spark.sessionState.executePlan(plan, CommandExecutionMode.SKIP).sparkPlan
iter = new IterableFetchIterator(Seq(Row(physical.toString())))
Expand Down Expand Up @@ -184,3 +186,33 @@ class PlanOnlyStatement(
}

}

object PlanOnlyStatement {

  // Reflectively resolved TreeNode#treeString. SPARK-52065 (Spark 4.1.0) added a
  // sixth parameter (printOutputColumns) to the method, so we register both
  // signatures and let DynMethods pick whichever one the runtime Spark provides.
  // NOTE: fixed identifier spelling (was "uboundTreeStringMehod").
  private val unboundTreeStringMethod = DynMethods.builder("treeString")
    .impl( // SPARK-52065 (4.1.0)
      classOf[TreeNode[_]],
      classOf[Boolean],
      classOf[Boolean],
      classOf[Int],
      classOf[Boolean],
      classOf[Boolean])
    .impl( // Spark 4.0 and earlier
      classOf[TreeNode[_]],
      classOf[Boolean],
      classOf[Boolean],
      classOf[Int],
      classOf[Boolean])
    .build()

  /**
   * Renders the tree string of the given tree node, dispatching reflectively to
   * whichever `treeString` overload the runtime Spark version exposes.
   *
   * @param tree the plan/tree node to render
   * @param verbose whether to render verbose output
   * @param addSuffix whether to append the suffix information
   * @param maxFields maximum number of fields rendered per node
   *                  (defaults to `SQLConf.get.maxToStringFields`)
   * @param printOperatorId whether to print operator ids
   * @param printOutputColumns whether to print output columns — only honored on
   *                           Spark 4.1+ (extra args are ignored by older overloads)
   * @return the rendered tree string
   */
  def treeString(
      tree: TreeNode[_],
      verbose: JBoolean,
      addSuffix: JBoolean = false,
      maxFields: Integer = SQLConf.get.maxToStringFields,
      printOperatorId: JBoolean = false,
      printOutputColumns: JBoolean = false): String = {
    unboundTreeStringMethod.bind(tree)
      .invoke(verbose, addSuffix, maxFields, printOperatorId, printOutputColumns)
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.apache.spark.SparkConf
import org.apache.spark.util.Utils

import org.apache.kyuubi.Logging
import org.apache.kyuubi.util.reflect.{DynClasses, DynMethods}

/**
* A place to invoke non-public APIs of [[Utils]], anything to be added here need to
Expand All @@ -37,11 +38,21 @@ object SparkUtilsHelper extends Logging {
Utils.redact(regex, text)
}

private val readOnlySparkConfCls = DynClasses.builder()
.impl("org.apache.spark.ReadOnlySparkConf")
.orNull()
.build()

private val getLocalDirMethod = DynMethods.builder("getLocalDir")
.impl(Utils.getClass, readOnlySparkConfCls) // SPARK-53459 (4.1.0)
.impl(Utils.getClass, classOf[SparkConf])
.build(Utils)

/**
* Get the path of a temporary directory.
*/
def getLocalDir(conf: SparkConf): String = {
Utils.getLocalDir(conf)
getLocalDirMethod.invoke(conf)
}

def classesArePresent(className: String): Boolean = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp
}

test("arrow-based query metrics") {
// TODO: the issue is being investigated by Fu Chen
assume(SPARK_ENGINE_RUNTIME_VERSION < "4.1")
val listener = new SQLMetricsListener
withJdbcStatement() { statement =>
withSparkListener(listener) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar {
processBuilder.start
eventually(timeout(90.seconds), interval(500.milliseconds)) {
val error = processBuilder.getError
assert(error.getMessage.contains(
"java.lang.IllegalArgumentException: spark.ui.port should be int, but was abc"))
assert(error.getMessage.contains("spark.ui.port should be int") ||
error.getMessage.contains("INVALID_CONF_VALUE.TYPE_MISMATCH"))
assert(error.isInstanceOf[KyuubiSQLException])
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ class BatchCliSuite extends RestClientTestHelper with TestPrematureExit with Bat
"--forward")
result = testPrematureExitForControlCli(logArgs, "")
assert(result.contains(s"Submitted application: $sparkBatchTestAppName"))
assert(result.contains("Shutdown hook called"))
assert(result.contains("Successfully stopped SparkContext"))
Copy link
Member Author

@pan3793 pan3793 Dec 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SPARK-54259 demotes the log level of "Shutdown hook called" to debug

}

test("submit batch test") {
Expand All @@ -272,7 +272,7 @@ class BatchCliSuite extends RestClientTestHelper with TestPrematureExit with Bat
ldapUserPasswd)
val result = testPrematureExitForControlCli(submitArgs, "")
assert(result.contains(s"Submitted application: $sparkBatchTestAppName"))
assert(result.contains("Shutdown hook called"))
assert(result.contains("Successfully stopped SparkContext"))
}

test("submit batch test with waitCompletion=false") {
Expand All @@ -289,7 +289,7 @@ class BatchCliSuite extends RestClientTestHelper with TestPrematureExit with Bat
s"${CtlConf.CTL_BATCH_LOG_QUERY_INTERVAL.key}=100")
val result = testPrematureExitForControlCli(submitArgs, "")
assert(result.contains("bin/spark-submit"))
assert(!result.contains("Shutdown hook called"))
assert(!result.contains("Successfully stopped SparkContext"))
}

test("list batch test") {
Expand Down
Loading
Loading