Skip to content
Closed
53 changes: 32 additions & 21 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ val LATEST_RELEASED_SPARK_VERSION = "3.5.0"
val SPARK_MASTER_VERSION = "4.0.0-SNAPSHOT"
val sparkVersion = settingKey[String]("Spark version")
spark / sparkVersion := getSparkVersion()
kernelDefaults / sparkVersion := getSparkVersion()
goldenTables / sparkVersion := getSparkVersion()

// Dependent library versions
val defaultSparkVersion = LATEST_RELEASED_SPARK_VERSION
Expand Down Expand Up @@ -126,6 +128,25 @@ lazy val commonSettings = Seq(
unidocSourceFilePatterns := Nil,
)

/**
 * Project-level settings that differ depending on the Spark version being built against.
 *
 * Java-/Scala-/Uni-Doc settings aren't working yet against Spark Master:
 * 1) delta-spark on Spark Master uses JDK 17. delta-iceberg uses JDK 8 or 11. For some reason,
 *    generating delta-spark unidoc compiles delta-iceberg
 * 2) delta-spark unidoc fails to compile. spark 3.5 is on its classpath. likely due to iceberg
 *    issue above.
 */
def crossSparkProjectSettings(): Seq[Setting[_]] = getSparkVersion() match {
case LATEST_RELEASED_SPARK_VERSION => Seq(
// Java-/Scala-/Uni-Doc Settings
scalacOptions ++= Seq(
"-P:genjavadoc:strictVisibility=true" // hide package private types and methods in javadoc
),
unidocSourceFilePatterns := Seq(SourceFilePattern("io/delta/tables/", "io/delta/exceptions/"))
)

// NOTE(review): doc generation is intentionally disabled on Spark Master (see header comment).
// The match is non-exhaustive over arbitrary versions; like crossSparkSettings(), any other
// getSparkVersion() value will fail with a MatchError at load time — TODO confirm intended.
case SPARK_MASTER_VERSION => Seq()
}

