Changes from all commits
42 commits
9fa3259
Compile with Spark master snapshot and fix compile errors
allisonport-db Feb 23, 2024
8436a40
Resolve more compilation issues
allisonport-db Mar 4, 2024
93dce95
update github action java version
allisonport-db Mar 4, 2024
061c9cf
Add back getMessageParametersArray
allisonport-db Mar 5, 2024
6b367d5
Fix whitespace
allisonport-db Mar 5, 2024
7a7aeb7
support unit tests with java 17
allisonport-db Mar 6, 2024
e31420b
Upgrade hadoop version to match spark
allisonport-db Mar 6, 2024
109db39
Fix ConvertToDelta[Scala/sql]Suite - Part 1
mingdai-db Mar 7, 2024
77c159d
Fix DeltaInvariantCheckerExec
andreaschat-db Mar 12, 2024
591cb52
Fix DeltaInsertIntoSQLSuite
andreaschat-db Mar 14, 2024
772ffc2
fix DeltaColumnDefaultsInsertSuite
andreaschat-db Mar 14, 2024
7d7b969
fix DeletionVectorsSuite
andreaschat-db Mar 14, 2024
4ab595c
fix RemoveColumnMappingSuite
andreaschat-db Mar 14, 2024
98b264a
fixes DeltaHistoryManagerSuite
Mar 14, 2024
75548f0
fixes SnapshotManagementSuite
Mar 14, 2024
8011e67
fixes StatsCollectionSuite
Mar 14, 2024
69b9db2
Fix some test suites
allisonport-db Mar 19, 2024
a57465a
add java 17 test options for sharing module
allisonport-db Mar 19, 2024
e14d2cf
Fix DescribeDeltaHistorySuite tests for DESCRIBE HISTORY
allisonport-db Mar 19, 2024
a8ec23d
Merge remote-tracking branch 'delta-io/master' into spark-4.0-upgrade…
allisonport-db Mar 19, 2024
3bfc70e
fix CloneParquetByPathSuite
allisonport-db Mar 20, 2024
12fb378
Merge remote-tracking branch 'delta-io/master' into spark-4.0-upgrade…
allisonport-db Mar 21, 2024
3cdeed7
fix ImplicitDMLCastingSuite and DeletionVectorsSuite
allisonport-db Mar 21, 2024
2978f8b
fix DescribeDeltaHistorySuite
allisonport-db Mar 21, 2024
c089ca6
Merge remote-tracking branch 'delta-io/master' into spark-4.0-upgrade…
allisonport-db Mar 21, 2024
c20b7b5
fix compile
richardc-db Mar 20, 2024
c34f00d
minimal support
richardc-db Mar 12, 2024
7a30942
test
richardc-db Mar 19, 2024
f9182c2
init no tests yet
richardc-db Mar 12, 2024
696a79e
fix reads
richardc-db Mar 14, 2024
99f64df
changename
richardc-db Mar 15, 2024
c2dd42e
use get child instead
richardc-db Mar 17, 2024
1e21fbb
fix compile
richardc-db Mar 18, 2024
ece859a
fix
richardc-db Mar 18, 2024
29e1568
changes
richardc-db Mar 20, 2024
7bc802d
use defaultvariantvalue
richardc-db Mar 21, 2024
a9a1c5b
rebase
richardc-db Mar 25, 2024
7e197e8
init no tests yet
richardc-db Mar 12, 2024
651b6e5
init
richardc-db Mar 12, 2024
17ba8c3
add tests
richardc-db Mar 18, 2024
c76743e
remove todo
richardc-db Mar 20, 2024
cd199d6
delete
richardc-db Mar 25, 2024
4 changes: 2 additions & 2 deletions .github/workflows/spark_test.yaml
@@ -6,7 +6,7 @@ jobs:
strategy:
matrix:
# These Scala versions must match those in the build.sbt
scala: [2.12.17, 2.13.8]
scala: [2.13.8]
env:
SCALA_VERSION: ${{ matrix.scala }}
steps:
@@ -23,7 +23,7 @@ jobs:
uses: actions/setup-java@v3
with:
distribution: "zulu"
java-version: "8"
java-version: "17"
- name: Cache Scala, SBT
uses: actions/cache@v3
with:
36 changes: 29 additions & 7 deletions build.sbt
@@ -32,12 +32,13 @@ val all_scala_versions = Seq(scala212, scala213)
// sbt 'set default_scala_version := 2.13.8' [commands]
// FIXME Why not use scalaVersion?
val default_scala_version = settingKey[String]("Default Scala version")
Global / default_scala_version := scala212
Global / default_scala_version := scala213
// TODO set scala only to 2.13 for spark but keep 2.12 for other projects?

// Dependent library versions
val sparkVersion = "3.5.0"
val sparkVersion = "4.0.0-SNAPSHOT"
val flinkVersion = "1.16.1"
val hadoopVersion = "3.3.4"
val hadoopVersion = "3.3.6"
val scalaTestVersion = "3.2.15"
val scalaTestVersionForConnectors = "3.0.8"
val parquet4sVersion = "1.9.4"
@@ -90,6 +91,20 @@ lazy val commonSettings = Seq(
unidocSourceFilePatterns := Nil,
)

// Copied from SparkBuild.scala to support Java 17 for unit tests (see apache/spark#34153)
val extraJavaTestArgs = Seq(
"--add-opens=java.base/java.lang=ALL-UNNAMED",
"--add-opens=java.base/java.lang.invoke=ALL-UNNAMED",
"--add-opens=java.base/java.io=ALL-UNNAMED",
"--add-opens=java.base/java.net=ALL-UNNAMED",
"--add-opens=java.base/java.nio=ALL-UNNAMED",
"--add-opens=java.base/java.util=ALL-UNNAMED",
"--add-opens=java.base/java.util.concurrent=ALL-UNNAMED",
"--add-opens=java.base/sun.nio.ch=ALL-UNNAMED",
"--add-opens=java.base/sun.nio.cs=ALL-UNNAMED",
"--add-opens=java.base/sun.security.action=ALL-UNNAMED",
"--add-opens=java.base/sun.util.calendar=ALL-UNNAMED")

lazy val spark = (project in file("spark"))
.dependsOn(storage)
.enablePlugins(Antlr4Plugin)
@@ -117,17 +132,18 @@ lazy val spark = (project in file("spark"))
"org.apache.spark" %% "spark-hive" % sparkVersion % "test" classifier "tests",
),
// For adding staged Spark RC versions, Ex:
// resolvers += "Apche Spark 3.5.0 (RC1) Staging" at "https://repository.apache.org/content/repositories/orgapachespark-1444/",
resolvers += "Spark master staging" at "https://repository.apache.org/content/groups/snapshots/",
Compile / packageBin / mappings := (Compile / packageBin / mappings).value ++
listPythonFiles(baseDirectory.value.getParentFile / "python"),

Antlr4 / antlr4Version:= "4.9.3",
Antlr4 / antlr4Version:= "4.13.1",
Antlr4 / antlr4PackageName := Some("io.delta.sql.parser"),
Antlr4 / antlr4GenListener := true,
Antlr4 / antlr4GenVisitor := true,

Test / testOptions += Tests.Argument("-oDF"),
Test / testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a"),
Test / javaOptions ++= extraJavaTestArgs, // Required for UTs with Java 17

// Don't execute in parallel since we can't have multiple Sparks in the same JVM
Test / parallelExecution := false,
@@ -238,7 +254,8 @@ lazy val sharing = (project in file("sharing"))
"org.apache.spark" %% "spark-core" % sparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-sql" % sparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-hive" % sparkVersion % "test" classifier "tests",
)
),
Test / javaOptions ++= extraJavaTestArgs // Required for UTs with Java 17
).configureUnidoc()

lazy val kernelApi = (project in file("kernel/kernel-api"))
@@ -349,6 +366,7 @@ val icebergSparkRuntimeArtifactName = {
s"iceberg-spark-runtime-$expMaj.$expMin"
}

/*
lazy val testDeltaIcebergJar = (project in file("testDeltaIcebergJar"))
// delta-iceberg depends on delta-spark! So, we need to include it during our test.
.dependsOn(spark % "test")
@@ -364,6 +382,7 @@ lazy val testDeltaIcebergJar = (project in file("testDeltaIcebergJar"))
"org.apache.spark" %% "spark-core" % sparkVersion % "test"
)
)
*/

val deltaIcebergSparkIncludePrefixes = Seq(
// We want everything from this package
@@ -376,6 +395,7 @@ val deltaIcebergSparkIncludePrefixes = Seq(
"org/apache/spark/sql/delta/commands/convert/IcebergTable"
)

/*
// Build using: build/sbt clean icebergShaded/compile iceberg/compile
// It will fail the first time, just re-run it.
// scalastyle:off println
@@ -450,6 +470,7 @@ lazy val iceberg = (project in file("iceberg"))
assemblyPackageScala / assembleArtifact := false
)
// scalastyle:on println
*/

lazy val generateIcebergJarsTask = TaskKey[Unit]("generateIcebergJars", "Generate Iceberg JARs")

@@ -1119,7 +1140,8 @@ val createTargetClassesDir = taskKey[Unit]("create target classes dir")

// Don't use these groups for any other projects
lazy val sparkGroup = project
.aggregate(spark, contribs, storage, storageS3DynamoDB, iceberg, testDeltaIcebergJar, sharing)
.aggregate(spark, contribs, storage, storageS3DynamoDB, sharing)
// .aggregate(spark, contribs, storage, storageS3DynamoDB, iceberg, testDeltaIcebergJar, sharing)
.settings(
// crossScalaVersions must be set to Nil on the aggregating project
crossScalaVersions := Nil,
@@ -175,6 +175,14 @@ default ArrayValue getArray(int rowId) {
throw new UnsupportedOperationException("Invalid value request for data type");
}

/**
* Return the variant value located at {@code rowId}. Returns null if the slot for {@code rowId}
* is null
*/
default VariantValue getVariant(int rowId) {
throw new UnsupportedOperationException("Invalid value request for data type");
}

/**
* Get the child vector associated with the given ordinal. This method is applicable only to the
* {@code struct} type columns.
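As a rough usage sketch (not part of this change), an engine could drain variant values from a column vector through the new accessor as below; the VariantColumnReader helper and its printing are illustrative assumptions, while getSize, isNullAt, and the getVariant method added above are the Kernel ColumnVector API.

import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.data.VariantValue;

// Illustrative sketch only: read every non-null variant slot from a vector whose
// data type is variant. getVariant returns null for null slots and throws
// UnsupportedOperationException for vectors of any other type.
final class VariantColumnReader {
    static void printVariants(ColumnVector vector) {
        for (int rowId = 0; rowId < vector.getSize(); rowId++) {
            if (vector.isNullAt(rowId)) {
                continue;
            }
            VariantValue variant = vector.getVariant(rowId);
            // The value/metadata byte arrays are the raw encoded representation;
            // decoding them is left to the engine.
            System.out.printf("row %d: %d value bytes, %d metadata bytes%n",
                rowId, variant.getValue().length, variant.getMetadata().length);
        }
    }
}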
6 changes: 6 additions & 0 deletions kernel/kernel-api/src/main/java/io/delta/kernel/data/Row.java
@@ -117,4 +117,10 @@ public interface Row {
* Throws error if the column at given ordinal is not of map type,
*/
MapValue getMap(int ordinal);

/**
* Return variant value of the column located at the given ordinal.
* Throws error if the column at given ordinal is not of variant type.
*/
VariantValue getVariant(int ordinal);
}
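A companion sketch for the row-oriented path, again illustrative rather than part of the diff; isNullAt and the new getVariant are the Kernel Row methods used, while the helper itself is hypothetical.

import io.delta.kernel.data.Row;
import io.delta.kernel.data.VariantValue;

// Illustrative sketch only: null-safe read of a variant column at a known ordinal.
// Per the contract above, getVariant throws if the column at that ordinal is not
// of variant type.
final class VariantRowReader {
    static VariantValue readVariant(Row row, int ordinal) {
        return row.isNullAt(ordinal) ? null : row.getVariant(ordinal);
    }
}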
@@ -0,0 +1,25 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.data;

/**
* Abstraction to represent a single Variant value in a {@link ColumnVector}.
*/
public interface VariantValue {
byte[] getValue();

byte[] getMetadata();
}
@@ -21,6 +21,7 @@
import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.data.MapValue;
import io.delta.kernel.data.Row;
import io.delta.kernel.data.VariantValue;
import io.delta.kernel.types.StructType;

/**
@@ -111,5 +112,10 @@ public MapValue getMap(int ordinal) {
return getChild(ordinal).getMap(rowId);
}

@Override
public VariantValue getVariant(int ordinal) {
return getChild(ordinal).getVariant(rowId);
}

protected abstract ColumnVector getChild(int ordinal);
}
@@ -23,6 +23,7 @@
import io.delta.kernel.data.ArrayValue;
import io.delta.kernel.data.MapValue;
import io.delta.kernel.data.Row;
import io.delta.kernel.data.VariantValue;
import io.delta.kernel.types.*;

/**
@@ -134,6 +135,12 @@ public MapValue getMap(int ordinal) {
return (MapValue) getValue(ordinal);
}

@Override
public VariantValue getVariant(int ordinal) {
throwIfUnsafeAccess(ordinal, VariantType.class, "variant");
return (VariantValue) getValue(ordinal);
}

private Object getValue(int ordinal) {
return ordinalToValue.get(ordinal);
}
@@ -313,6 +313,8 @@ private void validateSupportedTable(Protocol protocol, Metadata metadata) {
case "columnMapping":
ColumnMapping.throwOnUnsupportedColumnMappingMode(metadata);
break;
case "variantType-dev":
break;
default:
throw new UnsupportedOperationException(
"Unsupported table feature: " + readerFeature);
@@ -110,6 +110,8 @@ private static Object getValueAsObject(
return toJavaList(columnVector.getArray(rowId));
} else if (dataType instanceof MapType) {
return toJavaMap(columnVector.getMap(rowId));
} else if (dataType instanceof VariantType) {
return columnVector.getVariant(rowId);
} else {
throw new UnsupportedOperationException("unsupported data type");
}
@@ -63,6 +63,7 @@ public static List<DataType> getAllPrimitiveTypes() {
put("timestamp", TimestampType.TIMESTAMP);
put("binary", BinaryType.BINARY);
put("string", StringType.STRING);
put("variant", VariantType.VARIANT);
}
});

@@ -0,0 +1,31 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.types;

import io.delta.kernel.annotation.Evolving;

/**
* A logical variant type.
* @since 4.0.0
*/
@Evolving
public class VariantType extends BasePrimitiveType {
public static final VariantType VARIANT = new VariantType();

private VariantType() {
super("variant");
}
}
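As a usage sketch, the new VariantType.VARIANT singleton can be referenced wherever a Kernel DataType is expected, for example when building a schema; the field names below are hypothetical, and StructType.add is assumed to be the existing Kernel schema builder method.

import io.delta.kernel.types.IntegerType;
import io.delta.kernel.types.StructType;
import io.delta.kernel.types.VariantType;

// Illustrative sketch only: a schema with a plain integer column and a
// semi-structured variant column.
final class VariantSchemaExample {
    static final StructType SCHEMA = new StructType()
        .add("id", IntegerType.INTEGER)
        .add("payload", VariantType.VARIANT);
}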
@@ -618,7 +618,7 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils {
// corrupt incomplete multi-part checkpoint
val corruptedCheckpointStatuses = FileNames.checkpointFileWithParts(logPath, 10, 5).asScala
.map(p => FileStatus.of(p.toString, 10, 10))
.take(4)
.take(4).toSeq
val deltas = deltaFileStatuses(10L to 13L)
testExpectedError[RuntimeException](
corruptedCheckpointStatuses ++ deltas,
@@ -667,7 +667,7 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils {
// _last_checkpoint refers to incomplete multi-part checkpoint
val corruptedCheckpointStatuses = FileNames.checkpointFileWithParts(logPath, 20, 5).asScala
.map(p => FileStatus.of(p.toString, 10, 10))
.take(4)
.take(4).toSeq
testExpectedError[RuntimeException](
files = corruptedCheckpointStatuses ++ deltaFileStatuses(10L to 20L) ++
singularCheckpointFileStatuses(Seq(10L)),
@@ -33,6 +33,7 @@
import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.data.MapValue;
import io.delta.kernel.data.Row;
import io.delta.kernel.data.VariantValue;
import io.delta.kernel.types.*;
import io.delta.kernel.internal.util.InternalUtils;

@@ -128,6 +129,11 @@ public MapValue getMap(int ordinal) {
return (MapValue) parsedValues[ordinal];
}

@Override
public VariantValue getVariant(int ordinal) {
throw new UnsupportedOperationException("not yet implemented");
}

private static void throwIfTypeMismatch(String expType, boolean hasExpType, JsonNode jsonNode) {
if (!hasExpType) {
throw new RuntimeException(
@@ -0,0 +1,63 @@
/*
* Copyright (2023) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.defaults.internal.data.value;

import java.util.Arrays;

import io.delta.kernel.data.VariantValue;

/**
* Default implementation of a Delta kernel VariantValue.
*/
public class DefaultVariantValue implements VariantValue {
private final byte[] value;
private final byte[] metadata;

public DefaultVariantValue(byte[] value, byte[] metadata) {
this.value = value;
this.metadata = metadata;
}

@Override
public byte[] getValue() {
return value;
}

@Override
public byte[] getMetadata() {
return metadata;
}

@Override
public String toString() {
return "VariantValue{value=" + Arrays.toString(value) +
", metadata=" + Arrays.toString(metadata) + '}';
}

/**
* Compare two variants in bytes. The variant equality is more complex than it, and we haven't
* supported it in the user surface yet. This method is only intended for tests.
*/
@Override
public boolean equals(Object other) {
if (other instanceof DefaultVariantValue) {
return Arrays.equals(value, ((DefaultVariantValue) other).getValue()) &&
Arrays.equals(metadata, ((DefaultVariantValue) other).getMetadata());
} else {
return false;
}
}
}
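A short sketch of how this default implementation might be exercised in a test; the byte arrays are placeholders standing in for an already-encoded variant, not a real encoding.

import io.delta.kernel.data.VariantValue;
import io.delta.kernel.defaults.internal.data.value.DefaultVariantValue;

public final class DefaultVariantValueExample {
    public static void main(String[] args) {
        byte[] value = new byte[] {1, 2, 3};   // placeholder bytes, not a real variant encoding
        byte[] metadata = new byte[] {4, 5};

        VariantValue a = new DefaultVariantValue(value, metadata);
        VariantValue b = new DefaultVariantValue(new byte[] {1, 2, 3}, new byte[] {4, 5});

        // equals compares the raw bytes only, which the class above documents as a
        // test-only convenience rather than true variant equality.
        System.out.println(a.equals(b)); // true
        System.out.println(a);           // VariantValue{value=[1, 2, 3], metadata=[4, 5]}
    }
}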