diff --git a/.github/workflows/spark_test.yaml b/.github/workflows/spark_test.yaml index 1e573e64cfc..8f2c214790b 100644 --- a/.github/workflows/spark_test.yaml +++ b/.github/workflows/spark_test.yaml @@ -6,7 +6,7 @@ jobs: strategy: matrix: # These Scala versions must match those in the build.sbt - scala: [2.12.17, 2.13.8] + scala: [2.13.8] env: SCALA_VERSION: ${{ matrix.scala }} steps: @@ -23,7 +23,7 @@ jobs: uses: actions/setup-java@v3 with: distribution: "zulu" - java-version: "8" + java-version: "17" - name: Cache Scala, SBT uses: actions/cache@v3 with: diff --git a/build.sbt b/build.sbt index 8c759f8747e..d0ed2ef6b15 100644 --- a/build.sbt +++ b/build.sbt @@ -32,12 +32,13 @@ val all_scala_versions = Seq(scala212, scala213) // sbt 'set default_scala_version := 2.13.8' [commands] // FIXME Why not use scalaVersion? val default_scala_version = settingKey[String]("Default Scala version") -Global / default_scala_version := scala212 +Global / default_scala_version := scala213 +// TODO set scala only to 2.13 for spark but keep 2.12 for other projects? // Dependent library versions -val sparkVersion = "3.5.0" +val sparkVersion = "4.0.0-SNAPSHOT" val flinkVersion = "1.16.1" -val hadoopVersion = "3.3.4" +val hadoopVersion = "3.3.6" val scalaTestVersion = "3.2.15" val scalaTestVersionForConnectors = "3.0.8" val parquet4sVersion = "1.9.4" @@ -90,6 +91,20 @@ lazy val commonSettings = Seq( unidocSourceFilePatterns := Nil, ) +// Copied from SparkBuild.scala to support Java 17 for unit tests (see apache/spark#34153) +val extraJavaTestArgs = Seq( + "--add-opens=java.base/java.lang=ALL-UNNAMED", + "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED", + "--add-opens=java.base/java.io=ALL-UNNAMED", + "--add-opens=java.base/java.net=ALL-UNNAMED", + "--add-opens=java.base/java.nio=ALL-UNNAMED", + "--add-opens=java.base/java.util=ALL-UNNAMED", + "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED", + "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED", + "--add-opens=java.base/sun.nio.cs=ALL-UNNAMED", + "--add-opens=java.base/sun.security.action=ALL-UNNAMED", + "--add-opens=java.base/sun.util.calendar=ALL-UNNAMED") + lazy val spark = (project in file("spark")) .dependsOn(storage) .enablePlugins(Antlr4Plugin) @@ -117,17 +132,18 @@ lazy val spark = (project in file("spark")) "org.apache.spark" %% "spark-hive" % sparkVersion % "test" classifier "tests", ), // For adding staged Spark RC versions, Ex: - // resolvers += "Apche Spark 3.5.0 (RC1) Staging" at "https://repository.apache.org/content/repositories/orgapachespark-1444/", + resolvers += "Spark master staging" at "https://repository.apache.org/content/groups/snapshots/", Compile / packageBin / mappings := (Compile / packageBin / mappings).value ++ listPythonFiles(baseDirectory.value.getParentFile / "python"), - Antlr4 / antlr4Version:= "4.9.3", + Antlr4 / antlr4Version:= "4.13.1", Antlr4 / antlr4PackageName := Some("io.delta.sql.parser"), Antlr4 / antlr4GenListener := true, Antlr4 / antlr4GenVisitor := true, Test / testOptions += Tests.Argument("-oDF"), Test / testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a"), + Test / javaOptions ++= extraJavaTestArgs, // Required for UTs with Java 17 // Don't execute in parallel since we can't have multiple Sparks in the same JVM Test / parallelExecution := false, @@ -238,7 +254,8 @@ lazy val sharing = (project in file("sharing")) "org.apache.spark" %% "spark-core" % sparkVersion % "test" classifier "tests", "org.apache.spark" %% "spark-sql" % sparkVersion % "test" classifier "tests", "org.apache.spark" %% 
"spark-hive" % sparkVersion % "test" classifier "tests", - ) + ), + Test / javaOptions ++= extraJavaTestArgs // Required for UTs with Java 17 ).configureUnidoc() lazy val kernelApi = (project in file("kernel/kernel-api")) @@ -349,6 +366,7 @@ val icebergSparkRuntimeArtifactName = { s"iceberg-spark-runtime-$expMaj.$expMin" } +/* lazy val testDeltaIcebergJar = (project in file("testDeltaIcebergJar")) // delta-iceberg depends on delta-spark! So, we need to include it during our test. .dependsOn(spark % "test") @@ -364,6 +382,7 @@ lazy val testDeltaIcebergJar = (project in file("testDeltaIcebergJar")) "org.apache.spark" %% "spark-core" % sparkVersion % "test" ) ) + */ val deltaIcebergSparkIncludePrefixes = Seq( // We want everything from this package @@ -376,6 +395,7 @@ val deltaIcebergSparkIncludePrefixes = Seq( "org/apache/spark/sql/delta/commands/convert/IcebergTable" ) +/* // Build using: build/sbt clean icebergShaded/compile iceberg/compile // It will fail the first time, just re-run it. // scalastyle:off println @@ -450,6 +470,7 @@ lazy val iceberg = (project in file("iceberg")) assemblyPackageScala / assembleArtifact := false ) // scalastyle:on println + */ lazy val generateIcebergJarsTask = TaskKey[Unit]("generateIcebergJars", "Generate Iceberg JARs") @@ -1119,7 +1140,8 @@ val createTargetClassesDir = taskKey[Unit]("create target classes dir") // Don't use these groups for any other projects lazy val sparkGroup = project - .aggregate(spark, contribs, storage, storageS3DynamoDB, iceberg, testDeltaIcebergJar, sharing) + .aggregate(spark, contribs, storage, storageS3DynamoDB, sharing) + // .aggregate(spark, contribs, storage, storageS3DynamoDB, iceberg, testDeltaIcebergJar, sharing) .settings( // crossScalaVersions must be set to Nil on the aggregating project crossScalaVersions := Nil, diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/data/ColumnVector.java b/kernel/kernel-api/src/main/java/io/delta/kernel/data/ColumnVector.java index 55c639a0161..2117794462c 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/data/ColumnVector.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/data/ColumnVector.java @@ -175,6 +175,14 @@ default ArrayValue getArray(int rowId) { throw new UnsupportedOperationException("Invalid value request for data type"); } + /** + * Return the variant value located at {@code rowId}. Returns null if the slot for {@code rowId} + * is null + */ + default VariantValue getVariant(int rowId) { + throw new UnsupportedOperationException("Invalid value request for data type"); + } + /** * Get the child vector associated with the given ordinal. This method is applicable only to the * {@code struct} type columns. diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/data/Row.java b/kernel/kernel-api/src/main/java/io/delta/kernel/data/Row.java index adcacbc0f4c..560f3113d7b 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/data/Row.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/data/Row.java @@ -117,4 +117,10 @@ public interface Row { * Throws error if the column at given ordinal is not of map type, */ MapValue getMap(int ordinal); + + /** + * Return variant value of the column located at the given ordinal. + * Throws error if the column at given ordinal is not of variant type. 
+ */ + VariantValue getVariant(int ordinal); } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/data/VariantValue.java b/kernel/kernel-api/src/main/java/io/delta/kernel/data/VariantValue.java new file mode 100644 index 00000000000..abf57d54502 --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/data/VariantValue.java @@ -0,0 +1,25 @@ +/* + * Copyright (2024) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.data; + +/** + * Abstraction to represent a single Variant value in a {@link ColumnVector}. + */ +public interface VariantValue { + byte[] getValue(); + + byte[] getMetadata(); +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/data/ChildVectorBasedRow.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/data/ChildVectorBasedRow.java index ae4793fa479..74e76f979b0 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/data/ChildVectorBasedRow.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/data/ChildVectorBasedRow.java @@ -21,6 +21,7 @@ import io.delta.kernel.data.ColumnVector; import io.delta.kernel.data.MapValue; import io.delta.kernel.data.Row; +import io.delta.kernel.data.VariantValue; import io.delta.kernel.types.StructType; /** @@ -111,5 +112,10 @@ public MapValue getMap(int ordinal) { return getChild(ordinal).getMap(rowId); } + @Override + public VariantValue getVariant(int ordinal) { + return getChild(ordinal).getVariant(rowId); + } + protected abstract ColumnVector getChild(int ordinal); } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/data/GenericRow.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/data/GenericRow.java index 01a12bb84de..c4d6aeaf8ca 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/data/GenericRow.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/data/GenericRow.java @@ -23,6 +23,7 @@ import io.delta.kernel.data.ArrayValue; import io.delta.kernel.data.MapValue; import io.delta.kernel.data.Row; +import io.delta.kernel.data.VariantValue; import io.delta.kernel.types.*; /** @@ -134,6 +135,12 @@ public MapValue getMap(int ordinal) { return (MapValue) getValue(ordinal); } + @Override + public VariantValue getVariant(int ordinal) { + throwIfUnsafeAccess(ordinal, VariantType.class, "variant"); + return (VariantValue) getValue(ordinal); + } + private Object getValue(int ordinal) { return ordinalToValue.get(ordinal); } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java index 9557e949181..549c86b9fef 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java @@ -313,6 +313,8 @@ private void validateSupportedTable(Protocol protocol, Metadata metadata) { case "columnMapping": 
ColumnMapping.throwOnUnsupportedColumnMappingMode(metadata); break; + case "variantType-dev": + break; default: throw new UnsupportedOperationException( "Unsupported table feature: " + readerFeature); diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/VectorUtils.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/VectorUtils.java index 70becebb42c..2269fd656b2 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/VectorUtils.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/VectorUtils.java @@ -110,6 +110,8 @@ private static Object getValueAsObject( return toJavaList(columnVector.getArray(rowId)); } else if (dataType instanceof MapType) { return toJavaMap(columnVector.getMap(rowId)); + } else if (dataType instanceof VariantType) { + return columnVector.getVariant(rowId); } else { throw new UnsupportedOperationException("unsupported data type"); } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/types/BasePrimitiveType.java b/kernel/kernel-api/src/main/java/io/delta/kernel/types/BasePrimitiveType.java index 0b83f7aaf47..97e9e5575b6 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/types/BasePrimitiveType.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/types/BasePrimitiveType.java @@ -63,6 +63,7 @@ public static List getAllPrimitiveTypes() { put("timestamp", TimestampType.TIMESTAMP); put("binary", BinaryType.BINARY); put("string", StringType.STRING); + put("variant", VariantType.VARIANT); } }); diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/types/VariantType.java b/kernel/kernel-api/src/main/java/io/delta/kernel/types/VariantType.java new file mode 100644 index 00000000000..71a84cdb718 --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/types/VariantType.java @@ -0,0 +1,31 @@ +/* + * Copyright (2024) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.types; + +import io.delta.kernel.annotation.Evolving; + +/** + * A logical variant type. 
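To illustrate how the new type is meant to slot into kernel schemas, a minimal sketch (assuming the existing StructType.add(String, DataType) builder and the LongType.LONG constant in io.delta.kernel.types):

    // Hypothetical schema sketch: a table with a long id column and a variant payload column.
    import io.delta.kernel.types.LongType;
    import io.delta.kernel.types.StructType;
    import io.delta.kernel.types.VariantType;

    final class VariantSchemaExample {
        private VariantSchemaExample() {}

        static StructType schemaWithVariant() {
            return new StructType()
                .add("id", LongType.LONG)
                .add("payload", VariantType.VARIANT); // constant added by this change
        }
    }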
+ * @since 4.0.0 + */ +@Evolving +public class VariantType extends BasePrimitiveType { + public static final VariantType VARIANT = new VariantType(); + + private VariantType() { + super("variant"); + } +} diff --git a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/SnapshotManagerSuite.scala b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/SnapshotManagerSuite.scala index cf467dc1214..489c35ff1a2 100644 --- a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/SnapshotManagerSuite.scala +++ b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/SnapshotManagerSuite.scala @@ -618,7 +618,7 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils { // corrupt incomplete multi-part checkpoint val corruptedCheckpointStatuses = FileNames.checkpointFileWithParts(logPath, 10, 5).asScala .map(p => FileStatus.of(p.toString, 10, 10)) - .take(4) + .take(4).toSeq val deltas = deltaFileStatuses(10L to 13L) testExpectedError[RuntimeException]( corruptedCheckpointStatuses ++ deltas, @@ -667,7 +667,7 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils { // _last_checkpoint refers to incomplete multi-part checkpoint val corruptedCheckpointStatuses = FileNames.checkpointFileWithParts(logPath, 20, 5).asScala .map(p => FileStatus.of(p.toString, 10, 10)) - .take(4) + .take(4).toSeq testExpectedError[RuntimeException]( files = corruptedCheckpointStatuses ++ deltaFileStatuses(10L to 20L) ++ singularCheckpointFileStatuses(Seq(10L)), diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/data/DefaultJsonRow.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/data/DefaultJsonRow.java index bbada452c0a..8f60b3848ce 100644 --- a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/data/DefaultJsonRow.java +++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/data/DefaultJsonRow.java @@ -33,6 +33,7 @@ import io.delta.kernel.data.ColumnVector; import io.delta.kernel.data.MapValue; import io.delta.kernel.data.Row; +import io.delta.kernel.data.VariantValue; import io.delta.kernel.types.*; import io.delta.kernel.internal.util.InternalUtils; @@ -128,6 +129,11 @@ public MapValue getMap(int ordinal) { return (MapValue) parsedValues[ordinal]; } + @Override + public VariantValue getVariant(int ordinal) { + throw new UnsupportedOperationException("not yet implemented"); + } + private static void throwIfTypeMismatch(String expType, boolean hasExpType, JsonNode jsonNode) { if (!hasExpType) { throw new RuntimeException( diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/data/value/DefaultVariantValue.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/data/value/DefaultVariantValue.java new file mode 100644 index 00000000000..c4c283e9609 --- /dev/null +++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/data/value/DefaultVariantValue.java @@ -0,0 +1,63 @@ +/* + * Copyright (2023) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.defaults.internal.data.value; + +import java.util.Arrays; + +import io.delta.kernel.data.VariantValue; + +/** + * Default implementation of a Delta kernel VariantValue. + */ +public class DefaultVariantValue implements VariantValue { + private final byte[] value; + private final byte[] metadata; + + public DefaultVariantValue(byte[] value, byte[] metadata) { + this.value = value; + this.metadata = metadata; + } + + @Override + public byte[] getValue() { + return value; + } + + @Override + public byte[] getMetadata() { + return metadata; + } + + @Override + public String toString() { + return "VariantValue{value=" + Arrays.toString(value) + + ", metadata=" + Arrays.toString(metadata) + '}'; + } + + /** + * Compare two variants in bytes. The variant equality is more complex than it, and we haven't + * supported it in the user surface yet. This method is only intended for tests. + */ + @Override + public boolean equals(Object other) { + if (other instanceof DefaultVariantValue) { + return Arrays.equals(value, ((DefaultVariantValue) other).getValue()) && + Arrays.equals(metadata, ((DefaultVariantValue) other).getMetadata()); + } else { + return false; + } + } +} diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/data/vector/AbstractColumnVector.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/data/vector/AbstractColumnVector.java index 8196b93a178..55138cecd09 100644 --- a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/data/vector/AbstractColumnVector.java +++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/data/vector/AbstractColumnVector.java @@ -22,6 +22,7 @@ import io.delta.kernel.data.ArrayValue; import io.delta.kernel.data.ColumnVector; import io.delta.kernel.data.MapValue; +import io.delta.kernel.data.VariantValue; import io.delta.kernel.types.DataType; import static io.delta.kernel.internal.util.Preconditions.checkArgument; @@ -138,6 +139,11 @@ public ArrayValue getArray(int rowId) { throw unsupportedDataAccessException("array"); } + @Override + public VariantValue getVariant(int rowId) { + throw unsupportedDataAccessException("variant"); + } + // TODO no need to override these here; update default implementations in `ColumnVector` // to have a more informative exception message protected UnsupportedOperationException unsupportedDataAccessException(String accessType) { diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/data/vector/DefaultGenericVector.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/data/vector/DefaultGenericVector.java index 8fdb6b322d5..673220daa27 100644 --- a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/data/vector/DefaultGenericVector.java +++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/data/vector/DefaultGenericVector.java @@ -23,6 +23,7 @@ import io.delta.kernel.data.ColumnVector; import io.delta.kernel.data.MapValue; import io.delta.kernel.data.Row; +import io.delta.kernel.data.VariantValue; import io.delta.kernel.types.*; import static io.delta.kernel.internal.util.Preconditions.checkArgument; @@ -170,6 +171,13 @@ public ColumnVector getChild(int ordinal) { (rowId) -> (Row) rowIdToValueAccessor.apply(rowId)); } + @Override + public VariantValue getVariant(int rowId) { + assertValidRowId(rowId); + 
throwIfUnsafeAccess(VariantType.class, "variant"); + return (VariantValue) rowIdToValueAccessor.apply(rowId); + } + private void throwIfUnsafeAccess( Class expDataType, String accessType) { if (!expDataType.isAssignableFrom(dataType.getClass())) { String msg = String.format( diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/data/vector/DefaultSubFieldVector.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/data/vector/DefaultSubFieldVector.java index a0aac0f12b1..1656d572f8a 100644 --- a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/data/vector/DefaultSubFieldVector.java +++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/data/vector/DefaultSubFieldVector.java @@ -23,6 +23,7 @@ import io.delta.kernel.data.ColumnVector; import io.delta.kernel.data.MapValue; import io.delta.kernel.data.Row; +import io.delta.kernel.data.VariantValue; import io.delta.kernel.types.DataType; import io.delta.kernel.types.StructField; import io.delta.kernel.types.StructType; @@ -155,6 +156,12 @@ public ArrayValue getArray(int rowId) { return rowIdToRowAccessor.apply(rowId).getArray(columnOrdinal); } + @Override + public VariantValue getVariant(int rowId) { + assertValidRowId(rowId); + return rowIdToRowAccessor.apply(rowId).getVariant(columnOrdinal); + } + @Override public ColumnVector getChild(int childOrdinal) { StructType structType = (StructType) dataType; diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/data/vector/DefaultVariantVector.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/data/vector/DefaultVariantVector.java new file mode 100644 index 00000000000..a12c426fda3 --- /dev/null +++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/data/vector/DefaultVariantVector.java @@ -0,0 +1,93 @@ +/* + * Copyright (2023) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.defaults.internal.data.vector; + +import java.util.Optional; +import static java.util.Objects.requireNonNull; + +import io.delta.kernel.data.ColumnVector; +import io.delta.kernel.data.VariantValue; +import io.delta.kernel.types.DataType; + +import static io.delta.kernel.internal.util.Preconditions.checkArgument; +import io.delta.kernel.defaults.internal.data.value.DefaultVariantValue; + +/** + * {@link io.delta.kernel.data.ColumnVector} implementation for variant type data. + */ +public class DefaultVariantVector + extends AbstractColumnVector { + private final ColumnVector valueVector; + private final ColumnVector metadataVector; + + + /** + * Create an instance of {@link io.delta.kernel.data.ColumnVector} for variant type. + * + * @param size number of elements in the vector. + * @param type {@code variant} datatype definition. + * @param nullability Optional array of nullability value for each element in the vector.
+ * All values in the vector are considered non-null when parameter is + * empty. + * @param value The child binary column vector representing each variant's values. + * @param metadata The child binary column vector representing each variant's metadata. + */ + public DefaultVariantVector( + int size, + DataType type, + Optional nullability, + ColumnVector value, + ColumnVector metadata) { + super(size, type, nullability); + this.valueVector = requireNonNull(value, "value is null"); + this.metadataVector = requireNonNull(metadata, "metadata is null"); + } + + /** + * Get the value at given {@code rowId}. The return value is undefined and can be + * anything, if the slot for {@code rowId} is null. + * + * @param rowId + * @return + */ + @Override + public VariantValue getVariant(int rowId) { + checkValidRowId(rowId); + if (isNullAt(rowId)) { + return null; + } + + return new DefaultVariantValue( + valueVector.getBinary(rowId), metadataVector.getBinary(rowId)); + } + + /** + * Get the child column vector at the given {@code ordinal}. Variants should only have two + * child vectors, one for value and one for metadata. + * + * @param ordinal + * @return + */ + @Override + public ColumnVector getChild(int ordinal) { + checkArgument(ordinal >= 0 && ordinal < 2, "Invalid ordinal " + ordinal); + if (ordinal == 0) { + return valueVector; + } else { + return metadataVector; + } + } +} diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/data/vector/DefaultViewVector.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/data/vector/DefaultViewVector.java index 49c1fe00a2b..f256c6300c9 100644 --- a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/data/vector/DefaultViewVector.java +++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/data/vector/DefaultViewVector.java @@ -20,6 +20,7 @@ import io.delta.kernel.data.ArrayValue; import io.delta.kernel.data.ColumnVector; import io.delta.kernel.data.MapValue; +import io.delta.kernel.data.VariantValue; import io.delta.kernel.types.DataType; import static io.delta.kernel.internal.util.Preconditions.checkArgument; @@ -137,6 +138,12 @@ public ArrayValue getArray(int rowId) { return underlyingVector.getArray(offset + rowId); } + @Override + public VariantValue getVariant(int rowId) { + checkValidRowId(rowId); + return underlyingVector.getVariant(offset + rowId); + } + @Override public ColumnVector getChild(int ordinal) { return new DefaultViewVector(underlyingVector.getChild(ordinal), offset, offset + size); diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetColumnReaders.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetColumnReaders.java index b06da114600..824781e6651 100644 --- a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetColumnReaders.java +++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetColumnReaders.java @@ -78,6 +78,8 @@ public static Converter createConverter( initialBatchSize, (DecimalType) typeFromClient, typeFromFile); } else if (typeFromClient instanceof TimestampType) { return TimestampConverters.createTimestampConverter(initialBatchSize, typeFromFile); + } else if (typeFromClient instanceof VariantType) { + return new VariantColumnReader(initialBatchSize); } throw new UnsupportedOperationException(typeFromClient + " is not supported"); diff --git 
a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetColumnWriters.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetColumnWriters.java index c3f75e55474..afefabb90ff 100644 --- a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetColumnWriters.java +++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetColumnWriters.java @@ -142,6 +142,8 @@ private static ColumnWriter createColumnWriter( return new MapWriter(colName, fieldIndex, columnVector); } else if (dataType instanceof StructType) { return new StructWriter(colName, fieldIndex, columnVector); + } else if (dataType instanceof VariantType) { + return new VariantWriter(colName, fieldIndex, columnVector); } throw new IllegalArgumentException("Unsupported column vector type: " + dataType); @@ -474,4 +476,31 @@ void writeNonNullRowValue(RecordConsumer recordConsumer, int rowId) { recordConsumer.endGroup(); } } + + static class VariantWriter extends ColumnWriter { + private ColumnWriter valueWriter; + private ColumnWriter metadataWriter; + + VariantWriter(String name, int fieldId, ColumnVector variantColumnVector) { + super(name, fieldId, variantColumnVector); + valueWriter = new BinaryWriter( + "value", + 0, + variantColumnVector.getChild(0) + ); + metadataWriter = new BinaryWriter( + "metadata", + 1, + variantColumnVector.getChild(1) + ); + } + + @Override + void writeNonNullRowValue(RecordConsumer recordConsumer, int rowId) { + recordConsumer.startGroup(); + valueWriter.writeRowValue(recordConsumer, rowId); + metadataWriter.writeRowValue(recordConsumer, rowId); + recordConsumer.endGroup(); + } + } } diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetSchemaUtils.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetSchemaUtils.java index a86ee4323d0..149e1892479 100644 --- a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetSchemaUtils.java +++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetSchemaUtils.java @@ -235,6 +235,8 @@ private static Type toParquetType( type = toParquetMapType((MapType) dataType, name, repetition); } else if (dataType instanceof StructType) { type = toParquetStructType((StructType) dataType, name, repetition); + } else if (dataType instanceof VariantType) { + type = toParquetVariantType(name, repetition); } else { throw new UnsupportedOperationException( "Writing given type data to Parquet is not supported: " + dataType); @@ -296,6 +298,13 @@ private static Type toParquetStructType(StructType structType, String name, return new GroupType(repetition, name, fields); } + private static Type toParquetVariantType(String name, Repetition repetition) { + return Types.buildGroup(repetition) + .addField(toParquetType(BinaryType.BINARY, "value", REQUIRED, Optional.empty())) + .addField(toParquetType(BinaryType.BINARY, "metadata", REQUIRED, Optional.empty())) + .named(name); + } + /** * Recursively checks whether the given data type has any Parquet field ids in it. 
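For reference, a sketch of the physical layout that toParquetVariantType above produces for a nullable variant column named "v"; the parsed string below is only an illustration (it assumes parquet-mr's MessageTypeParser), but the required binary value/metadata fields mirror the builder code:

    // Hypothetical visualization of the variant group as it lands in the Parquet schema.
    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.MessageTypeParser;

    final class VariantParquetLayout {
        private VariantParquetLayout() {}

        static MessageType exampleLayout() {
            return MessageTypeParser.parseMessageType(
                "message example {\n"
                + "  optional group v {\n"
                + "    required binary value;\n"
                + "    required binary metadata;\n"
                + "  }\n"
                + "}");
        }
    }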
*/ diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/VariantColumnReader.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/VariantColumnReader.java new file mode 100644 index 00000000000..24132058337 --- /dev/null +++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/VariantColumnReader.java @@ -0,0 +1,121 @@ +/* + * Copyright (2024) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.defaults.internal.parquet; + +import java.util.*; + +import org.apache.parquet.io.api.Converter; +import org.apache.parquet.io.api.GroupConverter; + +import io.delta.kernel.data.ColumnVector; +import io.delta.kernel.types.BinaryType; +import io.delta.kernel.types.VariantType; +import static io.delta.kernel.internal.util.Preconditions.checkArgument; + +import io.delta.kernel.defaults.internal.data.vector.DefaultVariantVector; +import io.delta.kernel.defaults.internal.parquet.ParquetColumnReaders.BaseColumnReader; +import io.delta.kernel.defaults.internal.parquet.ParquetColumnReaders.BinaryColumnReader; + +class VariantColumnReader + extends GroupConverter + implements BaseColumnReader { + private final BinaryColumnReader valueConverter; + private final BinaryColumnReader metadataConverter; + + // working state + private int currentRowIndex; + private boolean[] nullability; + // If the value is null, start/end never get called which is a signal for null + // Set the initial state to true and when start() is called set it to false. + private boolean isCurrentValueNull = true; + + /** + * Create converter for {@link VariantType} column. + * + * @param initialBatchSize Estimate of initial row batch size. Used in memory allocations. 
+ */ + VariantColumnReader(int initialBatchSize) { + checkArgument(initialBatchSize > 0, "invalid initialBatchSize: %s", initialBatchSize); + this.nullability = ParquetColumnReaders.initNullabilityVector(initialBatchSize); + + this.valueConverter = new BinaryColumnReader(BinaryType.BINARY, initialBatchSize); + this.metadataConverter = new BinaryColumnReader(BinaryType.BINARY, initialBatchSize); + } + + @Override + public Converter getConverter(int fieldIndex) { + checkArgument( + fieldIndex >= 0 && fieldIndex < 2, + "variant type is represented by a struct with 2 fields"); + if (fieldIndex == 0) { + return valueConverter; + } else { + return metadataConverter; + } + } + + @Override + public void start() { + isCurrentValueNull = false; + } + + @Override + public void end() { + } + + @Override + public void finalizeCurrentRow(long currentRowIndex) { + resizeIfNeeded(); + finalizeLastRowInConverters(currentRowIndex); + nullability[this.currentRowIndex] = isCurrentValueNull; + isCurrentValueNull = true; + + this.currentRowIndex++; + } + + public ColumnVector getDataColumnVector(int batchSize) { + ColumnVector vector = new DefaultVariantVector( + batchSize, + VariantType.VARIANT, + Optional.of(nullability), + valueConverter.getDataColumnVector(batchSize), + metadataConverter.getDataColumnVector(batchSize) + ); + resetWorkingState(); + return vector; + } + + @Override + public void resizeIfNeeded() { + if (nullability.length == currentRowIndex) { + int newSize = nullability.length * 2; + this.nullability = Arrays.copyOf(this.nullability, newSize); + ParquetColumnReaders.setNullabilityToTrue(this.nullability, newSize / 2, newSize); + } + } + + @Override + public void resetWorkingState() { + this.currentRowIndex = 0; + this.isCurrentValueNull = true; + this.nullability = ParquetColumnReaders.initNullabilityVector(this.nullability.length); + } + + private void finalizeLastRowInConverters(long prevRowIndex) { + valueConverter.finalizeCurrentRow(prevRowIndex); + metadataConverter.finalizeCurrentRow(prevRowIndex); + } +} diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/integration/DataBuilderUtils.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/integration/DataBuilderUtils.java index 1474bd2db67..5b76dc88687 100644 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/integration/DataBuilderUtils.java +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/integration/DataBuilderUtils.java @@ -26,6 +26,7 @@ import io.delta.kernel.data.ColumnarBatch; import io.delta.kernel.data.MapValue; import io.delta.kernel.data.Row; +import io.delta.kernel.data.VariantValue; import io.delta.kernel.types.StructType; import static io.delta.kernel.internal.util.Preconditions.checkArgument; @@ -165,5 +166,10 @@ public MapValue getMap(int ordinal) { throw new UnsupportedOperationException( "map type unsupported for TestColumnBatchBuilder; use scala test utilities"); } + + @Override + public VariantValue getVariant(int ordinal) { + return (VariantValue) values.get(ordinal); + } } } diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableReadsSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableReadsSuite.scala index e6ceb3dd4cc..4eee1071ffb 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableReadsSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableReadsSuite.scala @@ -261,12 +261,12 @@ class DeltaTableReadsSuite 
extends AnyFunSuite with TestUtils { Seq(TestRow(2), TestRow(2), TestRow(2)), TestRow("2", "2", TestRow(2, 2L)), "2" - ) :: Nil) + ) :: Nil).toSeq checkTable( path = path, expectedAnswer = expectedAnswer, - readCols = readCols + readCols = readCols.toSeq ) } diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/LogReplayMetricsSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/LogReplayMetricsSuite.scala index 51193d3ac90..f82eefe7b65 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/LogReplayMetricsSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/LogReplayMetricsSuite.scala @@ -342,7 +342,7 @@ trait FileReadMetrics { self: Object => } } - def getVersionsRead: Seq[Long] = versionsRead + def getVersionsRead: Seq[Long] = versionsRead.toSeq def resetMetrics(): Unit = { versionsRead.clear() diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetFileReaderSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetFileReaderSuite.scala index 8b19a8422db..b62fbfb24df 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetFileReaderSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetFileReaderSuite.scala @@ -17,6 +17,8 @@ package io.delta.kernel.defaults.internal.parquet import java.math.BigDecimal +import org.apache.spark.sql.DataFrame + import io.delta.golden.GoldenTableUtils.goldenTableFile import io.delta.kernel.defaults.utils.{ExpressionTestUtils, TestRow, VectorTestUtils} import io.delta.kernel.types._ @@ -141,4 +143,56 @@ class ParquetFileReaderSuite extends AnyFunSuite checkAnswer(actResult2, expResult2) } + + /** + * Writes a table using Spark, reads it back using the Delta Kernel implementation, and asserts + * that the results are the same. 
+ */ + private def testRead(testName: String)(df: => DataFrame): Unit = { + test(testName) { + withTable("test_table") { + df.write + .format("delta") + .mode("overwrite") + .saveAsTable("test_table") + val path = spark.sql("describe table extended `test_table`") + .where("col_name = 'Location'") + .collect()(0) + .getString(1) + .replace("file:", "") + + val kernelSchema = tableSchema(path) + val actResult = readParquetFilesUsingKernel(path, kernelSchema) + val expResult = readParquetFilesUsingSpark(path, kernelSchema) + checkAnswer(actResult, expResult) + } + } + } + + testRead("basic read variant") { + spark.range(0, 10, 1, 1).selectExpr( + "parse_json(cast(id as string)) as basic_v", + "named_struct('v', parse_json(cast(id as string))) as struct_v", + """array( + parse_json(cast(id as string)), + parse_json(cast(id as string)), + parse_json(cast(id as string)) + ) as array_v""", + "map('test', parse_json(cast(id as string))) as map_value_v", + "map(parse_json(cast(id as string)), parse_json(cast(id as string))) as map_key_v" + ) + } + + testRead("basic null variant") { + spark.range(0, 10, 1, 1).selectExpr( + "cast(null as variant) basic_v", + "named_struct('v', cast(null as variant)) as struct_v", + """array( + parse_json(cast(id as string)), + parse_json(cast(id as string)), + null + ) as array_v""", + "map('test', cast(null as variant)) as map_value_v" + ) + } } diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetFileWriterSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetFileWriterSuite.scala index 5994a6a6694..d6d5ccb10cf 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetFileWriterSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetFileWriterSuite.scala @@ -17,6 +17,8 @@ package io.delta.kernel.defaults.internal.parquet import java.lang.{Double => DoubleJ, Float => FloatJ} +import org.apache.spark.sql.DataFrame + import io.delta.golden.GoldenTableUtils.{goldenTableFile, goldenTablePath} import io.delta.kernel.data.{ColumnarBatch, FilteredColumnarBatch} import io.delta.kernel.defaults.internal.DefaultKernelUtils @@ -189,6 +191,64 @@ class ParquetFileWriterSuite extends AnyFunSuite } } + def testWrite(testName: String)(df: => DataFrame): Unit = { + test(testName) { + withTable("test_table") { + withTempDir { writeDir => + df.write + .format("delta") + .mode("overwrite") + .saveAsTable("test_table") + val filePath = spark.sql("describe table extended `test_table`") + .where("col_name = 'Location'") + .collect()(0) + .getString(1) + .replace("file:", "") + + val schema = tableSchema(filePath) + + val physicalSchema = if (hasColumnMappingId(filePath)) { + convertToPhysicalSchema(schema, schema, ColumnMapping.COLUMN_MAPPING_MODE_ID) + } else { + schema + } + val readData = readParquetUsingKernelAsColumnarBatches(filePath, physicalSchema) + .map(_.toFiltered(Option.empty[Predicate])) + val writePath = writeDir.getAbsolutePath + val writeOutput = writeToParquetUsingKernel(readData, writePath) + verifyContentUsingKernelReader(writePath, readData) + } + } + } + } + + testWrite("basic write variant") { + spark.range(0, 10, 1, 1).selectExpr( + "parse_json(cast(id as string)) as basic_v", + "named_struct('v', parse_json(cast(id as string))) as struct_v", + """array( + parse_json(cast(id as string)), + parse_json(cast(id as string)), + parse_json(cast(id as string)) + ) as array_v""", + 
"map('test', parse_json(cast(id as string))) as map_value_v", + "map(parse_json(cast(id as string)), parse_json(cast(id as string))) as map_key_v" + ) + } + + testWrite("basic write null variant") { + spark.range(0, 10, 1, 1).selectExpr( + "cast(null as variant) basic_v", + "named_struct('v', cast(null as variant)) as struct_v", + """array( + parse_json(cast(id as string)), + parse_json(cast(id as string)), + null + ) as array_v""", + "map('test', cast(null as variant)) as map_value_v" + ) + } + test("columnar batches containing different schema") { withTempDir { tempPath => val targetDir = tempPath.getAbsolutePath diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/utils/TestRow.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/utils/TestRow.scala index 6782721d3d1..171a769c333 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/utils/TestRow.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/utils/TestRow.scala @@ -18,7 +18,9 @@ package io.delta.kernel.defaults.utils import scala.collection.JavaConverters._ import org.apache.spark.sql.{types => sparktypes} import org.apache.spark.sql.{Row => SparkRow} +import org.apache.spark.unsafe.types.VariantVal import io.delta.kernel.data.{ArrayValue, ColumnVector, MapValue, Row} +import io.delta.kernel.defaults.internal.data.value.DefaultVariantValue import io.delta.kernel.types._ import java.time.LocalDate @@ -40,7 +42,7 @@ import java.time.LocalDate * - ArrayType --> Seq[Any] * - MapType --> Map[Any, Any] * - StructType --> TestRow - * + * - VariantType --> VariantVal * For complex types array and map, the inner elements types should align with this mapping. */ class TestRow(val values: Array[Any]) { @@ -103,9 +105,10 @@ object TestRow { case _: ArrayType => arrayValueToScalaSeq(row.getArray(i)) case _: MapType => mapValueToScalaMap(row.getMap(i)) case _: StructType => TestRow(row.getStruct(i)) + case _: VariantType => row.getVariant(i) case _ => throw new UnsupportedOperationException("unrecognized data type") } - }) + }.toSeq) } def apply(row: SparkRow): TestRow = { @@ -133,6 +136,7 @@ object TestRow { decodeCellValue(mapType.keyType, k) -> decodeCellValue(mapType.valueType, v) } case _: sparktypes.StructType => TestRow(obj.asInstanceOf[SparkRow]) + case _: sparktypes.VariantType => obj.asInstanceOf[VariantVal] case _ => throw new UnsupportedOperationException("unrecognized data type") } } @@ -163,6 +167,9 @@ object TestRow { decodeCellValue(mapType.keyType, k) -> decodeCellValue(mapType.valueType, v) } case _: sparktypes.StructType => TestRow(row.getStruct(i)) + case _: sparktypes.VariantType => + val sparkVariant = row.getAs[VariantVal](i) + new DefaultVariantValue(sparkVariant.getValue(), sparkVariant.getMetadata()) case _ => throw new UnsupportedOperationException("unrecognized data type") } }) @@ -193,6 +200,9 @@ object TestRow { TestRow.fromSeq(Seq.range(0, dataType.length()).map { ordinal => getAsTestObject(vector.getChild(ordinal), rowId) }) + case _: VariantType => + val kernelVariant = vector.getVariant(rowId) + new VariantVal(kernelVariant.getValue(), kernelVariant.getMetadata()) case _ => throw new UnsupportedOperationException("unrecognized data type") } } diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/utils/TestUtils.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/utils/TestUtils.scala index 41926b12eef..a14d2497bec 100644 --- 
a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/utils/TestUtils.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/utils/TestUtils.scala @@ -73,7 +73,7 @@ trait TestUtils extends Assertions with SQLHelper { while (iter.hasNext) { result.append(iter.next()) } - result + result.toSeq } finally { iter.close() } @@ -117,6 +117,17 @@ trait TestUtils extends Assertions with SQLHelper { lazy val classLoader: ClassLoader = ResourceLoader.getClass.getClassLoader } + /** + * Drops table `tableName` after calling `f`. + */ + def withTable(tableNames: String*)(f: => Unit): Unit = { + try f finally { + tableNames.foreach { name => + spark.sql(s"DROP TABLE IF EXISTS $name") + } + } + } + def withGoldenTable(tableName: String)(testFunc: String => Unit): Unit = { val tablePath = GoldenTableUtils.goldenTablePath(tableName) testFunc(tablePath) @@ -153,7 +164,7 @@ trait TestUtils extends Assertions with SQLHelper { // for all primitive types Seq(new Column((basePath :+ field.getName).asJava.toArray(new Array[String](0)))); case _ => Seq.empty - } + }.toSeq } def collectScanFileRows(scan: Scan, tableClient: TableClient = defaultTableClient): Seq[Row] = { @@ -231,7 +242,7 @@ trait TestUtils extends Assertions with SQLHelper { } } } - result + result.toSeq } /** @@ -623,7 +634,8 @@ trait TestUtils extends Assertions with SQLHelper { toSparkType(field.getDataType), field.isNullable ) - }) + }.toSeq) + case VariantType.VARIANT => sparktypes.DataTypes.VariantType } } diff --git a/spark/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala b/spark/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala index d4ece43b8ce..cfbd57c390b 100644 --- a/spark/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala +++ b/spark/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala @@ -522,7 +522,7 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] { private def createUnresolvedTable( tableName: Seq[String], commandName: String): UnresolvedTable = { - UnresolvedTable(tableName, commandName, relationTypeMismatchHint = None) + UnresolvedTable(tableName, commandName) } // Build the text of the CHECK constraint expression. 
The user-specified whitespace is in the diff --git a/spark/src/main/scala/io/delta/tables/DeltaTableBuilder.scala b/spark/src/main/scala/io/delta/tables/DeltaTableBuilder.scala index 293b1d89b2f..7d5978215fd 100644 --- a/spark/src/main/scala/io/delta/tables/DeltaTableBuilder.scala +++ b/spark/src/main/scala/io/delta/tables/DeltaTableBuilder.scala @@ -27,6 +27,7 @@ import org.apache.spark.annotation._ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, LogicalPlan, ReplaceTable} +import org.apache.spark.sql.catalyst.plans.logical.ColumnDefinition import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.execution.SQLExecution @@ -343,7 +344,7 @@ class DeltaTableBuilder private[tables]( val unresolvedTable = org.apache.spark.sql.catalyst.analysis.UnresolvedIdentifier(table) CreateTable( unresolvedTable, - StructType(columns.toSeq), + columns.map(ColumnDefinition.fromV1Column(_, spark.sessionState.sqlParser)).toSeq, partitioning, tableSpec, ifNotExists) @@ -351,7 +352,7 @@ class DeltaTableBuilder private[tables]( val unresolvedTable = org.apache.spark.sql.catalyst.analysis.UnresolvedIdentifier(table) ReplaceTable( unresolvedTable, - StructType(columns.toSeq), + columns.map(ColumnDefinition.fromV1Column(_, spark.sessionState.sqlParser)).toSeq, partitioning, tableSpec, orCreate) diff --git a/spark/src/main/scala/io/delta/tables/execution/VacuumTableCommand.scala b/spark/src/main/scala/io/delta/tables/execution/VacuumTableCommand.scala index 6f12986c676..8cf60c85fe1 100644 --- a/spark/src/main/scala/io/delta/tables/execution/VacuumTableCommand.scala +++ b/spark/src/main/scala/io/delta/tables/execution/VacuumTableCommand.scala @@ -77,7 +77,7 @@ object VacuumTableCommand { dryRun: Boolean): VacuumTableCommand = { val child = UnresolvedDeltaPathOrIdentifier(path, table, "VACUUM") val unresolvedInventoryTable = inventoryTable.map(rt => - UnresolvedTable(rt.nameParts, "VACUUM", relationTypeMismatchHint = None)) + UnresolvedTable(rt.nameParts, "VACUUM")) VacuumTableCommand(child, horizonHours, unresolvedInventoryTable, inventoryQuery, dryRun) } } diff --git a/spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DeltaUpdateTable.scala b/spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DeltaUpdateTable.scala index e2d4d829837..cf626f6bb1d 100644 --- a/spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DeltaUpdateTable.scala +++ b/spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DeltaUpdateTable.scala @@ -16,7 +16,8 @@ package org.apache.spark.sql.catalyst.plans.logical -import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.delta.DeltaAnalysisException + import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Expression, ExtractValue, GetStructField} /** @@ -66,7 +67,7 @@ object DeltaUpdateTable { def fail(extraMsg: String): Nothing = { val msg = Option(errMsg).map(_ + " - ").getOrElse("") + extraMsg - throw new AnalysisException(msg) + throw new DeltaAnalysisException(msg) } def extractRecursively(expr: Expression): Seq[String] = expr match { diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/ColumnWithDefaultExprUtils.scala b/spark/src/main/scala/org/apache/spark/sql/delta/ColumnWithDefaultExprUtils.scala index 55b3e658784..c5f3d982d27 100644 --- 
a/spark/src/main/scala/org/apache/spark/sql/delta/ColumnWithDefaultExprUtils.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/ColumnWithDefaultExprUtils.scala
@@ -220,7 +220,8 @@ object ColumnWithDefaultExprUtils extends DeltaLogging {
         incrementalExecution.currentBatchId,
         incrementalExecution.prevOffsetSeqMetadata,
         incrementalExecution.offsetSeqMetadata,
-        incrementalExecution.watermarkPropagator
+        incrementalExecution.watermarkPropagator,
+        incrementalExecution.isFirstBatch
       )
       newIncrementalExecution.executedPlan // Force the lazy generation of execution plan
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
index ed3515c6669..8ef4aebfb49 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
@@ -257,7 +257,14 @@ trait DeltaErrorsBase
       startPosition: Option[Int] = None,
       plan: Option[LogicalPlan] = None,
       cause: Option[Throwable] = None): AnalysisException = {
-    new ExtendedAnalysisException(msg, line, startPosition, plan, cause)
+    if (plan.isEmpty) {
+      new DeltaAnalysisException(msg, line, startPosition, cause)
+    } else {
+      new ExtendedAnalysisException(
+        new DeltaAnalysisException(msg, line, startPosition, cause),
+        plan.get
+      )
+    }
   }
 
   def notNullColumnMissingException(constraint: Constraints.NotNull): Throwable = {
@@ -295,7 +302,7 @@ trait DeltaErrorsBase
   }
 
   def invalidConstraintName(name: String): AnalysisException = {
-    new AnalysisException(s"Cannot use '$name' as the name of a CHECK constraint.")
+    new DeltaAnalysisException(s"Cannot use '$name' as the name of a CHECK constraint.")
   }
 
   def nonexistentConstraint(constraintName: String, tableName: String): AnalysisException = {
@@ -1096,7 +1103,7 @@ trait DeltaErrorsBase
   }
 
   def bloomFilterInvalidParameterValueException(message: String): Throwable = {
-    new AnalysisException(
+    new DeltaAnalysisException(
       s"Cannot create bloom filter index, invalid parameter value: $message")
   }
@@ -1221,7 +1228,7 @@ trait DeltaErrorsBase
     def prettyMap(m: Map[String, String]): String = {
       m.map(e => s"${e._1}=${e._2}").mkString("[", ", ", "]")
     }
-    new AnalysisException(
+    new DeltaAnalysisException(
       s"""You are trying to convert a table which already has a delta log where the table
          |properties in the catalog don't match the configuration in the delta log.
          |Table properties in catalog: ${prettyMap(tableProperties)}
@@ -1399,7 +1406,7 @@ trait DeltaErrorsBase
   def restoreTimestampBeforeEarliestException(
       userTimestamp: String,
       earliestTimestamp: String): Throwable = {
-    new AnalysisException(
+    new DeltaAnalysisException(
       s"Cannot restore table to timestamp ($userTimestamp) as it is before the earliest version " +
         s"available. Please use a timestamp after ($earliestTimestamp)"
     )
@@ -1579,7 +1586,7 @@ trait DeltaErrorsBase
   }
 
   def viewNotSupported(operationName: String): Throwable = {
-    new AnalysisException(s"Operation $operationName can not be performed on a view")
+    new DeltaAnalysisException(s"Operation $operationName can not be performed on a view")
   }
 
   def postCommitHookFailedException(
@@ -1733,7 +1740,7 @@ trait DeltaErrorsBase
   }
 
   def generatedColumnsNonDeterministicExpression(expr: Expression): Throwable = {
-    new AnalysisException(
+    new DeltaAnalysisException(
      s"Found ${expr.sql}. A generated column cannot use a non deterministic expression")
   }
@@ -2069,7 +2076,7 @@ trait DeltaErrorsBase
       columnName: String,
       constraints: Map[String, String]): Throwable = {
     val plural = if (constraints.size > 1) "s" else ""
-    new AnalysisException(
+    new DeltaAnalysisException(
       s"""
          |Cannot $operation column $columnName because this column is referenced by the following
          | check constraint$plural:\n\t${constraints.mkString("\n\t")}
@@ -2081,7 +2088,7 @@ trait DeltaErrorsBase
       columnName: String,
       fields: Seq[StructField]): Throwable = {
     val plural = if (fields.size > 1) "s" else ""
-    new AnalysisException(
+    new DeltaAnalysisException(
       s"""
          |Cannot $operation column $columnName because this column is referenced by the following
          | generated column$plural:\n\t${fields.map(_.name).mkString("\n\t")}
@@ -2410,7 +2417,7 @@ trait DeltaErrorsBase
       hasStart: Boolean,
       hasStep: Boolean,
       hasInsert: Boolean): Throwable = {
-    new AnalysisException(s"Inconsistent IDENTITY metadata for column $colName " +
+    new DeltaAnalysisException(s"Inconsistent IDENTITY metadata for column $colName " +
       s"detected: $hasStart, $hasStep, $hasInsert")
   }
@@ -3430,7 +3437,7 @@ class MetadataMismatchErrorBuilder {
   }
 
   def finalizeAndThrow(conf: SQLConf): Unit = {
-    throw new AnalysisException(bits.mkString("\n"))
+    throw new DeltaAnalysisException(bits.mkString("\n"))
   }
 }
@@ -3481,7 +3488,7 @@ class DeltaIllegalStateException(
   override def getErrorClass: String = errorClass
 
   override def getMessageParameters: java.util.Map[String, String] = {
-    DeltaThrowableHelper.getParameterNames(errorClass, null)
+    DeltaThrowableHelper.getParameterNames(Option(errorClass), errorSubClass = None)
       .zip(messageParameters).toMap.asJava
   }
 }
@@ -3545,7 +3552,7 @@ class DeltaRuntimeException(
   override def getErrorClass: String = errorClass
 
   override def getMessageParameters: java.util.Map[String, String] =
-    DeltaThrowableHelper.getParameterNames(errorClass, null)
+    DeltaThrowableHelper.getParameterNames(Option(errorClass), errorSubClass = None)
       .zip(messageParameters).toMap.asJava
 }
@@ -3610,8 +3617,8 @@ class DeltaTablePropertyValidationFailedException(
 
   override def getMessageParameters: java.util.Map[String, String] = {
     DeltaThrowableHelper.getParameterNames(
-      "DELTA_VIOLATE_TABLE_PROPERTY_VALIDATION_FAILED",
-      subClass.tag).zip(subClass.messageParameters(table)).toMap.asJava
+      Some("DELTA_VIOLATE_TABLE_PROPERTY_VALIDATION_FAILED"),
+      Some(subClass.tag)).zip(subClass.messageParameters(table)).toMap.asJava
   }
 
   override def getErrorClass: String =
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaSharedExceptions.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaSharedExceptions.scala
index 0f07a125f95..41121a5fffe 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaSharedExceptions.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaSharedExceptions.scala
@@ -20,28 +20,75 @@ import scala.collection.JavaConverters._
 
 import org.antlr.v4.runtime.ParserRuleContext
 
+import org.apache.spark.QueryContext
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.parser.{ParseException, ParserUtils}
 import org.apache.spark.sql.catalyst.trees.Origin
 
+// Spark's AnalysisException constructor is no longer public. This means all calls
+// with AnalysisException(msg: String) are invalid.
+// For now we adjust DeltaAnalysisException to expose a constructor that allows this. Otherwise,
+// we should consider changing all of those exceptions to use an errorClass with the public
+// AnalysisException constructors (this can just be INTERNAL_ERROR)
 class DeltaAnalysisException(
-    errorClass: String,
-    messageParameters: Array[String],
-    cause: Option[Throwable] = None,
-    origin: Option[Origin] = None)
+    message: String,
+    line: Option[Int] = None,
+    startPosition: Option[Int] = None,
+    cause: Option[Throwable] = None,
+    errorClass: Option[String] = None,
+    messageParameters: Array[String] = Array.empty,
+    context: Array[QueryContext] = Array.empty)
   extends AnalysisException(
-    message = DeltaThrowableHelper.getMessage(errorClass, messageParameters),
+    message,
+    line,
+    startPosition,
+    cause,
+    errorClass,
     messageParameters = DeltaThrowableHelper
-      .getParameterNames(errorClass, errorSubClass = null)
-      .zip(messageParameters)
-      .toMap,
-    errorClass = Some(errorClass),
-    line = origin.flatMap(_.line),
-    startPosition = origin.flatMap(_.startPosition),
-    context = origin.map(_.getQueryContext).getOrElse(Array.empty),
-    cause = cause)
-  with DeltaThrowable {
-  def getMessageParametersArray: Array[String] = messageParameters
+      .getParameterNames(errorClass, errorSubClass = None)
+      .zip(messageParameters)
+      .toMap,
+    context)
+  with DeltaThrowable {
+
+  /* Implemented for testing */
+  private[delta] def getMessageParametersArray: Array[String] = errorClass match {
+    case Some(_) => messageParameters
+    case None => Array.empty
+  }
+
+  def this(
+      errorClass: String,
+      messageParameters: Array[String]) =
+    this(
+      message = DeltaThrowableHelper.getMessage(errorClass, messageParameters),
+      messageParameters = messageParameters,
+      errorClass = Some(errorClass)
+    )
+
+  def this(
+      errorClass: String,
+      messageParameters: Array[String],
+      cause: Option[Throwable]) =
+    this(
+      message = DeltaThrowableHelper.getMessage(errorClass, messageParameters),
+      messageParameters = messageParameters,
+      errorClass = Some(errorClass),
+      cause = cause)
+
+  def this(
+      errorClass: String,
+      messageParameters: Array[String],
+      cause: Option[Throwable],
+      origin: Option[Origin]) =
+    this(
+      message = DeltaThrowableHelper.getMessage(errorClass, messageParameters),
+      messageParameters = messageParameters,
+      errorClass = Some(errorClass),
+      line = origin.flatMap(_.line),
+      startPosition = origin.flatMap(_.startPosition),
+      context = origin.map(_.getQueryContext).getOrElse(Array.empty),
+      cause = cause)
 }
 
 class DeltaIllegalArgumentException(
@@ -55,7 +102,7 @@ class DeltaIllegalArgumentException(
   def getMessageParametersArray: Array[String] = messageParameters
 
   override def getMessageParameters: java.util.Map[String, String] = {
-    DeltaThrowableHelper.getParameterNames(errorClass, errorSubClass = null)
+    DeltaThrowableHelper.getParameterNames(Option(errorClass), errorSubClass = None)
       .zip(messageParameters).toMap.asJava
   }
 }
@@ -70,7 +117,7 @@ class DeltaUnsupportedOperationException(
   def getMessageParametersArray: Array[String] = messageParameters
 
   override def getMessageParameters: java.util.Map[String, String] = {
-    DeltaThrowableHelper.getParameterNames(errorClass, errorSubClass = null)
+    DeltaThrowableHelper.getParameterNames(Option(errorClass), errorSubClass = None)
      .zip(messageParameters).toMap.asJava
   }
 }
@@ -96,7 +143,7 @@ class DeltaArithmeticException(
   override def getErrorClass: String = errorClass
 
   override def getMessageParameters: java.util.Map[String, String] = {
-    DeltaThrowableHelper.getParameterNames(errorClass, errorSubClass = null)
+
DeltaThrowableHelper.getParameterNames(Option(errorClass), errorSubClass = None) .zip(messageParameters).toMap.asJava } } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaTable.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaTable.scala index e0616efbe18..56884554b29 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaTable.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaTable.scala @@ -587,7 +587,7 @@ object UnresolvedDeltaPathOrIdentifier { (path, tableIdentifier) match { case (Some(p), None) => UnresolvedPathBasedDeltaTable(p, Map.empty, cmd) case (None, Some(t)) => - UnresolvedTable(t.nameParts, cmd, None) + UnresolvedTable(t.nameParts, cmd) case _ => throw new IllegalArgumentException( s"Exactly one of path or tableIdentifier must be provided to $cmd") } @@ -609,7 +609,7 @@ object UnresolvedPathOrIdentifier { cmd: String): LogicalPlan = { (path, tableIdentifier) match { case (_, Some(t)) => - UnresolvedTable(t.nameParts, cmd, None) + UnresolvedTable(t.nameParts, cmd) case (Some(p), None) => UnresolvedPathBasedTable(p, Map.empty, cmd) case _ => throw new IllegalArgumentException( s"At least one of path or tableIdentifier must be provided to $cmd") diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaThrowable.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaThrowable.scala index 5d8b185ba70..521200281d3 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaThrowable.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaThrowable.scala @@ -25,7 +25,7 @@ trait DeltaThrowable extends SparkThrowable { // Portable error identifier across SQL engines // If null, error class or SQLSTATE is not set override def getSqlState: String = - DeltaThrowableHelper.getSqlState(this.getErrorClass.split('.').head) + DeltaThrowableHelper.getSqlState(this.getErrorClass) // True if this error is an internal error. override def isInternalError: Boolean = DeltaThrowableHelper.isInternalError(this.getErrorClass) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaThrowableHelper.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaThrowableHelper.scala index 138caaa20ed..e3f2e67c766 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaThrowableHelper.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaThrowableHelper.scala @@ -63,11 +63,13 @@ object DeltaThrowableHelper def isInternalError(errorClass: String): Boolean = errorClass == "INTERNAL_ERROR" - def getParameterNames(errorClass: String, errorSubClass: String): Array[String] = { - val wholeErrorClass = if (errorSubClass == null) { - errorClass - } else { - errorClass + "." + errorSubClass + def getParameterNames( + errorClass: Option[String], + errorSubClass: Option[String]): Array[String] = { + val wholeErrorClass = (errorClass, errorSubClass) match { + case (Some(errorClass), Some(errorSubClass)) => errorClass + "." 
+ errorSubClass + case (Some(errorClass), None) => errorClass + case _ => return Array.empty } val parameterizedMessage = errorClassReader.getMessageTemplate(wholeErrorClass) val pattern = "<[a-zA-Z0-9_-]+>".r diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/ResolveDeltaMergeInto.scala b/spark/src/main/scala/org/apache/spark/sql/delta/ResolveDeltaMergeInto.scala index a8d1def58ce..df8326a6476 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/ResolveDeltaMergeInto.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/ResolveDeltaMergeInto.scala @@ -78,6 +78,7 @@ object ResolveDeltaMergeInto { throw new DeltaAnalysisException( errorClass = "DELTA_MERGE_UNRESOLVED_EXPRESSION", messageParameters = Array(a.sql, mergeClauseType, cols), + cause = None, origin = Some(a.origin)) } } @@ -310,6 +311,7 @@ object ResolveDeltaMergeInto { errorClass = "DELTA_MERGE_RESOLVED_ATTRIBUTE_MISSING_FROM_INPUT", messageParameters = Array(missingAttributes, input, resolvedMerge.simpleString(SQLConf.get.maxToStringFields)), + cause = None, origin = Some(resolvedMerge.origin) ) } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala b/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala index 1966d0ed176..3a3cf786808 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala @@ -358,7 +358,8 @@ object TableFeature { RowTrackingFeature, InCommitTimestampTableFeature, TypeWideningTableFeature, - VacuumProtocolCheckTableFeature) + VacuumProtocolCheckTableFeature, + VariantTypeTableFeature) } val featureMap = features.map(f => f.name.toLowerCase(Locale.ROOT) -> f).toMap require(features.size == featureMap.size, "Lowercase feature names must not duplicate.") @@ -483,6 +484,14 @@ object ColumnMappingTableFeature } } +object VariantTypeTableFeature extends ReaderWriterFeature(name = "variantType-dev") + with FeatureAutomaticallyEnabledByMetadata { + override def metadataRequiresFeatureToBeEnabled( + metadata: Metadata, spark: SparkSession): Boolean = { + SchemaUtils.checkForVariantTypeColumnsRecursively(metadata.schema) + } +} + object TimestampNTZTableFeature extends ReaderWriterFeature(name = "timestampNtz") with FeatureAutomaticallyEnabledByMetadata { override def metadataRequiresFeatureToBeEnabled( diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala index e364a65ae15..d1edaefe87e 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala @@ -278,7 +278,11 @@ case class DeltaTableV2( def withOptions(newOptions: Map[String, String]): DeltaTableV2 = { val ttSpec = DeltaDataSource.getTimeTravelVersion(newOptions) if (timeTravelOpt.nonEmpty && ttSpec.nonEmpty) { - throw DeltaErrors.multipleTimeTravelSyntaxUsed + // If the options match the spec on the table, we ignore the error. + if (timeTravelOpt.get.version != ttSpec.get.version || + timeTravelOpt.get.timestamp != ttSpec.get.timestamp) { + throw DeltaErrors.multipleTimeTravelSyntaxUsed + } } val caseInsensitiveNewOptions = new CaseInsensitiveStringMap(newOptions.asJava) @@ -329,7 +333,7 @@ object DeltaTableV2 { /** Resolves a table identifier into a DeltaTableV2, leveraging standard v2 table resolution. 
*/ def apply(spark: SparkSession, tableId: TableIdentifier, cmd: String): DeltaTableV2 = { - resolve(spark, UnresolvedTable(tableId.nameParts, cmd, None), cmd) + resolve(spark, UnresolvedTable(tableId.nameParts, cmd), cmd) } /** Applies standard v2 table resolution to an unresolved Delta table plan node */ diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommandBase.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommandBase.scala index a9ba8cb3e5c..583fd981551 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommandBase.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommandBase.scala @@ -160,7 +160,7 @@ trait MergeIntoCommandBase extends LeafRunnableCommand // queries that add void columns. val newNullColumn = SchemaUtils.findNullTypeColumn(migratedSchema.get) if (newNullColumn.isDefined) { - throw new AnalysisException( + throw new DeltaAnalysisException( s"""Cannot add column '${newNullColumn.get}' with type 'void'. Please explicitly specify a |non-void type.""".stripMargin.replaceAll("\n", " ") ) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/constraints/DeltaInvariantCheckerExec.scala b/spark/src/main/scala/org/apache/spark/sql/delta/constraints/DeltaInvariantCheckerExec.scala index 0a1e063effa..0f0e8eb6800 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/constraints/DeltaInvariantCheckerExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/constraints/DeltaInvariantCheckerExec.scala @@ -105,7 +105,8 @@ object DeltaInvariantCheckerExec { // Specialized optimizer to run necessary rules so that the check expressions can be evaluated. object DeltaInvariantCheckerOptimizer extends RuleExecutor[LogicalPlan] { final override protected def batches = Seq( - Batch("Finish Analysis", Once, ReplaceExpressions) + Batch("Finish Analysis", Once, ReplaceExpressions), Batch("Rewrite With expression", Once, + org.apache.spark.sql.catalyst.optimizer.RewriteWithExpression) ) } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala b/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala index fe98ca0de13..e159397fa78 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala @@ -729,7 +729,7 @@ def normalizeColumnNamesInDataType( findRecursively(column, schema) } catch { case e: AnalysisException => - throw new AnalysisException(e.getMessage + s":\n${schema.treeString}") + throw new DeltaAnalysisException(e.getMessage + s":\n${schema.treeString}") } } @@ -901,7 +901,7 @@ def normalizeColumnNamesInDataType( StructType(pre ++ Seq(mid) ++ post.tail) -> droppedColumn } else { if (length == 1) { - throw new AnalysisException( + throw new DeltaAnalysisException( "Cannot drop column from a struct type with a single field: " + schema) } StructType(pre ++ post.tail) -> field @@ -1253,6 +1253,13 @@ def normalizeColumnNamesInDataType( unsupportedDataTypes.toSeq } + /** + * Find VariantType columns in the table schema. + */ + def checkForVariantTypeColumnsRecursively(schema: StructType): Boolean = { + SchemaUtils.typeExistsRecursively(schema)(_.isInstanceOf[VariantType]) + } + /** * Find TimestampNTZ columns in the table schema. 
*/ @@ -1292,6 +1299,7 @@ def normalizeColumnNamesInDataType( case DateType => case TimestampType => case TimestampNTZType => + case VariantType => case BinaryType => case _: DecimalType => case a: ArrayType => diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/util/AnalysisHelper.scala b/spark/src/main/scala/org/apache/spark/sql/delta/util/AnalysisHelper.scala index 73f9f610dc4..5ae30006b85 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/util/AnalysisHelper.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/util/AnalysisHelper.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.delta.util // scalastyle:off import.ordering.noEmptyLine -import org.apache.spark.sql.delta.DeltaErrors +import org.apache.spark.sql.delta.{DeltaAnalysisException, DeltaErrors} import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession} import org.apache.spark.sql.catalyst.analysis.AnalysisErrorAt @@ -52,7 +52,7 @@ trait AnalysisHelper { tryResolveReferencesForExpressions(sparkSession)(exprs, Seq(planProvidingAttrs)) resolvedExprs.foreach { expr => if (!expr.resolved) { - throw new AnalysisException( + throw new DeltaAnalysisException( s"cannot resolve ${expr.sql} given $planProvidingAttrs") } } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/util/PartitionUtils.scala b/spark/src/main/scala/org/apache/spark/sql/delta/util/PartitionUtils.scala index 6f870e30ca5..698f89904b3 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/util/PartitionUtils.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/util/PartitionUtils.scala @@ -605,7 +605,7 @@ private[delta] object PartitionUtils { partitionColumnsSchema(schema, partitionColumns, caseSensitive).foreach { field => field.dataType match { - case _: AtomicType => // OK + case a: AtomicType if !a.isInstanceOf[VariantType] => // OK case _ => throw DeltaErrors.cannotUseDataTypeForPartitionColumnError(field) } } diff --git a/spark/src/test/scala/io/delta/sql/parser/DeltaSqlParserSuite.scala b/spark/src/test/scala/io/delta/sql/parser/DeltaSqlParserSuite.scala index 6d60358e6d7..d86b99eaccc 100644 --- a/spark/src/test/scala/io/delta/sql/parser/DeltaSqlParserSuite.scala +++ b/spark/src/test/scala/io/delta/sql/parser/DeltaSqlParserSuite.scala @@ -40,25 +40,25 @@ class DeltaSqlParserSuite extends SparkFunSuite with SQLHelper { // Setting `delegate` to `null` is fine. The following tests don't need to touch `delegate`. 
val parser = new DeltaSqlParser(null) assert(parser.parsePlan("vacuum 123_") === - VacuumTableCommand(UnresolvedTable(Seq("123_"), "VACUUM", None), None, None, None, false)) + VacuumTableCommand(UnresolvedTable(Seq("123_"), "VACUUM"), None, None, None, false)) assert(parser.parsePlan("vacuum 1a.123_") === - VacuumTableCommand(UnresolvedTable(Seq("1a", "123_"), "VACUUM", None), + VacuumTableCommand(UnresolvedTable(Seq("1a", "123_"), "VACUUM"), None, None, None, false)) assert(parser.parsePlan("vacuum a.123A") === - VacuumTableCommand(UnresolvedTable(Seq("a", "123A"), "VACUUM", None), + VacuumTableCommand(UnresolvedTable(Seq("a", "123A"), "VACUUM"), None, None, None, false)) assert(parser.parsePlan("vacuum a.123E3_column") === - VacuumTableCommand(UnresolvedTable(Seq("a", "123E3_column"), "VACUUM", None), + VacuumTableCommand(UnresolvedTable(Seq("a", "123E3_column"), "VACUUM"), None, None, None, false)) assert(parser.parsePlan("vacuum a.123D_column") === - VacuumTableCommand(UnresolvedTable(Seq("a", "123D_column"), "VACUUM", None), + VacuumTableCommand(UnresolvedTable(Seq("a", "123D_column"), "VACUUM"), None, None, None, false)) assert(parser.parsePlan("vacuum a.123BD_column") === - VacuumTableCommand(UnresolvedTable(Seq("a", "123BD_column"), "VACUUM", None), + VacuumTableCommand(UnresolvedTable(Seq("a", "123BD_column"), "VACUUM"), None, None, None, false)) assert(parser.parsePlan("vacuum delta.`/tmp/table`") === - VacuumTableCommand(UnresolvedTable(Seq("delta", "/tmp/table"), "VACUUM", None), + VacuumTableCommand(UnresolvedTable(Seq("delta", "/tmp/table"), "VACUUM"), None, None, None, false)) assert(parser.parsePlan("vacuum \"/tmp/table\"") === @@ -91,19 +91,19 @@ class DeltaSqlParserSuite extends SparkFunSuite with SQLHelper { assert(parsedCmd === OptimizeTableCommand(None, Some(tblId("tbl")), Nil)(Nil)) assert(parsedCmd.asInstanceOf[OptimizeTableCommand].child === - UnresolvedTable(Seq("tbl"), "OPTIMIZE", None)) + UnresolvedTable(Seq("tbl"), "OPTIMIZE")) parsedCmd = parser.parsePlan("OPTIMIZE db.tbl") assert(parsedCmd === OptimizeTableCommand(None, Some(tblId("tbl", "db")), Nil)(Nil)) assert(parsedCmd.asInstanceOf[OptimizeTableCommand].child === - UnresolvedTable(Seq("db", "tbl"), "OPTIMIZE", None)) + UnresolvedTable(Seq("db", "tbl"), "OPTIMIZE")) parsedCmd = parser.parsePlan("OPTIMIZE catalog_foo.db.tbl") assert(parsedCmd === OptimizeTableCommand(None, Some(tblId("tbl", "db", "catalog_foo")), Nil)(Nil)) assert(parsedCmd.asInstanceOf[OptimizeTableCommand].child === - UnresolvedTable(Seq("catalog_foo", "db", "tbl"), "OPTIMIZE", None)) + UnresolvedTable(Seq("catalog_foo", "db", "tbl"), "OPTIMIZE")) assert(parser.parsePlan("OPTIMIZE tbl_${system:spark.testing}") === OptimizeTableCommand(None, Some(tblId("tbl_true")), Nil)(Nil)) @@ -135,7 +135,7 @@ class DeltaSqlParserSuite extends SparkFunSuite with SQLHelper { assert(parsedCmd === OptimizeTableCommand(None, Some(tblId("/path/to/tbl", "delta")), Nil)(Nil)) assert(parsedCmd.asInstanceOf[OptimizeTableCommand].child === - UnresolvedTable(Seq("delta", "/path/to/tbl"), "OPTIMIZE", None)) + UnresolvedTable(Seq("delta", "/path/to/tbl"), "OPTIMIZE")) assert(parser.parsePlan("OPTIMIZE tbl WHERE part = 1") === OptimizeTableCommand(None, Some(tblId("tbl")), Seq("part = 1"))(Nil)) @@ -181,7 +181,7 @@ class DeltaSqlParserSuite extends SparkFunSuite with SQLHelper { // Desc detail on a table assert(parser.parsePlan("DESCRIBE DETAIL catalog_foo.db.tbl") === DescribeDeltaDetailCommand( - UnresolvedTable(Seq("catalog_foo", "db", "tbl"), 
DescribeDeltaDetailCommand.CMD_NAME, None), + UnresolvedTable(Seq("catalog_foo", "db", "tbl"), DescribeDeltaDetailCommand.CMD_NAME), Map.empty)) // Desc detail on a raw path @@ -193,7 +193,7 @@ class DeltaSqlParserSuite extends SparkFunSuite with SQLHelper { // Desc detail on a delta raw path assert(parser.parsePlan("DESCRIBE DETAIL delta.`dummy_raw_path`") === DescribeDeltaDetailCommand( - UnresolvedTable(Seq("delta", "dummy_raw_path"), DescribeDeltaDetailCommand.CMD_NAME, None), + UnresolvedTable(Seq("delta", "dummy_raw_path"), DescribeDeltaDetailCommand.CMD_NAME), Map.empty)) } @@ -201,17 +201,17 @@ class DeltaSqlParserSuite extends SparkFunSuite with SQLHelper { val parser = new DeltaSqlParser(null) var parsedCmd = parser.parsePlan("DESCRIBE HISTORY catalog_foo.db.tbl") assert(parsedCmd.asInstanceOf[DescribeDeltaHistory].child === - UnresolvedTable(Seq("catalog_foo", "db", "tbl"), DescribeDeltaHistory.COMMAND_NAME, None)) + UnresolvedTable(Seq("catalog_foo", "db", "tbl"), DescribeDeltaHistory.COMMAND_NAME)) parsedCmd = parser.parsePlan("DESCRIBE HISTORY delta.`/path/to/tbl`") assert(parsedCmd.asInstanceOf[DescribeDeltaHistory].child === - UnresolvedTable(Seq("delta", "/path/to/tbl"), DescribeDeltaHistory.COMMAND_NAME, None)) + UnresolvedTable(Seq("delta", "/path/to/tbl"), DescribeDeltaHistory.COMMAND_NAME)) parsedCmd = parser.parsePlan("DESCRIBE HISTORY '/path/to/tbl'") assert(parsedCmd.asInstanceOf[DescribeDeltaHistory].child === UnresolvedPathBasedDeltaTable("/path/to/tbl", Map.empty, DescribeDeltaHistory.COMMAND_NAME)) } private def targetPlanForTable(tableParts: String*): UnresolvedTable = - UnresolvedTable(tableParts.toSeq, "REORG", relationTypeMismatchHint = None) + UnresolvedTable(tableParts.toSeq, "REORG") test("REORG command is parsed as expected") { val parser = new DeltaSqlParser(null) @@ -362,7 +362,7 @@ class DeltaSqlParserSuite extends SparkFunSuite with SQLHelper { val parsedCmd = parser.parsePlan(sql) assert(parsedCmd === AlterTableDropFeature( - UnresolvedTable(Seq(table), "ALTER TABLE ... DROP FEATURE", None), + UnresolvedTable(Seq(table), "ALTER TABLE ... DROP FEATURE"), featureName, truncateHistory)) } diff --git a/spark/src/test/scala/io/delta/tables/DeltaTableSuite.scala b/spark/src/test/scala/io/delta/tables/DeltaTableSuite.scala index 07e1cc28339..d8f1430773d 100644 --- a/spark/src/test/scala/io/delta/tables/DeltaTableSuite.scala +++ b/spark/src/test/scala/io/delta/tables/DeltaTableSuite.scala @@ -22,7 +22,7 @@ import java.util.Locale import scala.language.postfixOps // scalastyle:off import.ordering.noEmptyLine -import org.apache.spark.sql.delta.{DeltaIllegalArgumentException, DeltaLog, DeltaTableFeatureException, FakeFileSystem, TestReaderWriterFeature, TestWriterFeature} +import org.apache.spark.sql.delta.{DeltaIllegalArgumentException, DeltaIllegalStateException, DeltaLog, DeltaTableFeatureException, FakeFileSystem, TestReaderWriterFeature, TestWriterFeature} import org.apache.spark.sql.delta.actions.{ Metadata, Protocol } import org.apache.spark.sql.delta.storage.LocalLogStore import org.apache.spark.sql.delta.test.DeltaSQLCommandTest @@ -188,7 +188,7 @@ class DeltaTableSuite extends QueryTest } // DeltaTable can be passed to executor but method call causes exception. 
- val e = intercept[SparkException] { + val e = intercept[DeltaIllegalStateException] { withTempDir { dir => testData.write.format("delta").mode("append").save(dir.getAbsolutePath) val dt: DeltaTable = DeltaTable.forPath(dir.getAbsolutePath) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/CloneParquetSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/CloneParquetSuite.scala index 1a79551bde7..1babd75c6cd 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/CloneParquetSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/CloneParquetSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.delta import org.apache.spark.SparkException +import org.apache.spark.SparkThrowable import org.apache.spark.sql.DataFrame import org.apache.spark.sql.functions.col @@ -49,10 +50,13 @@ class CloneParquetByPathSuite extends CloneParquetSuiteBase withParquetTable(df, Seq("key1", "key2")) { sourceIdent => val tableName = "cloneTable" withTable(tableName) { - val se = intercept[SparkException] { + val errorMessage = intercept[SparkThrowable] { sql(s"CREATE TABLE $tableName $mode CLONE $sourceIdent") + } match { + case e: SparkException => e.getMessage // Spark 3.5 + case e: DeltaAnalysisException => e.getMessage // Spark 4.0 } - assert(se.getMessage.contains("Expecting 0 partition column(s)")) + assert(errorMessage.contains("Expecting 0 partition column(s)")) } } } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/ConvertToDeltaSuiteBase.scala b/spark/src/test/scala/org/apache/spark/sql/delta/ConvertToDeltaSuiteBase.scala index d83c8ae34d7..c6a539579fa 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/ConvertToDeltaSuiteBase.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/ConvertToDeltaSuiteBase.scala @@ -359,7 +359,7 @@ trait ConvertToDeltaSuiteBase extends ConvertToDeltaSuiteBaseCommons case ae: AnalysisException => ae } assert(realCause.getMessage.contains("Failed to merge")) - assert(exception.isInstanceOf[AnalysisException] || + assert(realCause.isInstanceOf[AnalysisException] || realCause.getMessage.contains("/part="), "Error message should contain the file name") } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/CustomCatalogSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/CustomCatalogSuite.scala index 93c22fa3882..0861a5c2954 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/CustomCatalogSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/CustomCatalogSuite.scala @@ -311,7 +311,7 @@ class DummyCatalog extends TableCatalog { } override def loadTable(ident: Identifier): Table = { if (!tableExists(ident)) { - throw new NoSuchTableException("") + throw new NoSuchTableException(ident) } val tablePath = getTablePath(ident.name()) DeltaTableV2(spark = spark, path = tablePath, catalogTable = Some(createCatalogTable(ident))) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingSuite.scala index e551244afed..ab7f2a1a4b3 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingSuite.scala @@ -1940,7 +1940,7 @@ class DeltaColumnMappingSuite extends QueryTest exception = e, errorClass = errorClass, parameters = DeltaThrowableHelper - .getParameterNames(errorClass, errorSubClass = null) + .getParameterNames(Option(errorClass), errorSubClass = None) .zip(invalidColumns).toMap ) 
} diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala index f4b5f87a956..1114671b282 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala @@ -513,7 +513,7 @@ trait DeltaErrorsSuiteBase { val e = intercept[DeltaAnalysisException] { throw DeltaErrors.generatedColumnsReferToWrongColumns( - new AnalysisException("analysis exception")) + new DeltaAnalysisException("analysis exception")) } checkErrorMessage(e, None, None, Some("A generated column cannot use a non-existent column or " + diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaHistoryManagerSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaHistoryManagerSuite.scala index 1c37bd595ba..e3ddd4a84e8 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaHistoryManagerSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaHistoryManagerSuite.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.delta.test.DeltaTestImplicits._ import org.apache.spark.sql.delta.util.FileNames import org.scalatest.GivenWhenThen -import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.{SparkConf, SparkException, SparkFileNotFoundException, SparkThrowable} import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.parser.ParseException @@ -567,12 +567,14 @@ abstract class DeltaHistoryManagerBase extends DeltaTimeTravelTests DeltaSQLConf.DELTA_OPTIMIZE_METADATA_QUERY_ENABLED.key -> "false", DeltaSQLConf.DELTA_VACUUM_RETENTION_CHECK_ENABLED.key -> "false") { sql(s"vacuum $tblName retain 0 hours") - intercept[SparkException] { + var e = intercept[SparkThrowable] { sql(s"select * from ${versionAsOf(tblName, 0)}").collect() } - intercept[SparkException] { + assert(e.isInstanceOf[SparkException] || e.isInstanceOf[SparkFileNotFoundException]) + e = intercept[SparkThrowable] { sql(s"select count(*) from ${versionAsOf(tblName, 1)}").collect() } + assert(e.isInstanceOf[SparkException] || e.isInstanceOf[SparkFileNotFoundException]) } } } @@ -593,13 +595,15 @@ abstract class DeltaHistoryManagerBase extends DeltaTimeTravelTests } assert(e1.getMessage.contains("[0, 2]")) - val e2 = intercept[IllegalArgumentException] { + val e2 = intercept[AnalysisException] { spark.read.format("delta") .option("versionAsOf", 3) .option("timestampAsOf", "2020-10-22 23:20:11") .table(tblName).collect() } - assert(e2.getMessage.contains("either provide 'timestampAsOf' or 'versionAsOf'")) + assert(e2.getMessage.contains( + "Cannot specify both version and timestamp when time travelling the table")) + assert(e2.errorClass.isDefined && e2.errorClass.get == "INVALID_TIME_TRAVEL_SPEC") } } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuite.scala index 7e548916432..73e2e1b8694 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuite.scala @@ -435,7 +435,8 @@ abstract class DeltaInsertIntoTestsWithTempViews( checkAnswer(spark.table("v"), expectedResult) } catch { case e: AnalysisException => - assert(e.getMessage.contains("Inserting into a view is not allowed") || + 
assert(e.getMessage.contains("[EXPECT_TABLE_NOT_VIEW.NO_ALTERNATIVE]") || + e.getMessage.contains("Inserting into a view is not allowed") || e.getMessage.contains("Inserting into an RDD-based table is not allowed") || e.getMessage.contains("Table default.v not found") || e.getMessage.contains("Table or view 'v' not found in database 'default'") || @@ -609,13 +610,12 @@ class DeltaColumnDefaultsInsertSuite extends InsertIntoSQLOnlyTests with DeltaSQ errorClass = "WRONG_COLUMN_DEFAULTS_FOR_DELTA_ALTER_TABLE_ADD_COLUMN_NOT_SUPPORTED" ) } - // The default value fails to analyze. checkError( exception = intercept[AnalysisException] { sql(s"create table t4 (s int default badvalue) using $v2Format " + s"$tblPropertiesAllowDefaults") }, - errorClass = "INVALID_DEFAULT_VALUE.UNRESOLVED_EXPRESSION", + errorClass = "INVALID_DEFAULT_VALUE.NOT_CONSTANT", parameters = Map( "statement" -> "CREATE TABLE", "colName" -> "`s`", diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala index 0b3475b228f..7deaed98815 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala @@ -32,6 +32,8 @@ import org.apache.spark.sql.delta.util.FileNames.deltaFile import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, Path, PathHandle} import org.apache.spark.SparkException +import org.apache.spark.SparkFileNotFoundException +import org.apache.spark.SparkThrowable import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart} import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier @@ -1491,7 +1493,10 @@ class DeltaSuite extends QueryTest val thrown = intercept[SparkException] { data.toDF().collect() } - assert(thrown.getMessage.contains("is not a Parquet file")) + assert( + thrown.getMessage.contains("is not a Parquet file") // Spark 3.5 + || thrown.getMessage.contains("[FAILED_READ_FILE]") // Spark 4.0 + ) } } } @@ -1540,10 +1545,14 @@ class DeltaSuite extends QueryTest tempDirPath.getFileSystem(deltaLog.newDeltaHadoopConf()), pathToDelete) assert(deleted) - val thrown = intercept[SparkException] { + intercept[SparkThrowable] { data.toDF().collect() + } match { + case _: SparkFileNotFoundException => // Spark 4.0 + case thrown: SparkException => + assert(thrown.getMessage.contains("FileNotFound")) } - assert(thrown.getMessage.contains("FileNotFound")) + } } @@ -1671,11 +1680,15 @@ class DeltaSuite extends QueryTest spark.emptyDataFrame ) } - var cause = ex.getCause - while (cause.getCause != null) { - cause = cause.getCause + ex match { + case _: SparkFileNotFoundException => // Spark 4.0 + case _ => // Spark 3.5 + var cause = ex.getCause + while (cause.getCause != null) { + cause = cause.getCause + } + assert(cause.getMessage.contains(".parquet does not exist")) } - assert(cause.getMessage.contains(".parquet does not exist")) } } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaVacuumSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaVacuumSuite.scala index 0965a57175b..75d0e376308 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaVacuumSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaVacuumSuite.scala @@ -37,7 +37,7 @@ import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.fs.Path import org.scalatest.GivenWhenThen -import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.{SparkConf, SparkException, 
SparkFileNotFoundException}
 import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.Literal
@@ -390,7 +390,7 @@ trait DeltaVacuumSuiteBase extends QueryTest
       checkAnswer(sql(s"SELECT * FROM delta.`${dir.getAbsolutePath}`"), df1)
 
       // try reading cdc data
-      val e = intercept[SparkException] {
+      val e = intercept[SparkFileNotFoundException] {
         spark.read
           .format("delta")
           .option(DeltaOptions.CDC_READ_OPTION, "true")
@@ -448,7 +448,7 @@ trait DeltaVacuumSuiteBase extends QueryTest
       assert(getCDCFiles(deltaLog).size === 1) // still just the one cdc file from before.
 
       // try reading cdc data
-      val e = intercept[SparkException] {
+      val e = intercept[SparkFileNotFoundException] {
         spark.read
           .format("delta")
           .option(DeltaOptions.CDC_READ_OPTION, "true")
@@ -512,6 +512,8 @@ class DeltaVacuumSuite
         vacuumSQLTest(tablePath, viewName)
       }
       assert(
+        e.getMessage.contains("'VACUUM' expects a table but `v` is a view") ||
+        e.getMessage.contains("EXPECT_TABLE_NOT_VIEW.NO_ALTERNATIVE") ||
         e.getMessage.contains("v is a temp view. 'VACUUM' expects a table."))
     }
   }
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaVariantSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaVariantSuite.scala
new file mode 100644
index 00000000000..b19e29b52fa
--- /dev/null
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaVariantSuite.scala
@@ -0,0 +1,112 @@
+/*
+ * Copyright (2024) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta
+
+import org.apache.spark.SparkThrowable
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.delta.actions.{Protocol, TableFeatureProtocolUtils}
+import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
+import org.apache.spark.sql.types.StructType
+
+class DeltaVariantSuite
+  extends QueryTest
+  with DeltaSQLCommandTest {
+
+  private def getProtocolForTable(table: String): Protocol = {
+    val deltaLog = DeltaLog.forTable(spark, TableIdentifier(table))
+    deltaLog.unsafeVolatileSnapshot.protocol
+  }
+
+  test("create a new table with Variant, higher protocol and feature should be picked.") {
+    withTable("tbl") {
+      sql("CREATE TABLE tbl(s STRING, v VARIANT) USING DELTA")
+      sql("INSERT INTO tbl (SELECT 'foo', parse_json(cast(id + 99 as string)) FROM range(1))")
+      // TODO(r.chen): Enable once variant casting is properly implemented in OSS Spark.
+      // assert(spark.table("tbl").selectExpr("v::int").head == Row(99))
+      assert(
+        getProtocolForTable("tbl") ==
+          VariantTypeTableFeature.minProtocolVersion.withFeature(VariantTypeTableFeature)
+      )
+    }
+  }
+
+  test("creating a table without Variant should use the usual minimum protocol") {
+    withTable("tbl") {
+      sql("CREATE TABLE tbl(s STRING, i INTEGER) USING DELTA")
+      assert(getProtocolForTable("tbl") == Protocol(1, 2))
+
+      val deltaLog = DeltaLog.forTable(spark, TableIdentifier("tbl"))
+      assert(
+        !deltaLog.unsafeVolatileSnapshot.protocol.isFeatureSupported(VariantTypeTableFeature),
+        s"Table tbl contains VariantTypeFeature descriptor when it's not supposed to"
+      )
+    }
+  }
+
+  test("add a new Variant column should upgrade to the correct protocol versions") {
+    withTable("tbl") {
+      sql("CREATE TABLE tbl(s STRING) USING delta")
+      assert(getProtocolForTable("tbl") == Protocol(1, 2))
+
+      // Should throw error
+      val e = intercept[SparkThrowable] {
+        sql("ALTER TABLE tbl ADD COLUMN v VARIANT")
+      }
+      // capture the existing protocol here.
+      // we will check the error message later in this test as we need to compare the
+      // expected schema and protocol
+      val deltaLog = DeltaLog.forTable(spark, TableIdentifier("tbl"))
+      val currentProtocol = deltaLog.unsafeVolatileSnapshot.protocol
+      val currentFeatures = currentProtocol.implicitlyAndExplicitlySupportedFeatures
+        .map(_.name)
+        .toSeq
+        .sorted
+        .mkString(", ")
+
+      // add table feature
+      sql(
+        s"ALTER TABLE tbl " +
+          s"SET TBLPROPERTIES('delta.feature.variantType-dev' = 'supported')"
+      )
+
+      sql("ALTER TABLE tbl ADD COLUMN v VARIANT")
+
+      // check previously thrown error message
+      checkError(
+        e,
+        errorClass = "DELTA_FEATURES_REQUIRE_MANUAL_ENABLEMENT",
+        parameters = Map(
+          "unsupportedFeatures" -> VariantTypeTableFeature.name,
+          "supportedFeatures" -> currentFeatures
+        )
+      )
+
+      sql("INSERT INTO tbl (SELECT 'foo', parse_json(cast(id + 99 as string)) FROM range(1))")
+      // TODO(r.chen): Enable once variant casting is properly implemented in OSS Spark.
+      // assert(spark.table("tbl").selectExpr("v::int").head == Row(99))
+
+      assert(
+        getProtocolForTable("tbl") ==
+          VariantTypeTableFeature.minProtocolVersion
+            .withFeature(VariantTypeTableFeature)
+            .withFeature(InvariantsTableFeature)
+            .withFeature(AppendOnlyTableFeature)
+      )
+    }
+  }
+}
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DescribeDeltaHistorySuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DescribeDeltaHistorySuite.scala
index 26deaae11eb..dc203ae66c0 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DescribeDeltaHistorySuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DescribeDeltaHistorySuite.scala
@@ -224,8 +224,15 @@ trait DescribeDeltaHistorySuiteBase
       val e = intercept[AnalysisException] {
         sql(s"DESCRIBE HISTORY $viewName").collect()
       }
-      assert(e.getMessage.contains("spark_catalog.default.delta_view is a view. " +
-        "'DESCRIBE HISTORY' expects a table"))
+      // Error message varies slightly between Spark versions because Spark 3.5 still uses
+      // QueryCompilationErrors.expectTableNotViewError instead of
+      // QueryCompilationErrors.unsupportedViewOperationError
+      assert(
+        e.getMessage.contains( // 3.5
+          "spark_catalog.default.delta_view is a view. 
'DESCRIBE HISTORY' expects a table") || + e.getMessage.contains( // 4.0 + "'DESCRIBE HISTORY' expects a table but `spark_catalog`.`default`.`delta_view` is a view") + ) } } @@ -238,7 +245,15 @@ trait DescribeDeltaHistorySuiteBase val e = intercept[AnalysisException] { sql(s"DESCRIBE HISTORY $viewName").collect() } - assert(e.getMessage.contains("v is a temp view. 'DESCRIBE HISTORY' expects a table")) + // Error message varies slightly between Spark versions because Spark 3.5 still uses + // QueryCompilationErrors.expectTableNotViewError instead of + // QueryCompilationErrors.unsupportedViewOperationError + assert( + e.getMessage.contains( // 3.5 + "v is a temp view. 'DESCRIBE HISTORY' expects a table") || + e.getMessage.contains( // 4.0 + "'DESCRIBE HISTORY' expects a table but `v` is a view") + ) } } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/ImplicitDMLCastingSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/ImplicitDMLCastingSuite.scala index 93deca0b39d..71cfa6c2019 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/ImplicitDMLCastingSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/ImplicitDMLCastingSuite.scala @@ -339,7 +339,8 @@ class ImplicitDMLCastingSuite extends QueryTest val expectedDetails = Seq("DELTA_CAST_OVERFLOW_IN_TABLE_WRITE", sourceValueType, valueColumnName) for (detail <- expectedDetails) { - assert(userFacingError.toString.contains(detail)) + assert(userFacingError.toString.contains(detail) || + userFacingError.getCause.toString.contains(detail)) } } } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoMetricsBase.scala b/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoMetricsBase.scala index 46e676b6d86..2de881897a5 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoMetricsBase.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoMetricsBase.scala @@ -237,7 +237,8 @@ trait MergeIntoMetricsBase mergeCmdFn: MergeCmd, expectedOpMetrics: Map[String, Int], testConfig: MergeTestConfiguration, - overrideExpectedOpMetrics: Seq[((Boolean, Boolean), (String, Int))] = Seq.empty + overrideExpectedOpMetrics: Seq[((Boolean, Boolean), (String, Int))] = Seq.empty, + customMetricsChecker: Option[Map[String, String] => Unit] = None ): Unit = { withSQLConf( DeltaSQLConf.DELTA_HISTORY_METRICS_ENABLED.key -> "true", @@ -307,6 +308,8 @@ trait MergeIntoMetricsBase checkOperationTimeMetricsInvariant(mergeTimeMetrics, operationMetrics) // Check CDF metrics invariants. checkMergeOperationCdfMetricsInvariants(operationMetrics, testConfig.cdfEnabled) + // Perform custom checks on operationMetrics. + customMetricsChecker.map(f => f(operationMetrics)) } } } @@ -1002,7 +1005,6 @@ trait MergeIntoMetricsBase "numTargetRowsMatchedDeleted" -> 50, "numTargetRowsRemoved" -> -1, "numTargetRowsCopied" -> 10, - "numTargetFilesAdded" -> 2, "numTargetFilesRemoved" -> 3 ) runMergeCmdAndTestMetrics( @@ -1011,13 +1013,20 @@ trait MergeIntoMetricsBase mergeCmdFn = mergeCmdFn, expectedOpMetrics = expectedOpMetrics, testConfig = testConfig, - // When cdf=true in this test we hit the corner case where there are duplicate matches with a - // delete clause and we generate duplicate cdc data. This is further detailed in - // MergeIntoCommand at the definition of isDeleteWithDuplicateMatchesAndCdc. Our fix for this - // scenario includes deduplicating the output data which reshuffles the output data. 
- // Thus when the table is not partitioned, the data is rewritten into 1 new file rather than 2 - overrideExpectedOpMetrics = Seq(((false, true), ("numTargetFilesAdded", 1))) - ) + customMetricsChecker = Some(operationMetrics => { + val metricValue = operationMetrics("numTargetFilesAdded") + (testConfig.partitioned, testConfig.cdfEnabled) match { + // When cdf=true in this test we hit the corner case where there are duplicate matches + // with a delete clause and we generate duplicate cdc data. This is further detailed in + // MergeIntoCommand at the definition of isDeleteWithDuplicateMatchesAndCdc. Our fix for + // this scenario includes deduplicating the output data which reshuffles the output data. + // Thus when the table is not partitioned, the data is rewritten into 1 new file rather + // than 2. + case (false, true) => assert(metricValue == "1") + // Depending on the Spark version, for non-partitioned tables we may add 1 or 2 files. + case (false, false) => assert(metricValue == "1" || metricValue == "2") + case _ => assert(metricValue == "2") + }})) }} ///////////////////////////// diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/SnapshotManagementSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/SnapshotManagementSuite.scala index 17ee49433e3..eb22f92ba02 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/SnapshotManagementSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/SnapshotManagementSuite.scala @@ -183,7 +183,8 @@ class SnapshotManagementSuite extends QueryTest with DeltaSQLTestUtils with Shar // Guava cache wraps the root cause assert(e.isInstanceOf[SparkException] && e.getMessage.contains("0001.checkpoint") && - e.getMessage.contains(".parquet is not a Parquet file")) + (e.getMessage.contains(".parquet is not a Parquet file") || + e.getMessage.contains("Encountered error while reading file"))) } } } @@ -240,7 +241,8 @@ class SnapshotManagementSuite extends QueryTest with DeltaSQLTestUtils with Shar val e = intercept[SparkException] { staleLog.update() } val version = if (testEmptyCheckpoint) 0 else 1 assert(e.getMessage.contains(f"$version%020d.checkpoint") && - e.getMessage.contains(".parquet is not a Parquet file")) + (e.getMessage.contains(".parquet is not a Parquet file") || + e.getMessage.contains("Encountered error while reading file"))) } } } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/deletionvectors/DeletionVectorsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/deletionvectors/DeletionVectorsSuite.scala index 4bd3441621e..878cfea54a5 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/deletionvectors/DeletionVectorsSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/deletionvectors/DeletionVectorsSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.delta.deletionvectors import java.io.{File, FileNotFoundException} +import java.net.URISyntaxException import org.apache.spark.sql.delta.{DeletionVectorsTableFeature, DeletionVectorsTestUtils, DeltaChecksumException, DeltaConfigs, DeltaLog, DeltaMetricsUtils, DeltaTestUtilsForTempViews} import org.apache.spark.sql.delta.DeltaTestUtils.{createTestAddFile, BOOLEAN_DOMAIN} @@ -32,7 +33,7 @@ import io.delta.tables.DeltaTable import org.apache.commons.io.FileUtils import org.apache.hadoop.fs.Path -import org.apache.spark.SparkException +import org.apache.spark.{SparkException, SparkFileNotFoundException} import org.apache.spark.sql.{DataFrame, QueryTest, Row} import 
org.apache.spark.sql.catalyst.plans.logical.{AppendData, Subquery} import org.apache.spark.sql.functions.col @@ -675,11 +676,14 @@ class DeletionVectorsSuite extends QueryTest assert(filesWithDvs.size > 0) deleteDVFile(targetPath, filesWithDvs(0)) - val se = intercept[SparkException] { + try { spark.sql(s"SELECT * FROM delta.`$targetPath`").collect() + } catch { + case _: SparkFileNotFoundException => // All good. + case se: SparkException => + assert(findIfResponsible[FileNotFoundException](se).nonEmpty, + s"Expected a file not found exception as the cause, but got: [${se}]") } - assert(findIfResponsible[FileNotFoundException](se).nonEmpty, - s"Expected a file not found exception as the cause, but got: [${se}]") } } @@ -702,7 +706,10 @@ class DeletionVectorsSuite extends QueryTest val e = intercept[SparkException] { spark.read.format("delta").load(dir.getCanonicalPath).collect() } - assert(e.getMessage.contains("URISyntaxException: Malformed escape pair")) + assert(e.getMessage.contains("URISyntaxException: Malformed escape pair") || + (e.getCause.isInstanceOf[URISyntaxException] && + e.getCause.getMessage.contains("Malformed escape pair")) + ) } } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/stats/StatsCollectionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/stats/StatsCollectionSuite.scala index 1fad73de8f5..a36e98bc968 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/stats/StatsCollectionSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/stats/StatsCollectionSuite.scala @@ -446,9 +446,9 @@ class StatsCollectionSuite exceptOne.getMessageParametersArray.toSeq == Seq(columnName, typename) ) sql(s"create table $tableName2 (c1 long, c2 $invalidType) using delta") - val exceptTwo = intercept[Throwable] { + val exceptTwo = intercept[DeltaIllegalArgumentException] { sql(s"ALTER TABLE $tableName2 SET TBLPROPERTIES('delta.dataSkippingStatsColumns' = 'c2')") - }.getCause.asInstanceOf[DeltaIllegalArgumentException] + } assert( exceptTwo.getErrorClass == "DELTA_COLUMN_DATA_SKIPPING_NOT_SUPPORTED_TYPE" && exceptTwo.getMessageParametersArray.toSeq == Seq(columnName, typename) @@ -480,18 +480,18 @@ class StatsCollectionSuite exceptTwo.getMessageParametersArray.toSeq == Seq(columnName, typename) ) sql(s"create table $tableName2 (c1 long, c2 STRUCT) using delta") - val exceptThree = intercept[Throwable] { + val exceptThree = intercept[DeltaIllegalArgumentException] { sql( s"ALTER TABLE $tableName2 SET TBLPROPERTIES('delta.dataSkippingStatsColumns'='c2.c21')" ) - }.getCause.asInstanceOf[DeltaIllegalArgumentException] + } assert( exceptThree.getErrorClass == "DELTA_COLUMN_DATA_SKIPPING_NOT_SUPPORTED_TYPE" && exceptThree.getMessageParametersArray.toSeq == Seq(columnName, typename) ) - val exceptFour = intercept[Throwable] { + val exceptFour = intercept[DeltaIllegalArgumentException] { sql(s"ALTER TABLE $tableName2 SET TBLPROPERTIES('delta.dataSkippingStatsColumns'='c2')") - }.getCause.asInstanceOf[DeltaIllegalArgumentException] + } assert( exceptFour.getErrorClass == "DELTA_COLUMN_DATA_SKIPPING_NOT_SUPPORTED_TYPE" && exceptFour.getMessageParametersArray.toSeq == Seq(columnName, typename) @@ -589,11 +589,11 @@ class StatsCollectionSuite ) } else { sql("create table delta_table(c0 int, c1 int) using delta partitioned by(c1)") - val except = intercept[Throwable] { + val except = intercept[DeltaIllegalArgumentException] { sql( "ALTER TABLE delta_table SET TBLPROPERTIES ('delta.dataSkippingStatsColumns' = 'c1')" ) - 
}.getCause.asInstanceOf[DeltaIllegalArgumentException] + } assert( except.getErrorClass == "DELTA_COLUMN_DATA_SKIPPING_NOT_SUPPORTED_PARTITIONED_COLUMN" && except.getMessageParametersArray.toSeq == Seq("c1") @@ -790,12 +790,12 @@ class StatsCollectionSuite ("'c1.c11,c1.c11'", "c1.c11"), ("'c1,c1'", "c1.c11,c1.c12") ).foreach { case (statsColumns, duplicatedColumns) => - val exception = intercept[SparkException] { + val exception = intercept[DeltaIllegalArgumentException] { sql( s"ALTER TABLE delta_table_t1 " + s"SET TBLPROPERTIES('delta.dataSkippingStatsColumns'=$statsColumns)" ) - }.getCause.asInstanceOf[DeltaIllegalArgumentException] + } assert( exception.getErrorClass == "DELTA_DUPLICATE_DATA_SKIPPING_COLUMNS" && exception.getMessageParametersArray.toSeq == Seq(duplicatedColumns) diff --git a/storage/src/main/java/io/delta/storage/internal/S3LogStoreUtil.java b/storage/src/main/java/io/delta/storage/internal/S3LogStoreUtil.java index 198d18ab9b7..9557337283d 100644 --- a/storage/src/main/java/io/delta/storage/internal/S3LogStoreUtil.java +++ b/storage/src/main/java/io/delta/storage/internal/S3LogStoreUtil.java @@ -94,7 +94,7 @@ public static FileStatus[] s3ListFromArray( "The Hadoop file system used for the S3LogStore must be castable to " + "org.apache.hadoop.fs.s3a.S3AFileSystem.", e); } - return iteratorToStatuses(S3LogStoreUtil.s3ListFrom(s3afs, resolvedPath, parentPath), new HashSet<>()); + return iteratorToStatuses(S3LogStoreUtil.s3ListFrom(s3afs, resolvedPath, parentPath)); } /**