/**
* Note: we cannot access sparkVersion.value here, since that can only be used within a task or
* setting macro.
Expand All @@ -140,12 +161,6 @@ def crossSparkSettings(): Seq[Setting[_]] = getSparkVersion() match {
Compile / unmanagedSourceDirectories += (Compile / baseDirectory).value / "src" / "main" / "scala-spark-3.5",
Test / unmanagedSourceDirectories += (Test / baseDirectory).value / "src" / "test" / "scala-spark-3.5",
Antlr4 / antlr4Version := "4.9.3",

// Java-/Scala-/Uni-Doc Settings
scalacOptions ++= Seq(
"-P:genjavadoc:strictVisibility=true" // hide package private types and methods in javadoc
),
unidocSourceFilePatterns := Seq(SourceFilePattern("io/delta/tables/", "io/delta/exceptions/"))
)

case SPARK_MASTER_VERSION => Seq(
Expand All @@ -170,13 +185,6 @@ def crossSparkSettings(): Seq[Setting[_]] = getSparkVersion() match {
"--add-opens=java.base/sun.security.action=ALL-UNNAMED",
"--add-opens=java.base/sun.util.calendar=ALL-UNNAMED"
)

// Java-/Scala-/Uni-Doc Settings
// This isn't working yet against Spark Master.
// 1) delta-spark on Spark Master uses JDK 17. delta-iceberg uses JDK 8 or 11. For some reason,
// generating delta-spark unidoc compiles delta-iceberg
// 2) delta-spark unidoc fails to compile. spark 3.5 is on its classpath. likely due to iceberg
// issue above.
)
}

Expand All @@ -190,6 +198,7 @@ lazy val spark = (project in file("spark"))
sparkMimaSettings,
releaseSettings,
crossSparkSettings(),
crossSparkProjectSettings(),
libraryDependencies ++= Seq(
// Adding test classifier seems to break transitive resolution of the core dependencies
"org.apache.spark" %% "spark-hive" % sparkVersion.value % "provided",
Expand Down Expand Up @@ -358,6 +367,7 @@ lazy val kernelDefaults = (project in file("kernel/kernel-defaults"))
scalaStyleSettings,
javaOnlyReleaseSettings,
Test / javaOptions ++= Seq("-ea"),
crossSparkSettings(),
libraryDependencies ++= Seq(
"org.apache.hadoop" % "hadoop-client-runtime" % hadoopVersion,
"com.fasterxml.jackson.core" % "jackson-databind" % "2.13.5",
Expand All @@ -374,10 +384,10 @@ lazy val kernelDefaults = (project in file("kernel/kernel-defaults"))
"org.openjdk.jmh" % "jmh-core" % "1.37" % "test",
"org.openjdk.jmh" % "jmh-generator-annprocess" % "1.37" % "test",

"org.apache.spark" %% "spark-hive" % defaultSparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-sql" % defaultSparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-core" % defaultSparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-catalyst" % defaultSparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-hive" % sparkVersion.value % "test" classifier "tests",
"org.apache.spark" %% "spark-sql" % sparkVersion.value % "test" classifier "tests",
"org.apache.spark" %% "spark-core" % sparkVersion.value % "test" classifier "tests",
"org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "test" classifier "tests",
),
javaCheckstyleSettings("kernel/dev/checkstyle.xml"),
// Unidoc settings
Expand Down Expand Up @@ -1072,14 +1082,15 @@ lazy val goldenTables = (project in file("connectors/golden-tables"))
name := "golden-tables",
commonSettings,
skipReleaseSettings,
crossSparkSettings(),
libraryDependencies ++= Seq(
// Test Dependencies
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"commons-io" % "commons-io" % "2.8.0" % "test",
"org.apache.spark" %% "spark-sql" % defaultSparkVersion % "test",
"org.apache.spark" %% "spark-catalyst" % defaultSparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-core" % defaultSparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-sql" % defaultSparkVersion % "test" classifier "tests"
"org.apache.spark" %% "spark-sql" % sparkVersion.value % "test",
"org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "test" classifier "tests",
"org.apache.spark" %% "spark-core" % sparkVersion.value % "test" classifier "tests",
"org.apache.spark" %% "spark-sql" % sparkVersion.value % "test" classifier "tests"
)
)

Expand Down
8 changes: 8 additions & 0 deletions kernel/kernel-api/src/main/java/io/delta/kernel/Scan.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.delta.kernel.internal.util.ColumnMapping;
import io.delta.kernel.internal.util.PartitionUtils;
import io.delta.kernel.internal.util.Tuple2;
import io.delta.kernel.internal.util.VariantUtils;

/**
* Represents a scan of a Delta table.
Expand Down Expand Up @@ -194,6 +195,13 @@ public FilteredColumnarBatch next() {
nextDataBatch = nextDataBatch.withDeletedColumnAt(rowIndexOrdinal);
}

// Transform physical variant columns (struct of binaries) into logical variant
// columns.
nextDataBatch = VariantUtils.withVariantColumns(
tableClient.getExpressionHandler(),
nextDataBatch
);

// Add partition columns
nextDataBatch =
PartitionUtils.withPartitionColumns(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,14 @@ default ArrayValue getArray(int rowId) {
throw new UnsupportedOperationException("Invalid value request for data type");
}

/**
 * Return the variant value located at {@code rowId}. Returns null if the slot for {@code rowId}
 * is null.
 *
 * @param rowId zero-based row index within this vector
 * @return the {@link VariantValue} at {@code rowId}, or null when that slot is null
 * @throws UnsupportedOperationException by default; implementations backing variant-typed
 *         vectors are expected to override this method
 */
default VariantValue getVariant(int rowId) {
throw new UnsupportedOperationException("Invalid value request for data type");
}

