57 changes: 36 additions & 21 deletions build.sbt
@@ -38,6 +38,8 @@ val LATEST_RELEASED_SPARK_VERSION = "3.5.0"
val SPARK_MASTER_VERSION = "4.0.0-SNAPSHOT"
val sparkVersion = settingKey[String]("Spark version")
spark / sparkVersion := getSparkVersion()
kernelDefaults / sparkVersion := getSparkVersion()
goldenTables / sparkVersion := getSparkVersion()

// Dependent library versions
val defaultSparkVersion = LATEST_RELEASED_SPARK_VERSION
@@ -126,6 +128,25 @@ lazy val commonSettings = Seq(
unidocSourceFilePatterns := Nil,
)

/**
 * Java-/Scala-/Uni-Doc settings aren't working yet against Spark Master:
 *   1) delta-spark on Spark Master uses JDK 17, while delta-iceberg uses JDK 8 or 11. For some
 *      reason, generating delta-spark unidoc compiles delta-iceberg.
 *   2) delta-spark unidoc fails to compile; Spark 3.5 is on its classpath, likely due to the
 *      iceberg issue above.
 */
def crossSparkProjectSettings(): Seq[Setting[_]] = getSparkVersion() match {
case LATEST_RELEASED_SPARK_VERSION => Seq(
// Java-/Scala-/Uni-Doc Settings
scalacOptions ++= Seq(
"-P:genjavadoc:strictVisibility=true" // hide package private types and methods in javadoc
),
unidocSourceFilePatterns := Seq(SourceFilePattern("io/delta/tables/", "io/delta/exceptions/"))
)

case SPARK_MASTER_VERSION => Seq()
}

/**
* Note: we cannot access sparkVersion.value here, since that can only be used within a task or
* setting macro.
@@ -138,13 +159,8 @@ def crossSparkSettings(): Seq[Setting[_]] = getSparkVersion() match {
// For adding staged Spark RC versions, e.g.:
// resolvers += "Apache Spark 3.5.0 (RC1) Staging" at "https://repository.apache.org/content/repositories/orgapachespark-1444/",
Compile / unmanagedSourceDirectories += (Compile / baseDirectory).value / "src" / "main" / "scala-spark-3.5",
Test / unmanagedSourceDirectories += (Compile / baseDirectory).value / "src" / "test" / "scala-spark-3.5",
Antlr4 / antlr4Version := "4.9.3",

// Java-/Scala-/Uni-Doc Settings
scalacOptions ++= Seq(
"-P:genjavadoc:strictVisibility=true" // hide package private types and methods in javadoc
),
unidocSourceFilePatterns := Seq(SourceFilePattern("io/delta/tables/", "io/delta/exceptions/"))
)

case SPARK_MASTER_VERSION => Seq(
@@ -153,6 +169,7 @@ def crossSparkSettings(): Seq[Setting[_]] = getSparkVersion() match {
targetJvm := "17",
resolvers += "Spark master staging" at "https://repository.apache.org/content/groups/snapshots/",
Compile / unmanagedSourceDirectories += (Compile / baseDirectory).value / "src" / "main" / "scala-spark-master",
Test / unmanagedSourceDirectories += (Compile / baseDirectory).value / "src" / "test" / "scala-spark-master",
Antlr4 / antlr4Version := "4.13.1",
Test / javaOptions ++= Seq(
// Copied from SparkBuild.scala to support Java 17 for unit tests (see apache/spark#34153)
@@ -168,13 +185,6 @@ def crossSparkSettings(): Seq[Setting[_]] = getSparkVersion() match {
"--add-opens=java.base/sun.security.action=ALL-UNNAMED",
"--add-opens=java.base/sun.util.calendar=ALL-UNNAMED"
)

// Java-/Scala-/Uni-Doc Settings
// This isn't working yet against Spark Master.
// 1) delta-spark on Spark Master uses JDK 17. delta-iceberg uses JDK 8 or 11. For some reason,
// generating delta-spark unidoc compiles delta-iceberg
// 2) delta-spark unidoc fails to compile. spark 3.5 is on its classpath. likely due to iceberg
// issue above.
)
}

@@ -188,6 +198,7 @@ lazy val spark = (project in file("spark"))
sparkMimaSettings,
releaseSettings,
crossSparkSettings(),
crossSparkProjectSettings(),
libraryDependencies ++= Seq(
// Adding test classifier seems to break transitive resolution of the core dependencies
"org.apache.spark" %% "spark-hive" % sparkVersion.value % "provided",
@@ -355,8 +366,11 @@ lazy val kernelDefaults = (project in file("kernel/kernel-defaults"))
scalaStyleSettings,
javaOnlyReleaseSettings,
Test / javaOptions ++= Seq("-ea"),
crossSparkSettings(),
libraryDependencies ++= Seq(
"org.apache.hadoop" % "hadoop-client-runtime" % hadoopVersion,
// can we cross compile spark-variant?
// "org.apache.spark" %% "spark-variant" % SPARK_MASTER_VERSION % "provided",
"com.fasterxml.jackson.core" % "jackson-databind" % "2.13.5",
"org.apache.parquet" % "parquet-hadoop" % "1.12.3",

@@ -371,10 +385,10 @@ lazy val kernelDefaults = (project in file("kernel/kernel-defaults"))
"org.openjdk.jmh" % "jmh-core" % "1.37" % "test",
"org.openjdk.jmh" % "jmh-generator-annprocess" % "1.37" % "test",

"org.apache.spark" %% "spark-hive" % defaultSparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-sql" % defaultSparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-core" % defaultSparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-catalyst" % defaultSparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-hive" % sparkVersion.value % "test" classifier "tests",
"org.apache.spark" %% "spark-sql" % sparkVersion.value % "test" classifier "tests",
"org.apache.spark" %% "spark-core" % sparkVersion.value % "test" classifier "tests",
"org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "test" classifier "tests",
),
javaCheckstyleSettings("kernel/dev/checkstyle.xml"),
// Unidoc settings
@@ -1069,14 +1083,15 @@ lazy val goldenTables = (project in file("connectors/golden-tables"))
name := "golden-tables",
commonSettings,
skipReleaseSettings,
crossSparkSettings(),
libraryDependencies ++= Seq(
// Test Dependencies
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"commons-io" % "commons-io" % "2.8.0" % "test",
"org.apache.spark" %% "spark-sql" % defaultSparkVersion % "test",
"org.apache.spark" %% "spark-catalyst" % defaultSparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-core" % defaultSparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-sql" % defaultSparkVersion % "test" classifier "tests"
"org.apache.spark" %% "spark-sql" % sparkVersion.value % "test",
"org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "test" classifier "tests",
"org.apache.spark" %% "spark-core" % sparkVersion.value % "test" classifier "tests",
"org.apache.spark" %% "spark-sql" % sparkVersion.value % "test" classifier "tests"
)
)

39 changes: 39 additions & 0 deletions kernel/kernel-api/src/main/java/io/delta/kernel/Scan.java
@@ -17,7 +17,9 @@
package io.delta.kernel;

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.stream.IntStream;

import io.delta.kernel.annotation.Evolving;
import io.delta.kernel.client.TableClient;
@@ -27,6 +29,7 @@
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.CloseableIterator;

import io.delta.kernel.internal.ExtractedVariantOptions;
import io.delta.kernel.internal.InternalScanFileUtils;
import io.delta.kernel.internal.actions.DeletionVectorDescriptor;
import io.delta.kernel.internal.data.ScanStateRow;
@@ -36,6 +39,7 @@
import io.delta.kernel.internal.util.ColumnMapping;
import io.delta.kernel.internal.util.PartitionUtils;
import io.delta.kernel.internal.util.Tuple2;
import io.delta.kernel.internal.util.VariantUtils;

/**
* Represents a scan of a Delta table.
@@ -134,6 +138,7 @@ static CloseableIterator<FilteredColumnarBatch> transformPhysicalData(
StructType physicalReadSchema = null;
StructType logicalReadSchema = null;
String tablePath = null;
List<ExtractedVariantOptions> extractedVariantOptions = null;

RoaringBitmapArray currBitmap = null;
DeletionVectorDescriptor currDV = null;
@@ -144,6 +149,7 @@ private void initIfRequired() {
}
physicalReadSchema = ScanStateRow.getPhysicalSchema(tableClient, scanState);
logicalReadSchema = ScanStateRow.getLogicalSchema(tableClient, scanState);
extractedVariantOptions = ScanStateRow.getExtractedVariantFields(scanState);

tablePath = ScanStateRow.getTableRoot(scanState);
inited = true;
@@ -203,6 +209,19 @@ public FilteredColumnarBatch next() {
physicalReadSchema
);

// Add extracted variant columns.
if (extractedVariantOptions.size() > 0) {
nextDataBatch = VariantUtils.withExtractedVariantFields(
tableClient.getExpressionHandler(),
nextDataBatch,
extractedVariantOptions
);
}

// TODO: Implement default ColumnarBatch slice methods to make this more efficient.
// Remove the variant columns that were added internally for the scan.
nextDataBatch = removeInternallyAddedVariantCols(nextDataBatch, physicalReadSchema);

// Change back to logical schema
String columnMappingMode = ScanStateRow.getColumnMappingMode(scanState);
switch (columnMappingMode) {
@@ -219,6 +238,26 @@ public FilteredColumnarBatch next() {

return new FilteredColumnarBatch(nextDataBatch, selectionVector);
}

private ColumnarBatch removeInternallyAddedVariantCols(
ColumnarBatch batch,
StructType schema) {
int numToRemove = (int) schema.fields().stream()
.filter(field -> field.isInternallyAddedVariant())
.count();

// There is no guarantee that `ColumnarBatch.withDeletedColumnAt` doesn't reorder
// the schema so the added variant columns must be removed one by one.
for (int i = 0; i < numToRemove; i++) {
Optional<Integer> idxToRemove = IntStream.range(0, schema.length())
.filter(idx -> schema.at(idx).isInternallyAddedVariant())
.boxed()
.findFirst();

batch = batch.withDeletedColumnAt(idxToRemove.get().intValue());
}
return batch;
}
};
}
}
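For context, a minimal sketch of how a connector might consume the new variant accessor from the transformed batches. The variant column name "v" and the surrounding variables (tableClient, scanState, scanFileRow, physicalDataIter) are illustrative assumptions, not part of this change:

try (CloseableIterator<FilteredColumnarBatch> batches =
        Scan.transformPhysicalData(tableClient, scanState, scanFileRow, physicalDataIter)) {
    while (batches.hasNext()) {
        FilteredColumnarBatch filtered = batches.next();
        ColumnarBatch data = filtered.getData();
        // Illustrative: look up the variant column by name in the batch schema.
        int ordinal = data.getSchema().indexOf("v");
        ColumnVector variants = data.getColumnVector(ordinal);
        for (int rowId = 0; rowId < variants.getSize(); rowId++) {
            // Rows dropped by filtered.getSelectionVector() should be skipped as well.
            if (!variants.isNullAt(rowId)) {
                VariantValue variant = variants.getVariant(rowId);
                byte[] value = variant.getValue();       // encoded variant value
                byte[] metadata = variant.getMetadata(); // encoded variant metadata
            }
        }
    }
}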
kernel/kernel-api/src/main/java/io/delta/kernel/ScanBuilder.java
@@ -19,6 +19,7 @@
import io.delta.kernel.annotation.Evolving;
import io.delta.kernel.client.TableClient;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.types.DataType;
import io.delta.kernel.types.StructType;

/**
@@ -49,6 +50,12 @@ public interface ScanBuilder {
*/
ScanBuilder withReadSchema(TableClient tableClient, StructType readSchema);

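/**
* Apply a variant field extraction to the scan. The leading element of the dot-separated
* {@code path} names the variant column; the nested field at that path is extracted and
* surfaced in the scan output as a column named {@code extractedFieldName} of the given type.
*
* @param tableClient {@link TableClient} instance to use in Delta Kernel.
* @param path Dot-separated path to the field inside the variant column, e.g. "v.a.b".
* @param type Data type the extracted field should be read as.
* @param extractedFieldName Name of the output column holding the extracted field.
* @return A {@link ScanBuilder} with the variant field extraction applied.
*/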
ScanBuilder withExtractedVariantField(
TableClient tableClient,
String path,
DataType type,
String extractedFieldName);

/**
* @return the built {@link Scan} instance
*/
kernel/kernel-api/src/main/java/io/delta/kernel/data/ColumnVector.java
@@ -175,6 +175,14 @@ default ArrayValue getArray(int rowId) {
throw new UnsupportedOperationException("Invalid value request for data type");
}

/**
* Return the variant value located at {@code rowId}. Returns null if the slot for {@code rowId}
* is null.
*/
default VariantValue getVariant(int rowId) {
throw new UnsupportedOperationException("Invalid value request for data type");
}

/**
* Get the child vector associated with the given ordinal. This method is applicable only to the
* {@code struct} type columns.
6 changes: 6 additions & 0 deletions kernel/kernel-api/src/main/java/io/delta/kernel/data/Row.java
@@ -117,4 +117,10 @@ public interface Row {
* Throws error if the column at given ordinal is not of map type,
*/
MapValue getMap(int ordinal);

/**
* Return variant value of the column located at the given ordinal.
* Throws error if the column at given ordinal is not of variant type.
*/
VariantValue getVariant(int ordinal);
}
kernel/kernel-api/src/main/java/io/delta/kernel/data/VariantValue.java
@@ -0,0 +1,25 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.data;

/**
* Abstraction to represent a single Variant value in a {@link ColumnVector}.
*/
public interface VariantValue {
/** Returns the encoded variant value bytes. */
byte[] getValue();

/** Returns the encoded variant metadata bytes. */
byte[] getMetadata();
}
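A connector-side implementation can be as simple as wrapping the two byte arrays its file reader produces; a minimal sketch (the class name is illustrative, not part of this change):

public class ByteArrayVariantValue implements VariantValue {
    private final byte[] value;
    private final byte[] metadata;

    public ByteArrayVariantValue(byte[] value, byte[] metadata) {
        this.value = value;
        this.metadata = metadata;
    }

    @Override
    public byte[] getValue() {
        return value;
    }

    @Override
    public byte[] getMetadata() {
        return metadata;
    }
}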
kernel/kernel-api/src/main/java/io/delta/kernel/internal/ExtractedVariantOptions.java
@@ -0,0 +1,32 @@
/*
* Copyright (2023) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.internal;

import io.delta.kernel.expressions.Column;
import io.delta.kernel.types.DataType;

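/**
* Internal container describing a single field to extract from a variant column during a scan:
* the path to the field, the type to read it as, and the name of the output column.
*/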
public class ExtractedVariantOptions {
public Column path;
public String fieldName;
public DataType type;

public ExtractedVariantOptions(Column path, DataType type, String fieldName) {
this.path = path;
this.fieldName = fieldName;
this.type = type;
}
}
kernel/kernel-api/src/main/java/io/delta/kernel/internal/ScanBuilderImpl.java
@@ -16,13 +16,16 @@

package io.delta.kernel.internal;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

import io.delta.kernel.Scan;
import io.delta.kernel.ScanBuilder;
import io.delta.kernel.client.TableClient;
import io.delta.kernel.expressions.Column;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.types.StructType;
import io.delta.kernel.types.*;

import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
@@ -44,6 +47,7 @@ public class ScanBuilderImpl

private StructType readSchema;
private Optional<Predicate> predicate;
private List<ExtractedVariantOptions> extractedVariantFields;

public ScanBuilderImpl(
Path dataPath,
@@ -60,6 +64,7 @@ public ScanBuilderImpl(
this.tableClient = tableClient;
this.readSchema = snapshotSchema;
this.predicate = Optional.empty();
this.extractedVariantFields = new ArrayList<>();
}

@Override
@@ -78,6 +83,25 @@ public ScanBuilder withReadSchema(TableClient tableClient, StructType readSchema
return this;
}

@Override
public ScanBuilder withExtractedVariantField(
TableClient tableClient,
String path,
DataType type,
String extractedFieldName) {
String[] splitPath = splitVariantPath(path);
extractedVariantFields.add(new ExtractedVariantOptions(
new Column(splitPath), type, extractedFieldName));

// TODO: we're attaching the actual variant column name right now.
// Will this work with column mapping/is there a more robust way?
if (readSchema.indexOf(splitPath[0]) == -1) {
readSchema = readSchema.add(StructField.internallyAddedVariantSchema(splitPath[0]));
}

return this;
}

@Override
public Scan build() {
return new ScanImpl(
Expand All @@ -87,6 +111,12 @@ public Scan build() {
metadata,
logReplay,
predicate,
extractedVariantFields,
dataPath);
}

private String[] splitVariantPath(String path) {
// TODO: account for square brackets and array indices later.
return path.split("\\.");
}
}
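As a usage sketch (the snapshot variable, the variant column "v", and the kernel's StringType.STRING singleton are assumptions for illustration):

Scan scan = snapshot.getScanBuilder(tableClient)
    .withExtractedVariantField(tableClient, "v.user.id", StringType.STRING, "user_id")
    .build();

The path is split on dots, so "v" is resolved as the variant column and user.id as the nested field to extract.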