From 8c7131e0f36e08a1ea0f7243281cb4c9491dcccb Mon Sep 17 00:00:00 2001 From: Richard Chen Date: Mon, 11 Mar 2024 22:20:22 -0700 Subject: [PATCH 1/6] minimal support --- .../apache/spark/sql/delta/TableFeature.scala | 11 +- .../spark/sql/delta/schema/SchemaUtils.scala | 8 ++ .../spark/sql/delta/util/PartitionUtils.scala | 2 +- .../spark/sql/delta/DeltaVariantSuite.scala | 112 ++++++++++++++++++ 4 files changed, 131 insertions(+), 2 deletions(-) create mode 100644 spark/src/test/scala/org/apache/spark/sql/delta/DeltaVariantSuite.scala diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala b/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala index f58666d946a..edaf646b396 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala @@ -359,7 +359,8 @@ object TableFeature { // managed-commits are under development and only available in testing. ManagedCommitTableFeature, InCommitTimestampTableFeature, - TypeWideningTableFeature) + TypeWideningTableFeature, + VariantTypeTableFeature) } val featureMap = features.map(f => f.name.toLowerCase(Locale.ROOT) -> f).toMap require(features.size == featureMap.size, "Lowercase feature names must not duplicate.") @@ -494,6 +495,14 @@ object IdentityColumnsTableFeature } } +object VariantTypeTableFeature extends ReaderWriterFeature(name = "variantType-dev") + with FeatureAutomaticallyEnabledByMetadata { + override def metadataRequiresFeatureToBeEnabled( + metadata: Metadata, spark: SparkSession): Boolean = { + SchemaUtils.checkForVariantTypeColumnsRecursively(metadata.schema) + } +} + object TimestampNTZTableFeature extends ReaderWriterFeature(name = "timestampNtz") with FeatureAutomaticallyEnabledByMetadata { override def metadataRequiresFeatureToBeEnabled( diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala 
b/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala index c8dd66af470..37d5ad5a2c8 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala @@ -1263,6 +1263,13 @@ def normalizeColumnNamesInDataType( unsupportedDataTypes.toSeq } + /** + * Find VariantType columns in the table schema. + */ + def checkForVariantTypeColumnsRecursively(schema: StructType): Boolean = { + SchemaUtils.typeExistsRecursively(schema)(_.isInstanceOf[VariantType]) + } + /** * Find TimestampNTZ columns in the table schema. */ @@ -1302,6 +1309,7 @@ def normalizeColumnNamesInDataType( case DateType => case TimestampType => case TimestampNTZType => + case VariantType => case BinaryType => case _: DecimalType => case a: ArrayType => diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/util/PartitionUtils.scala b/spark/src/main/scala/org/apache/spark/sql/delta/util/PartitionUtils.scala index 6f870e30ca5..698f89904b3 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/util/PartitionUtils.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/util/PartitionUtils.scala @@ -605,7 +605,7 @@ private[delta] object PartitionUtils { partitionColumnsSchema(schema, partitionColumns, caseSensitive).foreach { field => field.dataType match { - case _: AtomicType => // OK + case a: AtomicType if !a.isInstanceOf[VariantType] => // OK case _ => throw DeltaErrors.cannotUseDataTypeForPartitionColumnError(field) } } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaVariantSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaVariantSuite.scala new file mode 100644 index 00000000000..1334bfa6923 --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaVariantSuite.scala @@ -0,0 +1,112 @@ +/* + * Copyright (2024) The Delta Lake Project Authors. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta + +import org.apache.spark.SparkThrowable +import org.apache.spark.sql.{AnalysisException, QueryTest, Row} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.delta.actions.{Protocol, TableFeatureProtocolUtils} +import org.apache.spark.sql.delta.test.DeltaSQLCommandTest +import org.apache.spark.sql.types.StructType + +class DeltaVariantSuite + extends QueryTest + with DeltaSQLCommandTest { + + private def getProtocolForTable(table: String): Protocol = { + val deltaLog = DeltaLog.forTable(spark, TableIdentifier(table)) + deltaLog.unsafeVolatileSnapshot.protocol + } + + test("create a new table with Variant, higher protocol and feature should be picked.") { + withTable("tbl") { + sql("CREATE TABLE tbl(s STRING, v VARIANT) USING DELTA") + sql("INSERT INTO tbl (SELECT 'foo', parse_json(cast(id + 99 as string)) FROM range(1))") + // TODO(r.chen): Enable once `parse_json` is properly implemented in OSS Spark. 
+ // assert(spark.table("tbl").selectExpr("v::int").head == Row(99)) + assert( + getProtocolForTable("tbl") == + VariantTypeTableFeature.minProtocolVersion.withFeature(VariantTypeTableFeature) + ) + } + } + + test("creating a table without Variant should use the usual minimum protocol") { + withTable("tbl") { + sql("CREATE TABLE tbl(s STRING, i INTEGER) USING DELTA") + assert(getProtocolForTable("tbl") == Protocol(1, 2)) + + val deltaLog = DeltaLog.forTable(spark, TableIdentifier("tbl")) + assert( + !deltaLog.unsafeVolatileSnapshot.protocol.isFeatureSupported(VariantTypeTableFeature), + s"Table tbl contains VariantTypeFeature descriptor when its not supposed to" + ) + } + } + + test("add a new Variant column should upgrade to the correct protocol versions") { + withTable("tbl") { + sql("CREATE TABLE tbl(s STRING) USING delta") + assert(getProtocolForTable("tbl") == Protocol(1, 2)) + + // Should throw error + val e = intercept[SparkThrowable] { + sql("ALTER TABLE tbl ADD COLUMN v VARIANT") + } + // capture the existing protocol here. 
+ // we will check the error message later in this test as we need to compare the + // expected schema and protocol + val deltaLog = DeltaLog.forTable(spark, TableIdentifier("tbl")) + val currentProtocol = deltaLog.unsafeVolatileSnapshot.protocol + val currentFeatures = currentProtocol.implicitlyAndExplicitlySupportedFeatures + .map(_.name) + .toSeq + .sorted + .mkString(", ") + + // add table feature + sql( + s"ALTER TABLE tbl " + + s"SET TBLPROPERTIES('delta.feature.variantType-dev' = 'supported')" + ) + + sql("ALTER TABLE tbl ADD COLUMN v VARIANT") + + // check previously thrown error message + checkError( + e, + errorClass = "DELTA_FEATURES_REQUIRE_MANUAL_ENABLEMENT", + parameters = Map( + "unsupportedFeatures" -> VariantTypeTableFeature.name, + "supportedFeatures" -> currentFeatures + ) + ) + + sql("INSERT INTO tbl (SELECT 'foo', parse_json(cast(id + 99 as string)) FROM range(1))") + // TODO(r.chen): Enable once `parse_json` is properly implemented in OSS Spark. + // assert(spark.table("tbl").selectExpr("v::int").head == Row(99)) + + assert( + getProtocolForTable("tbl") == + VariantTypeTableFeature.minProtocolVersion + .withFeature(VariantTypeTableFeature) + .withFeature(InvariantsTableFeature) + .withFeature(AppendOnlyTableFeature) + ) + } + } +} From a84e8f4f03599b92ad4be47783653fd2340c5e66 Mon Sep 17 00:00:00 2001 From: Richard Chen Date: Tue, 19 Mar 2024 11:37:01 -0700 Subject: [PATCH 2/6] test --- .../scala/org/apache/spark/sql/delta/DeltaVariantSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaVariantSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaVariantSuite.scala index 1334bfa6923..b19e29b52fa 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaVariantSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaVariantSuite.scala @@ -36,7 +36,7 @@ class DeltaVariantSuite withTable("tbl") { sql("CREATE TABLE tbl(s STRING, v 
VARIANT) USING DELTA") sql("INSERT INTO tbl (SELECT 'foo', parse_json(cast(id + 99 as string)) FROM range(1))") - // TODO(r.chen): Enable once `parse_json` is properly implemented in OSS Spark. + // TODO(r.chen): Enable once variant casting is properly implemented in OSS Spark. // assert(spark.table("tbl").selectExpr("v::int").head == Row(99)) assert( getProtocolForTable("tbl") == @@ -97,7 +97,7 @@ class DeltaVariantSuite ) sql("INSERT INTO tbl (SELECT 'foo', parse_json(cast(id + 99 as string)) FROM range(1))") - // TODO(r.chen): Enable once `parse_json` is properly implemented in OSS Spark. + // TODO(r.chen): Enable once variant casting is properly implemented in OSS Spark. // assert(spark.table("tbl").selectExpr("v::int").head == Row(99)) assert( From 5c01b9a54cc6afed858e05833b357c86e3e5fc2f Mon Sep 17 00:00:00 2001 From: Richard Chen Date: Thu, 11 Apr 2024 22:19:25 -0700 Subject: [PATCH 3/6] cross compile --- build.sbt | 2 + .../scala-spark-3.5/shims/VariantShim.scala | 26 +++++ .../shims/VariantShim.scala | 23 ++++ .../spark/sql/delta/schema/SchemaUtils.scala | 4 +- .../spark/sql/delta/util/PartitionUtils.scala | 2 +- .../shims/DeltaVariantSparkOnlyTests.scala | 19 +++ .../shims/DeltaVariantSparkOnlyTests.scala | 109 ++++++++++++++++++ .../spark/sql/delta/DeltaVariantSuite.scala | 95 +-------------- 8 files changed, 183 insertions(+), 97 deletions(-) create mode 100644 spark/src/main/scala-spark-3.5/shims/VariantShim.scala create mode 100644 spark/src/main/scala-spark-master/shims/VariantShim.scala create mode 100644 spark/src/test/scala-spark-3.5/shims/DeltaVariantSparkOnlyTests.scala create mode 100644 spark/src/test/scala-spark-master/shims/DeltaVariantSparkOnlyTests.scala diff --git a/build.sbt b/build.sbt index 508cc4b5dfc..3e79b965754 100644 --- a/build.sbt +++ b/build.sbt @@ -138,6 +138,7 @@ def crossSparkSettings(): Seq[Setting[_]] = getSparkVersion() match { // For adding staged Spark RC versions, e.g.: // resolvers += "Apache Spark 3.5.0 (RC1) 
Staging" at "https://repository.apache.org/content/repositories/orgapachespark-1444/", Compile / unmanagedSourceDirectories += (Compile / baseDirectory).value / "src" / "main" / "scala-spark-3.5", + Test / unmanagedSourceDirectories += (Compile / baseDirectory).value / "src" / "test" / "scala-spark-3.5", Antlr4 / antlr4Version := "4.9.3", // Java-/Scala-/Uni-Doc Settings @@ -153,6 +154,7 @@ def crossSparkSettings(): Seq[Setting[_]] = getSparkVersion() match { targetJvm := "17", resolvers += "Spark master staging" at "https://repository.apache.org/content/groups/snapshots/", Compile / unmanagedSourceDirectories += (Compile / baseDirectory).value / "src" / "main" / "scala-spark-master", + Test / unmanagedSourceDirectories += (Compile / baseDirectory).value / "src" / "test" / "scala-spark-master", Antlr4 / antlr4Version := "4.13.1", Test / javaOptions ++= Seq( // Copied from SparkBuild.scala to support Java 17 for unit tests (see apache/spark#34153) diff --git a/spark/src/main/scala-spark-3.5/shims/VariantShim.scala b/spark/src/main/scala-spark-3.5/shims/VariantShim.scala new file mode 100644 index 00000000000..d802146eba1 --- /dev/null +++ b/spark/src/main/scala-spark-3.5/shims/VariantShim.scala @@ -0,0 +1,26 @@ +/* + * Copyright (2024) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.types + +object VariantShim { + + /** + * Spark's variant type is implemented for Spark 4.0 and is not implemented in Spark 3.5. Thus, + * any Spark 3.5 DataType cannot be a variant type. + */ + def isTypeVariant(dt: DataType): Boolean = false +} diff --git a/spark/src/main/scala-spark-master/shims/VariantShim.scala b/spark/src/main/scala-spark-master/shims/VariantShim.scala new file mode 100644 index 00000000000..7853eb593c6 --- /dev/null +++ b/spark/src/main/scala-spark-master/shims/VariantShim.scala @@ -0,0 +1,23 @@ +/* + * Copyright (2024) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.types + +object VariantShim { + + /** Spark's variant type is only implemented in Spark 4.0 and above.*/ + def isTypeVariant(dt: DataType): Boolean = dt.isInstanceOf[VariantType] +} diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala b/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala index 37d5ad5a2c8..dde29afbbd9 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala @@ -1267,7 +1267,7 @@ def normalizeColumnNamesInDataType( * Find VariantType columns in the table schema. 
*/ def checkForVariantTypeColumnsRecursively(schema: StructType): Boolean = { - SchemaUtils.typeExistsRecursively(schema)(_.isInstanceOf[VariantType]) + SchemaUtils.typeExistsRecursively(schema)(VariantShim.isTypeVariant(_)) } /** @@ -1309,7 +1309,7 @@ def normalizeColumnNamesInDataType( case DateType => case TimestampType => case TimestampNTZType => - case VariantType => + case dt if VariantShim.isTypeVariant(dt) => case BinaryType => case _: DecimalType => case a: ArrayType => diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/util/PartitionUtils.scala b/spark/src/main/scala/org/apache/spark/sql/delta/util/PartitionUtils.scala index 698f89904b3..87f72fc1a59 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/util/PartitionUtils.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/util/PartitionUtils.scala @@ -605,7 +605,7 @@ private[delta] object PartitionUtils { partitionColumnsSchema(schema, partitionColumns, caseSensitive).foreach { field => field.dataType match { - case a: AtomicType if !a.isInstanceOf[VariantType] => // OK + case a: AtomicType if !VariantShim.isTypeVariant(a) => // OK case _ => throw DeltaErrors.cannotUseDataTypeForPartitionColumnError(field) } } diff --git a/spark/src/test/scala-spark-3.5/shims/DeltaVariantSparkOnlyTests.scala b/spark/src/test/scala-spark-3.5/shims/DeltaVariantSparkOnlyTests.scala new file mode 100644 index 00000000000..a7edc237bca --- /dev/null +++ b/spark/src/test/scala-spark-3.5/shims/DeltaVariantSparkOnlyTests.scala @@ -0,0 +1,19 @@ +/* + * Copyright (2024) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta + +trait DeltaVariantSparkOnlyTests { self: DeltaVariantSuite => } diff --git a/spark/src/test/scala-spark-master/shims/DeltaVariantSparkOnlyTests.scala b/spark/src/test/scala-spark-master/shims/DeltaVariantSparkOnlyTests.scala new file mode 100644 index 00000000000..09ddabbee48 --- /dev/null +++ b/spark/src/test/scala-spark-master/shims/DeltaVariantSparkOnlyTests.scala @@ -0,0 +1,109 @@ +/* + * Copyright (2024) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.delta + +import org.apache.spark.SparkThrowable +import org.apache.spark.sql.{AnalysisException, QueryTest, Row} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.delta.actions.{Protocol, TableFeatureProtocolUtils} +import org.apache.spark.sql.delta.test.DeltaSQLCommandTest +import org.apache.spark.sql.types.StructType + +trait DeltaVariantSparkOnlyTests + extends QueryTest + with DeltaSQLCommandTest { self: DeltaVariantSuite => + private def getProtocolForTable(table: String): Protocol = { + val deltaLog = DeltaLog.forTable(spark, TableIdentifier(table)) + deltaLog.unsafeVolatileSnapshot.protocol + } + + test("create a new table with Variant, higher protocol and feature should be picked.") { + withTable("tbl") { + sql("CREATE TABLE tbl(s STRING, v VARIANT) USING DELTA") + sql("INSERT INTO tbl (SELECT 'foo', parse_json(cast(id + 99 as string)) FROM range(1))") + assert(spark.table("tbl").selectExpr("v::int").head == Row(99)) + assert( + getProtocolForTable("tbl") == + VariantTypeTableFeature.minProtocolVersion.withFeature(VariantTypeTableFeature) + ) + } + } + + test("creating a table without Variant should use the usual minimum protocol") { + withTable("tbl") { + sql("CREATE TABLE tbl(s STRING, i INTEGER) USING DELTA") + assert(getProtocolForTable("tbl") == Protocol(1, 2)) + + val deltaLog = DeltaLog.forTable(spark, TableIdentifier("tbl")) + assert( + !deltaLog.unsafeVolatileSnapshot.protocol.isFeatureSupported(VariantTypeTableFeature), + s"Table tbl contains VariantTypeFeature descriptor when its not supposed to" + ) + } + } + + test("add a new Variant column should upgrade to the correct protocol versions") { + withTable("tbl") { + sql("CREATE TABLE tbl(s STRING) USING delta") + assert(getProtocolForTable("tbl") == Protocol(1, 2)) + + // Should throw error + val e = intercept[SparkThrowable] { + sql("ALTER TABLE tbl ADD COLUMN v VARIANT") + } + // capture the existing protocol here. 
+ // we will check the error message later in this test as we need to compare the + // expected schema and protocol + val deltaLog = DeltaLog.forTable(spark, TableIdentifier("tbl")) + val currentProtocol = deltaLog.unsafeVolatileSnapshot.protocol + val currentFeatures = currentProtocol.implicitlyAndExplicitlySupportedFeatures + .map(_.name) + .toSeq + .sorted + .mkString(", ") + + // add table feature + sql( + s"ALTER TABLE tbl " + + s"SET TBLPROPERTIES('delta.feature.variantType-dev' = 'supported')" + ) + + sql("ALTER TABLE tbl ADD COLUMN v VARIANT") + + // check previously thrown error message + checkError( + e, + errorClass = "DELTA_FEATURES_REQUIRE_MANUAL_ENABLEMENT", + parameters = Map( + "unsupportedFeatures" -> VariantTypeTableFeature.name, + "supportedFeatures" -> currentFeatures + ) + ) + + sql("INSERT INTO tbl (SELECT 'foo', parse_json(cast(id + 99 as string)) FROM range(1))") + assert(spark.table("tbl").selectExpr("v::int").head == Row(99)) + + assert( + getProtocolForTable("tbl") == + VariantTypeTableFeature.minProtocolVersion + .withFeature(VariantTypeTableFeature) + .withFeature(InvariantsTableFeature) + .withFeature(AppendOnlyTableFeature) + ) + } + } +} diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaVariantSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaVariantSuite.scala index b19e29b52fa..6a95026d335 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaVariantSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaVariantSuite.scala @@ -16,97 +16,4 @@ package org.apache.spark.sql.delta -import org.apache.spark.SparkThrowable -import org.apache.spark.sql.{AnalysisException, QueryTest, Row} -import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.delta.actions.{Protocol, TableFeatureProtocolUtils} -import org.apache.spark.sql.delta.test.DeltaSQLCommandTest -import org.apache.spark.sql.types.StructType - -class DeltaVariantSuite - extends QueryTest - with 
DeltaSQLCommandTest { - - private def getProtocolForTable(table: String): Protocol = { - val deltaLog = DeltaLog.forTable(spark, TableIdentifier(table)) - deltaLog.unsafeVolatileSnapshot.protocol - } - - test("create a new table with Variant, higher protocol and feature should be picked.") { - withTable("tbl") { - sql("CREATE TABLE tbl(s STRING, v VARIANT) USING DELTA") - sql("INSERT INTO tbl (SELECT 'foo', parse_json(cast(id + 99 as string)) FROM range(1))") - // TODO(r.chen): Enable once variant casting is properly implemented in OSS Spark. - // assert(spark.table("tbl").selectExpr("v::int").head == Row(99)) - assert( - getProtocolForTable("tbl") == - VariantTypeTableFeature.minProtocolVersion.withFeature(VariantTypeTableFeature) - ) - } - } - - test("creating a table without Variant should use the usual minimum protocol") { - withTable("tbl") { - sql("CREATE TABLE tbl(s STRING, i INTEGER) USING DELTA") - assert(getProtocolForTable("tbl") == Protocol(1, 2)) - - val deltaLog = DeltaLog.forTable(spark, TableIdentifier("tbl")) - assert( - !deltaLog.unsafeVolatileSnapshot.protocol.isFeatureSupported(VariantTypeTableFeature), - s"Table tbl contains VariantTypeFeature descriptor when its not supposed to" - ) - } - } - - test("add a new Variant column should upgrade to the correct protocol versions") { - withTable("tbl") { - sql("CREATE TABLE tbl(s STRING) USING delta") - assert(getProtocolForTable("tbl") == Protocol(1, 2)) - - // Should throw error - val e = intercept[SparkThrowable] { - sql("ALTER TABLE tbl ADD COLUMN v VARIANT") - } - // capture the existing protocol here. 
- // we will check the error message later in this test as we need to compare the - // expected schema and protocol - val deltaLog = DeltaLog.forTable(spark, TableIdentifier("tbl")) - val currentProtocol = deltaLog.unsafeVolatileSnapshot.protocol - val currentFeatures = currentProtocol.implicitlyAndExplicitlySupportedFeatures - .map(_.name) - .toSeq - .sorted - .mkString(", ") - - // add table feature - sql( - s"ALTER TABLE tbl " + - s"SET TBLPROPERTIES('delta.feature.variantType-dev' = 'supported')" - ) - - sql("ALTER TABLE tbl ADD COLUMN v VARIANT") - - // check previously thrown error message - checkError( - e, - errorClass = "DELTA_FEATURES_REQUIRE_MANUAL_ENABLEMENT", - parameters = Map( - "unsupportedFeatures" -> VariantTypeTableFeature.name, - "supportedFeatures" -> currentFeatures - ) - ) - - sql("INSERT INTO tbl (SELECT 'foo', parse_json(cast(id + 99 as string)) FROM range(1))") - // TODO(r.chen): Enable once variant casting is properly implemented in OSS Spark. - // assert(spark.table("tbl").selectExpr("v::int").head == Row(99)) - - assert( - getProtocolForTable("tbl") == - VariantTypeTableFeature.minProtocolVersion - .withFeature(VariantTypeTableFeature) - .withFeature(InvariantsTableFeature) - .withFeature(AppendOnlyTableFeature) - ) - } - } -} +class DeltaVariantSuite extends DeltaVariantSparkOnlyTests From 265e886ec3ba7cd1f34c98246a5a9bba1415eeb0 Mon Sep 17 00:00:00 2001 From: Richard Chen Date: Thu, 11 Apr 2024 22:28:47 -0700 Subject: [PATCH 4/6] style --- spark/src/main/scala-spark-master/shims/VariantShim.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/main/scala-spark-master/shims/VariantShim.scala b/spark/src/main/scala-spark-master/shims/VariantShim.scala index 7853eb593c6..63285b58466 100644 --- a/spark/src/main/scala-spark-master/shims/VariantShim.scala +++ b/spark/src/main/scala-spark-master/shims/VariantShim.scala @@ -18,6 +18,6 @@ package org.apache.spark.sql.types object VariantShim { - /** Spark's variant 
type is only implemented in Spark 4.0 and above.*/ + /** Spark's variant type is only implemented in Spark 4.0 and above. */ def isTypeVariant(dt: DataType): Boolean = dt.isInstanceOf[VariantType] } From 7fddedfaef115fef00b0ac0637e117a27f8a32be Mon Sep 17 00:00:00 2001 From: Richard Chen Date: Tue, 19 Mar 2024 18:55:51 -0700 Subject: [PATCH 5/6] fix compile --- .../io/delta/kernel/internal/SnapshotManagerSuite.scala | 4 ++-- .../io/delta/kernel/defaults/DeltaTableReadsSuite.scala | 4 ++-- .../io/delta/kernel/defaults/LogReplayMetricsSuite.scala | 2 +- .../scala/io/delta/kernel/defaults/utils/TestRow.scala | 2 +- .../scala/io/delta/kernel/defaults/utils/TestUtils.scala | 8 ++++---- 5 files changed, 10 insertions(+), 10 deletions(-) diff --git a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/SnapshotManagerSuite.scala b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/SnapshotManagerSuite.scala index 71c2a256eee..6afe5a72939 100644 --- a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/SnapshotManagerSuite.scala +++ b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/SnapshotManagerSuite.scala @@ -617,7 +617,7 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils { // corrupt incomplete multi-part checkpoint val corruptedCheckpointStatuses = FileNames.checkpointFileWithParts(logPath, 10, 5).asScala .map(p => FileStatus.of(p.toString, 10, 10)) - .take(4) + .take(4).toSeq val deltas = deltaFileStatuses(10L to 13L) testExpectedError[RuntimeException]( corruptedCheckpointStatuses ++ deltas, @@ -666,7 +666,7 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils { // _last_checkpoint refers to incomplete multi-part checkpoint val corruptedCheckpointStatuses = FileNames.checkpointFileWithParts(logPath, 20, 5).asScala .map(p => FileStatus.of(p.toString, 10, 10)) - .take(4) + .take(4).toSeq testExpectedError[RuntimeException]( files = corruptedCheckpointStatuses ++ deltaFileStatuses(10L 
to 20L) ++ singularCheckpointFileStatuses(Seq(10L)), diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableReadsSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableReadsSuite.scala index 73017f4eb2a..d58bc5149c2 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableReadsSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableReadsSuite.scala @@ -296,12 +296,12 @@ class DeltaTableReadsSuite extends AnyFunSuite with TestUtils { Seq(TestRow(2), TestRow(2), TestRow(2)), TestRow("2", "2", TestRow(2, 2L)), "2" - ) :: Nil) + ) :: Nil).toSeq checkTable( path = path, expectedAnswer = expectedAnswer, - readCols = readCols + readCols = readCols.toSeq ) } diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/LogReplayMetricsSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/LogReplayMetricsSuite.scala index 51193d3ac90..f82eefe7b65 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/LogReplayMetricsSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/LogReplayMetricsSuite.scala @@ -342,7 +342,7 @@ trait FileReadMetrics { self: Object => } } - def getVersionsRead: Seq[Long] = versionsRead + def getVersionsRead: Seq[Long] = versionsRead.toSeq def resetMetrics(): Unit = { versionsRead.clear() diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/utils/TestRow.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/utils/TestRow.scala index 661d286a3c9..d63ffa8f8eb 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/utils/TestRow.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/utils/TestRow.scala @@ -110,7 +110,7 @@ object TestRow { case _: StructType => TestRow(row.getStruct(i)) case _ => throw new UnsupportedOperationException("unrecognized data type") } - }) + 
}.toSeq) } def apply(row: SparkRow): TestRow = { diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/utils/TestUtils.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/utils/TestUtils.scala index a3c9576aeb7..92fdd02a2e1 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/utils/TestUtils.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/utils/TestUtils.scala @@ -73,7 +73,7 @@ trait TestUtils extends Assertions with SQLHelper { while (iter.hasNext) { result.append(iter.next()) } - result + result.toSeq } finally { iter.close() } @@ -153,7 +153,7 @@ trait TestUtils extends Assertions with SQLHelper { // for all primitive types Seq(new Column((basePath :+ field.getName).asJava.toArray(new Array[String](0)))); case _ => Seq.empty - } + }.toSeq } def collectScanFileRows(scan: Scan, tableClient: TableClient = defaultTableClient): Seq[Row] = { @@ -231,7 +231,7 @@ trait TestUtils extends Assertions with SQLHelper { } } } - result + result.toSeq } /** @@ -626,7 +626,7 @@ trait TestUtils extends Assertions with SQLHelper { toSparkType(field.getDataType), field.isNullable ) - }) + }.toSeq) } } From d24a0b158ceb6fa9821c20e056d8e07deac36d79 Mon Sep 17 00:00:00 2001 From: Richard Chen Date: Thu, 11 Apr 2024 23:38:01 -0700 Subject: [PATCH 6/6] cross compile spark test dep for kernel defaults --- build.sbt | 53 +++++++++++-------- .../delta/kernel/defaults/utils/TestRow.scala | 3 +- 2 files changed, 34 insertions(+), 22 deletions(-) diff --git a/build.sbt b/build.sbt index 3e79b965754..a2f5b91eaa5 100644 --- a/build.sbt +++ b/build.sbt @@ -38,6 +38,8 @@ val LATEST_RELEASED_SPARK_VERSION = "3.5.0" val SPARK_MASTER_VERSION = "4.0.0-SNAPSHOT" val sparkVersion = settingKey[String]("Spark version") spark / sparkVersion := getSparkVersion() +kernelDefaults / sparkVersion := getSparkVersion() +goldenTables / sparkVersion := getSparkVersion() // Dependent library versions val defaultSparkVersion = 
LATEST_RELEASED_SPARK_VERSION @@ -126,6 +128,25 @@ lazy val commonSettings = Seq( unidocSourceFilePatterns := Nil, ) +/** + * Java-/Scala-/Uni-Doc settings. These do not work yet against Spark master because: + * 1) delta-spark on Spark master uses JDK 17, while delta-iceberg uses JDK 8 or 11. For some reason, + * generating the delta-spark unidoc also compiles delta-iceberg. + * 2) The delta-spark unidoc fails to compile: Spark 3.5 is on its classpath, likely due to the + * delta-iceberg issue above. + */ +def crossSparkProjectSettings(): Seq[Setting[_]] = getSparkVersion() match { + case LATEST_RELEASED_SPARK_VERSION => Seq( + // Java-/Scala-/Uni-Doc Settings + scalacOptions ++= Seq( + "-P:genjavadoc:strictVisibility=true" // hide package private types and methods in javadoc + ), + unidocSourceFilePatterns := Seq(SourceFilePattern("io/delta/tables/", "io/delta/exceptions/")) + ) + + case SPARK_MASTER_VERSION => Seq() +} + /** * Note: we cannot access sparkVersion.value here, since that can only be used within a task or * setting macro. @@ -140,12 +161,6 @@ def crossSparkSettings(): Seq[Setting[_]] = getSparkVersion() match { Compile / unmanagedSourceDirectories += (Compile / baseDirectory).value / "src" / "main" / "scala-spark-3.5", Test / unmanagedSourceDirectories += (Compile / baseDirectory).value / "src" / "test" / "scala-spark-3.5", Antlr4 / antlr4Version := "4.9.3", - - // Java-/Scala-/Uni-Doc Settings - scalacOptions ++= Seq( - "-P:genjavadoc:strictVisibility=true" // hide package private types and methods in javadoc - ), - unidocSourceFilePatterns := Seq(SourceFilePattern("io/delta/tables/", "io/delta/exceptions/")) ) case SPARK_MASTER_VERSION => Seq( @@ -170,13 +185,6 @@ def crossSparkSettings(): Seq[Setting[_]] = getSparkVersion() match { "--add-opens=java.base/sun.security.action=ALL-UNNAMED", "--add-opens=java.base/sun.util.calendar=ALL-UNNAMED" ) - - // Java-/Scala-/Uni-Doc Settings - // This isn't working yet against Spark Master. - // 1) delta-spark on Spark Master uses JDK 17.
delta-iceberg uses JDK 8 or 11. For some reason, - // generating delta-spark unidoc compiles delta-iceberg - // 2) delta-spark unidoc fails to compile. spark 3.5 is on its classpath. likely due to iceberg - // issue above. ) } @@ -190,6 +198,7 @@ lazy val spark = (project in file("spark")) sparkMimaSettings, releaseSettings, crossSparkSettings(), + crossSparkProjectSettings(), libraryDependencies ++= Seq( // Adding test classifier seems to break transitive resolution of the core dependencies "org.apache.spark" %% "spark-hive" % sparkVersion.value % "provided", @@ -357,6 +366,7 @@ lazy val kernelDefaults = (project in file("kernel/kernel-defaults")) scalaStyleSettings, javaOnlyReleaseSettings, Test / javaOptions ++= Seq("-ea"), + crossSparkSettings(), libraryDependencies ++= Seq( "org.apache.hadoop" % "hadoop-client-runtime" % hadoopVersion, "com.fasterxml.jackson.core" % "jackson-databind" % "2.13.5", @@ -373,10 +383,10 @@ lazy val kernelDefaults = (project in file("kernel/kernel-defaults")) "org.openjdk.jmh" % "jmh-core" % "1.37" % "test", "org.openjdk.jmh" % "jmh-generator-annprocess" % "1.37" % "test", - "org.apache.spark" %% "spark-hive" % defaultSparkVersion % "test" classifier "tests", - "org.apache.spark" %% "spark-sql" % defaultSparkVersion % "test" classifier "tests", - "org.apache.spark" %% "spark-core" % defaultSparkVersion % "test" classifier "tests", - "org.apache.spark" %% "spark-catalyst" % defaultSparkVersion % "test" classifier "tests", + "org.apache.spark" %% "spark-hive" % sparkVersion.value % "test" classifier "tests", + "org.apache.spark" %% "spark-sql" % sparkVersion.value % "test" classifier "tests", + "org.apache.spark" %% "spark-core" % sparkVersion.value % "test" classifier "tests", + "org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "test" classifier "tests", ), javaCheckstyleSettings("kernel/dev/checkstyle.xml"), // Unidoc settings @@ -1071,14 +1081,15 @@ lazy val goldenTables = (project in file("connectors/golden-tables")) 
name := "golden-tables", commonSettings, skipReleaseSettings, + crossSparkSettings(), libraryDependencies ++= Seq( // Test Dependencies "org.scalatest" %% "scalatest" % scalaTestVersion % "test", "commons-io" % "commons-io" % "2.8.0" % "test", - "org.apache.spark" %% "spark-sql" % defaultSparkVersion % "test", - "org.apache.spark" %% "spark-catalyst" % defaultSparkVersion % "test" classifier "tests", - "org.apache.spark" %% "spark-core" % defaultSparkVersion % "test" classifier "tests", - "org.apache.spark" %% "spark-sql" % defaultSparkVersion % "test" classifier "tests" + "org.apache.spark" %% "spark-sql" % sparkVersion.value % "test", + "org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "test" classifier "tests", + "org.apache.spark" %% "spark-core" % sparkVersion.value % "test" classifier "tests", + "org.apache.spark" %% "spark-sql" % sparkVersion.value % "test" classifier "tests" ) ) diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/utils/TestRow.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/utils/TestRow.scala index d63ffa8f8eb..4a7da88d842 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/utils/TestRow.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/utils/TestRow.scala @@ -16,6 +16,7 @@ package io.delta.kernel.defaults.utils import scala.collection.JavaConverters._ +import scala.collection.mutable.{Seq => MutableSeq} import org.apache.spark.sql.{types => sparktypes} import org.apache.spark.sql.{Row => SparkRow} import io.delta.kernel.data.{ArrayValue, ColumnVector, MapValue, Row} @@ -133,7 +134,7 @@ object TestRow { case _: sparktypes.BinaryType => obj.asInstanceOf[Array[Byte]] case _: sparktypes.DecimalType => obj.asInstanceOf[java.math.BigDecimal] case arrayType: sparktypes.ArrayType => - obj.asInstanceOf[Seq[Any]] + obj.asInstanceOf[MutableSeq[Any]] .map(decodeCellValue(arrayType.elementType, _)) case mapType: sparktypes.MapType => 
obj.asInstanceOf[Map[Any, Any]].map { case (k, v) =>