/**
* Get the child vector associated with the given ordinal. This method is applicable only to the
* {@code struct} type columns.
Expand Down
6 changes: 6 additions & 0 deletions kernel/kernel-api/src/main/java/io/delta/kernel/data/Row.java
Original file line number Diff line number Diff line change
Expand Up @@ -117,4 +117,10 @@ public interface Row {
* Throws error if the column at given ordinal is not of map type,
*/
MapValue getMap(int ordinal);

/**
 * Return variant value of the column located at the given ordinal.
 * Throws error if the column at given ordinal is not of variant type.
 *
 * @param ordinal zero-based column index within this row
 * @return the {@link VariantValue} stored at the given ordinal
 */
VariantValue getVariant(int ordinal);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.data;

/**
 * Abstraction to represent a single Variant value in a {@link ColumnVector}.
 * <p>
 * A variant is exposed as two opaque byte arrays: the encoded value itself and the
 * metadata blob required to interpret it.
 */
public interface VariantValue {
    /** Returns the encoded variant value bytes. */
    byte[] getValue();

    // TODO(review): design decision needed — consider a richer interface that hides the
    // metadata encoding from callers, e.g. an API exposing the available paths (and
    // optionally their types) plus typed accessors such as getInt(String path).

    /** Returns the encoded variant metadata bytes needed to interpret {@link #getValue()}. */
    byte[] getMetadata();
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ public static void validateReadSupportedTable(Protocol protocol, Metadata metada
break;
case "deletionVectors": // fall through
case "timestampNtz": // fall through
case "vacuumProtocolCheck": // fall through
case "v2Checkpoint":
case "variantType-dev": // fall through
case "vacuumProtocolCheck":
break;
default:
throw DeltaErrors.unsupportedReadFeature(3, readerFeature);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.data.MapValue;
import io.delta.kernel.data.Row;
import io.delta.kernel.data.VariantValue;
import io.delta.kernel.types.StructType;

/**
Expand Down Expand Up @@ -111,5 +112,10 @@ public MapValue getMap(int ordinal) {
return getChild(ordinal).getMap(rowId);
}

@Override
public VariantValue getVariant(int ordinal) {
// Delegate to the child vector for this ordinal, reading at this row's index.
return getChild(ordinal).getVariant(rowId);
}

protected abstract ColumnVector getChild(int ordinal);
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.delta.kernel.data.ArrayValue;
import io.delta.kernel.data.MapValue;
import io.delta.kernel.data.Row;
import io.delta.kernel.data.VariantValue;
import io.delta.kernel.types.*;

/**
Expand Down Expand Up @@ -134,6 +135,12 @@ public MapValue getMap(int ordinal) {
return (MapValue) getValue(ordinal);
}

@Override
public VariantValue getVariant(int ordinal) {
// Verify the declared type at this ordinal is variant before the unchecked cast below.
throwIfUnsafeAccess(ordinal, VariantType.class, "variant");
return (VariantValue) getValue(ordinal);
}

private Object getValue(int ordinal) {
return ordinalToValue.get(ordinal);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.internal.util;

import java.util.Arrays;

import io.delta.kernel.client.ExpressionHandler;
import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.expressions.*;
import io.delta.kernel.types.*;

/**
 * Utilities for rewriting physical variant columns (structs of binaries) into logical
 * variant columns.
 */
public class VariantUtils {
    /**
     * Returns a batch in which every column that may contain variant data has been
     * rewritten through the {@code variant_coalesce} expression so that its vector's
     * data type matches its logical {@link VariantType}.
     *
     * @param expressionHandler handler used to obtain the {@code variant_coalesce} evaluator
     * @param dataBatch the input batch; not mutated — a rewritten batch is returned
     * @return a batch whose variant columns carry variant-typed vectors
     */
    public static ColumnarBatch withVariantColumns(
            ExpressionHandler expressionHandler,
            ColumnarBatch dataBatch) {
        for (int i = 0; i < dataBatch.getSchema().length(); i++) {
            StructField field = dataBatch.getSchema().at(i);

            // Nested types (struct/array/map) are always processed because they may hold
            // variants at any depth. A top-level column is processed only when its logical
            // type is variant but its physical vector is not yet variant-typed.
            boolean isNestedType = field.getDataType() instanceof StructType
                || field.getDataType() instanceof ArrayType
                || field.getDataType() instanceof MapType;
            boolean isUnconvertedVariant = field.getDataType() == VariantType.VARIANT
                && dataBatch.getColumnVector(i).getDataType() != VariantType.VARIANT;
            if (!isNestedType && !isUnconvertedVariant) {
                continue;
            }

            ExpressionEvaluator evaluator = expressionHandler.getEvaluator(
                // Field here is variant type if it's actually a variant.
                // TODO: probably better to pass in the schema as an argument
                // so the schema is enforced at the expression level. Need to pass in a literal
                // schema
                new StructType().add(field),
                new ScalarExpression(
                    "variant_coalesce",
                    Arrays.asList(new Column(field.getName()))
                ),
                VariantType.VARIANT
            );

            // TODO: don't need to pass in the entire batch.
            ColumnVector variantCol = evaluator.eval(dataBatch);
            // TODO: make a more efficient way to do this (rebuilds the batch per column).
            dataBatch =
                dataBatch.withDeletedColumnAt(i).withNewColumn(i, field, variantCol);
        }
        return dataBatch;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,8 @@ private static Object getValueAsObject(
return toJavaList(columnVector.getArray(rowId));
} else if (dataType instanceof MapType) {
return toJavaMap(columnVector.getMap(rowId));
} else if (dataType instanceof VariantType) {
return columnVector.getVariant(rowId);
} else {
throw new UnsupportedOperationException("unsupported data type");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public static List<DataType> getAllPrimitiveTypes() {
put("timestamp_ntz", TimestampNTZType.TIMESTAMP_NTZ);
put("binary", BinaryType.BINARY);
put("string", StringType.STRING);
put("variant", VariantType.VARIANT);
}
});

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.types;

import io.delta.kernel.annotation.Evolving;

/**
 * A logical variant type.
 * @since 4.0.0
 */
@Evolving
public class VariantType extends BasePrimitiveType {
/** Singleton instance — the only {@code VariantType} value; the constructor is private. */
public static final VariantType VARIANT = new VariantType();

// Private: callers must use the VARIANT singleton.
private VariantType() {
super("variant");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -759,7 +759,7 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils {
// corrupt incomplete multi-part checkpoint
val corruptedCheckpointStatuses = FileNames.checkpointFileWithParts(logPath, 10, 5).asScala
.map(p => FileStatus.of(p.toString, 10, 10))
.take(4)
.take(4).toSeq
val deltas = deltaFileStatuses(10L to 13L)
testExpectedError[RuntimeException](
corruptedCheckpointStatuses ++ deltas,
Expand Down Expand Up @@ -808,7 +808,7 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils {
// _last_checkpoint refers to incomplete multi-part checkpoint
val corruptedCheckpointStatuses = FileNames.checkpointFileWithParts(logPath, 20, 5).asScala
.map(p => FileStatus.of(p.toString, 10, 10))
.take(4)
.take(4).toSeq
testExpectedError[RuntimeException](
files = corruptedCheckpointStatuses ++ deltaFileStatuses(10L to 20L) ++
singularCheckpointFileStatuses(Seq(10L)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.data.MapValue;
import io.delta.kernel.data.Row;
import io.delta.kernel.data.VariantValue;
import io.delta.kernel.types.*;
import io.delta.kernel.internal.util.InternalUtils;

Expand Down Expand Up @@ -128,6 +129,11 @@ public MapValue getMap(int ordinal) {
return (MapValue) parsedValues[ordinal];
}

@Override
public VariantValue getVariant(int ordinal) {
// Reading variant values from this JSON-backed row is not supported yet.
throw new UnsupportedOperationException("not yet implemented");
}

private static void throwIfTypeMismatch(String expType, boolean hasExpType, JsonNode jsonNode) {
if (!hasExpType) {
throw new RuntimeException(
Expand Down
Loading