Commit 783de47

cleanup
1 parent da8ebc7 commit 783de47

File tree

7 files changed: 57 additions & 12 deletions

build.sbt

Lines changed: 4 additions & 2 deletions
@@ -153,7 +153,7 @@ lazy val commonSettings = Seq(
    2) delta-spark unidoc fails to compile. spark 3.5 is on its classpath. likely due to iceberg
    issue above.
  */
-def crossSparkProjectSettings(): Seq[Setting[_]] = getSparkVersion() match {
+def crossSparkUniDocSettings(): Seq[Setting[_]] = getSparkVersion() match {
   case LATEST_RELEASED_SPARK_VERSION => Seq(
     // Java-/Scala-/Uni-Doc Settings
     scalacOptions ++= Seq(
@@ -248,6 +248,7 @@ lazy val connectCommon = (project in file("spark-connect/common"))
     name := "delta-connect-common",
     commonSettings,
     crossSparkSettings(),
+    crossSparkUniDocSettings(),
     releaseSettings,
     Compile / compile := runTaskOnlyOnSparkMaster(
       task = Compile / compile,
@@ -307,6 +308,7 @@ lazy val connectServer = (project in file("spark-connect/server"))
       emptyValue = ()
     ).value,
     crossSparkSettings(),
+    crossSparkUniDocSettings(),
     libraryDependencies ++= Seq(
       "com.google.protobuf" % "protobuf-java" % protoVersion % "protobuf",
@@ -334,7 +336,7 @@ lazy val spark = (project in file("spark"))
     sparkMimaSettings,
     releaseSettings,
     crossSparkSettings(),
-    crossDeltaSparkProjectSettings(),
+    crossSparkUniDocSettings(),
     libraryDependencies ++= Seq(
       // Adding test classifier seems to break transitive resolution of the core dependencies
       "org.apache.spark" %% "spark-hive" % sparkVersion.value % "provided",

kernel/kernel-api/src/main/java/io/delta/kernel/Scan.java

Lines changed: 6 additions & 4 deletions
@@ -199,10 +199,12 @@ public FilteredColumnarBatch next() {
 
             // Transform physical variant columns (struct of binaries) into logical variant
             // columns.
-            nextDataBatch = VariantUtils.withVariantColumns(
-                engine.getExpressionHandler(),
-                nextDataBatch
-            );
+            if (ScanStateRow.getVariantFeatureEnabled(scanState)) {
+                nextDataBatch = VariantUtils.withVariantColumns(
+                    engine.getExpressionHandler(),
+                    nextDataBatch
+                );
+            }
 
             // Add partition columns
             nextDataBatch =
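
For reference, a minimal standalone sketch of the gating added above. It uses only names that appear in this commit (ScanStateRow.getVariantFeatureEnabled, VariantUtils.withVariantColumns) plus the kernel Engine interface, whose import path is assumed here; the class and method names are hypothetical illustrations, not part of the change.

import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.data.Row;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.internal.data.ScanStateRow;
import io.delta.kernel.internal.util.VariantUtils;

final class VariantGateSketch {
    // Rewrite physical variant columns only when the scan state says the table's
    // protocol carries the variant reader feature; otherwise pass the batch through.
    static ColumnarBatch maybeRewriteVariants(
            Engine engine, Row scanState, ColumnarBatch dataBatch) {
        if (ScanStateRow.getVariantFeatureEnabled(scanState)) {
            return VariantUtils.withVariantColumns(
                engine.getExpressionHandler(), dataBatch);
        }
        return dataBatch;
    }
}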

kernel/kernel-api/src/main/java/io/delta/kernel/data/ColumnarBatch.java

Lines changed: 16 additions & 0 deletions
@@ -104,6 +104,22 @@ default ColumnarBatch slice(int start, int end) {
         throw new UnsupportedOperationException("Not yet implemented!");
     }
 
+    /**
+     * Return a copy of this {@link ColumnarBatch} with the column at given {@code ordinal}
+     * replaced with {@code newVector} and the schema field at given {@code ordinal} replaced
+     * with {@code newColumnSchema}.
+     *
+     * @param ordinal Ordinal of the column vector to replace.
+     * @param newColumnSchema The schema field of the new column.
+     * @param newVector New column vector that will replace the column vector at the given
+     *                  {@code ordinal}.
+     * @return {@link ColumnarBatch} with a new column vector at the given ordinal.
+     */
+    default ColumnarBatch withReplacedColumnVector(int ordinal, StructField newColumnSchema,
+            ColumnVector newVector) {
+        throw new UnsupportedOperationException("Not yet implemented!");
+    }
+
     /**
      * @return iterator of {@link Row}s in this batch
      */
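
A hedged usage sketch of the new default method: swap one column of a batch for a pre-computed replacement vector, deriving the new schema field from the replacement's data type. Only withReplacedColumnVector comes from this commit; the helper class, the nullable flag, and the use of the three-argument StructField constructor and getSchema().at(ordinal) are assumptions for illustration.

import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.types.StructField;

final class ReplaceColumnSketch {
    // Replace column `ordinal` of `batch` with `replacement`, keeping the original
    // column name but adopting the replacement vector's data type.
    static ColumnarBatch replaceColumn(
            ColumnarBatch batch, int ordinal, ColumnVector replacement) {
        StructField newField = new StructField(
            batch.getSchema().at(ordinal).getName(),
            replacement.getDataType(),
            true /* nullable */);
        return batch.withReplacedColumnVector(ordinal, newField, replacement);
    }
}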

kernel/kernel-api/src/main/java/io/delta/kernel/internal/data/ScanStateRow.java

Lines changed: 16 additions & 0 deletions
@@ -41,6 +41,7 @@ public class ScanStateRow extends GenericRow {
             .add("partitionColumns", new ArrayType(StringType.STRING, false))
             .add("minReaderVersion", IntegerType.INTEGER)
             .add("minWriterVersion", IntegerType.INTEGER)
+            .add("variantFeatureEnabled", BooleanType.BOOLEAN)
             .add("tablePath", StringType.STRING);
 
     private static final Map<String, Integer> COL_NAME_TO_ORDINAL =
@@ -64,6 +65,10 @@ public static ScanStateRow of(
         valueMap.put(COL_NAME_TO_ORDINAL.get("partitionColumns"), metadata.getPartitionColumns());
         valueMap.put(COL_NAME_TO_ORDINAL.get("minReaderVersion"), protocol.getMinReaderVersion());
         valueMap.put(COL_NAME_TO_ORDINAL.get("minWriterVersion"), protocol.getMinWriterVersion());
+        valueMap.put(
+            COL_NAME_TO_ORDINAL.get("variantFeatureEnabled"),
+            protocol.getReaderFeatures().contains("variantType-preview")
+        );
         valueMap.put(COL_NAME_TO_ORDINAL.get("tablePath"), tablePath);
         return new ScanStateRow(valueMap);
     }
@@ -147,4 +152,15 @@ public static String getColumnMappingMode(Row scanState) {
     public static String getTableRoot(Row scanState) {
         return scanState.getString(COL_NAME_TO_ORDINAL.get("tablePath"));
     }
+
+    /**
+     * Get whether the "variantType" table feature is enabled from scan state {@link Row} returned
+     * by {@link Scan#getScanState(Engine)}
+     *
+     * @param scanState Scan state {@link Row}
+     * @return Boolean indicating whether "variantType" is enabled.
+     */
+    public static Boolean getVariantFeatureEnabled(Row scanState) {
+        return scanState.getBoolean(COL_NAME_TO_ORDINAL.get("variantFeatureEnabled"));
+    }
 }
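
The flag itself is just a set-membership check on the protocol's reader features, keyed on the preview feature name. A small self-contained sketch of that check follows; the class, method, and the second feature string are hypothetical, while "variantType-preview" is the string used in ScanStateRow.of above.

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

final class ReaderFeatureCheckSketch {
    // Mirrors the derivation in ScanStateRow.of: enabled only when the protocol's
    // reader features contain the preview variant feature name.
    static boolean variantEnabled(Set<String> readerFeatures) {
        return readerFeatures.contains("variantType-preview");
    }

    public static void main(String[] args) {
        Set<String> features =
            new HashSet<>(Arrays.asList("someOtherFeature", "variantType-preview"));
        System.out.println(variantEnabled(features)); // prints: true
    }
}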

kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/VariantUtils.java

Lines changed: 2 additions & 5 deletions
@@ -40,7 +40,7 @@ public static ColumnarBatch withVariantColumns(
 
             ExpressionEvaluator evaluator = expressionHandler.getEvaluator(
                 // Field here is variant type if its actually a variant.
-                // TODO: probably better to pass in the schema as an argument
+                // TODO: probably better to pass in the schema as an expression argument
                 // so the schema is enforced at the expression level. Need to pass in a literal
                 // schema
                 new StructType().add(field),
@@ -51,11 +51,8 @@ public static ColumnarBatch withVariantColumns(
                 VariantType.VARIANT
             );
 
-            // TODO: don't need to pass in the entire batch.
             ColumnVector variantCol = evaluator.eval(dataBatch);
-            // TODO: make a more efficient way to do this.
-            dataBatch =
-                dataBatch.withDeletedColumnAt(i).withNewColumn(i, field, variantCol);
+            dataBatch = dataBatch.withReplacedColumnVector(i, field, variantCol);
         }
         return dataBatch;
     }
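
For clarity, the replaced call chain and its single-call equivalent side by side. Both call signatures are taken from this diff; the wrapper class is hypothetical. The new form avoids building an intermediate batch with the column deleted before re-inserting it.

import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.types.StructField;

final class ReplaceVsDeleteInsertSketch {
    // Old shape: one copy without column i, then another copy with the new column
    // spliced back in at the same ordinal.
    static ColumnarBatch oldStyle(ColumnarBatch batch, int i, StructField field, ColumnVector vec) {
        return batch.withDeletedColumnAt(i).withNewColumn(i, field, vec);
    }

    // New shape: a single copy that swaps the vector and its schema field together.
    static ColumnarBatch newStyle(ColumnarBatch batch, int i, StructField field, ColumnVector vec) {
        return batch.withReplacedColumnVector(i, field, vec);
    }
}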

kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/data/DefaultColumnarBatch.java

Lines changed: 13 additions & 0 deletions
@@ -109,6 +109,19 @@ public ColumnarBatch withNewSchema(StructType newSchema) {
             size, newSchema, columnVectors.toArray(new ColumnVector[0]));
     }
 
+    @Override
+    public ColumnarBatch withReplacedColumnVector(int ordinal, StructField newColumnSchema,
+            ColumnVector newVector) {
+        ArrayList<StructField> newStructFields = new ArrayList<>(schema.fields());
+        newStructFields.set(ordinal, newColumnSchema);
+        StructType newSchema = new StructType(newStructFields);
+
+        ArrayList<ColumnVector> newColumnVectors = new ArrayList<>(columnVectors);
+        newColumnVectors.set(ordinal, newVector);
+        return new DefaultColumnarBatch(
+            size, newSchema, newColumnVectors.toArray(new ColumnVector[0]));
+    }
+
     @Override
     public int getSize() {
         return size;
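
Note on the default implementation above: it copies both the field list and the vector list before swapping the entry at the given ordinal, so the original DefaultColumnarBatch is left untouched, matching the copy-on-write behavior of the existing with* methods such as withNewSchema. The copies are shallow, so the untouched ColumnVector instances are shared between the old and new batch.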

kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetFileReaderSuite.scala

Lines changed: 0 additions & 1 deletion
@@ -16,7 +16,6 @@
 package io.delta.kernel.defaults.internal.parquet
 
 import java.math.BigDecimal
-
 import io.delta.golden.GoldenTableUtils.goldenTableFile
 import io.delta.kernel.defaults.utils.{ExpressionTestUtils, TestRow}
 import io.delta.kernel.test.VectorTestUtils
