
Commit 2c6db0c ("cleanup")

1 parent d2bdf92
7 files changed: +57 -12 lines changed

build.sbt

Lines changed: 4 additions & 2 deletions
@@ -153,7 +153,7 @@ lazy val commonSettings = Seq(
    2) delta-spark unidoc fails to compile. spark 3.5 is on its classpath. likely due to iceberg
    issue above.
  */
-def crossSparkProjectSettings(): Seq[Setting[_]] = getSparkVersion() match {
+def crossSparkUniDocSettings(): Seq[Setting[_]] = getSparkVersion() match {
   case LATEST_RELEASED_SPARK_VERSION => Seq(
     // Java-/Scala-/Uni-Doc Settings
     scalacOptions ++= Seq(
@@ -248,6 +248,7 @@ lazy val connectCommon = (project in file("spark-connect/common"))
     name := "delta-connect-common",
     commonSettings,
     crossSparkSettings(),
+    crossSparkUniDocSettings(),
     releaseSettings,
     Compile / compile := runTaskOnlyOnSparkMaster(
       task = Compile / compile,
@@ -307,6 +308,7 @@ lazy val connectServer = (project in file("spark-connect/server"))
       emptyValue = ()
     ).value,
     crossSparkSettings(),
+    crossSparkUniDocSettings(),
     libraryDependencies ++= Seq(
       "com.google.protobuf" % "protobuf-java" % protoVersion % "protobuf",

@@ -334,7 +336,7 @@ lazy val spark = (project in file("spark"))
     sparkMimaSettings,
     releaseSettings,
     crossSparkSettings(),
-    crossDeltaSparkProjectSettings(),
+    crossSparkUniDocSettings(),
     libraryDependencies ++= Seq(
       // Adding test classifier seems to break transitive resolution of the core dependencies
       "org.apache.spark" %% "spark-hive" % sparkVersion.value % "provided",

kernel/kernel-api/src/main/java/io/delta/kernel/Scan.java

Lines changed: 6 additions & 4 deletions
@@ -205,10 +205,12 @@ public FilteredColumnarBatch next() {
 
             // Transform physical variant columns (struct of binaries) into logical variant
             // columns.
-            nextDataBatch = VariantUtils.withVariantColumns(
-                engine.getExpressionHandler(),
-                nextDataBatch
-            );
+            if (ScanStateRow.getVariantFeatureEnabled(scanState)) {
+                nextDataBatch = VariantUtils.withVariantColumns(
+                    engine.getExpressionHandler(),
+                    nextDataBatch
+                );
+            }
 
             // Add partition columns
             nextDataBatch =
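
The guard means the variant rewrite is only paid for by tables whose protocol actually lists the feature. A Scala sketch of the same control flow (engine, scanState, and nextDataBatch are assumed to come from the surrounding next() implementation; the Java above is the real code):

// Sketch: gate the physical-to-logical variant rewrite on the scan state flag.
val transformed =
  if (ScanStateRow.getVariantFeatureEnabled(scanState)) {
    // Table declares the variant reader feature: rewrite struct-of-binary
    // variant columns into logical variant columns.
    VariantUtils.withVariantColumns(engine.getExpressionHandler(), nextDataBatch)
  } else {
    nextDataBatch // feature absent: the batch passes through untouched
  }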

kernel/kernel-api/src/main/java/io/delta/kernel/data/ColumnarBatch.java

Lines changed: 16 additions & 0 deletions
@@ -104,6 +104,22 @@ default ColumnarBatch slice(int start, int end) {
         throw new UnsupportedOperationException("Not yet implemented!");
     }
 
+    /**
+     * Return a copy of this {@link ColumnarBatch} with the column at given {@code ordinal}
+     * replaced with {@code newVector} and the schema field at given {@code ordinal} replaced
+     * with {@code newColumnSchema}.
+     *
+     * @param ordinal Ordinal of the column vector to replace.
+     * @param newColumnSchema The schema field of the new column.
+     * @param newVector New column vector that will replace the column vector at the given
+     *                  {@code ordinal}.
+     * @return {@link ColumnarBatch} with a new column vector at the given ordinal.
+     */
+    default ColumnarBatch withReplacedColumnVector(int ordinal, StructField newColumnSchema,
+            ColumnVector newVector) {
+        throw new UnsupportedOperationException("Not yet implemented!");
+    }
+
     /**
      * @return iterator of {@link Row}s in this batch
      */
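
The new default method swaps a vector and its schema field in one call. A hedged usage sketch in Scala, assuming a batch whose column at some ordinal has just been re-evaluated as a variant vector; the helper itself is illustrative, while StructType.at, the StructField accessors, and VariantType.VARIANT are existing kernel types:

import io.delta.kernel.data.{ColumnVector, ColumnarBatch}
import io.delta.kernel.types.{StructField, VariantType}

// Illustrative helper: retype column `ordinal` as VARIANT and swap in the
// freshly evaluated vector, keeping the original name and nullability.
def replaceWithVariant(
    batch: ColumnarBatch, ordinal: Int, variantVector: ColumnVector): ColumnarBatch = {
  val oldField = batch.getSchema.at(ordinal)
  val newField = new StructField(oldField.getName, VariantType.VARIANT, oldField.isNullable)
  batch.withReplacedColumnVector(ordinal, newField, variantVector)
}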

kernel/kernel-api/src/main/java/io/delta/kernel/internal/data/ScanStateRow.java

Lines changed: 16 additions & 0 deletions
@@ -42,6 +42,7 @@ public class ScanStateRow extends GenericRow {
             .add("partitionColumns", new ArrayType(StringType.STRING, false))
             .add("minReaderVersion", IntegerType.INTEGER)
             .add("minWriterVersion", IntegerType.INTEGER)
+            .add("variantFeatureEnabled", BooleanType.BOOLEAN)
             .add("tablePath", StringType.STRING);
 
     private static final Map<String, Integer> COL_NAME_TO_ORDINAL =
@@ -65,6 +66,10 @@ public static ScanStateRow of(
         valueMap.put(COL_NAME_TO_ORDINAL.get("partitionColumns"), metadata.getPartitionColumns());
         valueMap.put(COL_NAME_TO_ORDINAL.get("minReaderVersion"), protocol.getMinReaderVersion());
         valueMap.put(COL_NAME_TO_ORDINAL.get("minWriterVersion"), protocol.getMinWriterVersion());
+        valueMap.put(
+            COL_NAME_TO_ORDINAL.get("variantFeatureEnabled"),
+            protocol.getReaderFeatures().contains("variantType-preview")
+        );
         valueMap.put(COL_NAME_TO_ORDINAL.get("tablePath"), tablePath);
         return new ScanStateRow(valueMap);
     }
@@ -156,4 +161,15 @@ private static StructType parseSchema(Engine engine, String serializedSchema) {
             serializedSchema
         );
     }
+
+    /**
+     * Get whether the "variantType" table feature is enabled from the scan state {@link Row}
+     * returned by {@link Scan#getScanState(Engine)}.
+     *
+     * @param scanState Scan state {@link Row}
+     * @return Boolean indicating whether "variantType" is enabled.
+     */
+    public static Boolean getVariantFeatureEnabled(Row scanState) {
+        return scanState.getBoolean(COL_NAME_TO_ORDINAL.get("variantFeatureEnabled"));
+    }
 }
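
With the flag stored in the scan state row itself, it travels with the serialized scan state and can be checked before any batch is processed. A sketch under assumptions: the Table/Snapshot/Scan calls are the public kernel API, while ScanStateRow is internal, so this mirrors how Scan.java uses the helper rather than connector-facing usage; `engine` (an io.delta.kernel.engine.Engine) and `tablePath` are assumed in scope:

import io.delta.kernel.Table
import io.delta.kernel.internal.data.ScanStateRow

val table = Table.forPath(engine, tablePath)
val scan = table.getLatestSnapshot(engine).getScanBuilder(engine).build()
val scanState = scan.getScanState(engine)

if (ScanStateRow.getVariantFeatureEnabled(scanState)) {
  // The table's protocol lists "variantType-preview" among its reader
  // features; expect logical variant columns in the read path.
}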

kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/VariantUtils.java

Lines changed: 2 additions & 5 deletions
@@ -40,7 +40,7 @@ public static ColumnarBatch withVariantColumns(
 
             ExpressionEvaluator evaluator = expressionHandler.getEvaluator(
                 // Field here is variant type if it's actually a variant.
-                // TODO: probably better to pass in the schema as an argument
+                // TODO: probably better to pass in the schema as an expression argument
                 // so the schema is enforced at the expression level. Need to pass in a literal
                 // schema
                 new StructType().add(field),
@@ -51,11 +51,8 @@ public static ColumnarBatch withVariantColumns(
                 VariantType.VARIANT
             );
 
-            // TODO: don't need to pass in the entire batch.
             ColumnVector variantCol = evaluator.eval(dataBatch);
-            // TODO: make a more efficient way to do this.
-            dataBatch =
-                dataBatch.withDeletedColumnAt(i).withNewColumn(i, field, variantCol);
+            dataBatch = dataBatch.withReplacedColumnVector(i, field, variantCol);
         }
         return dataBatch;
     }
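
This rewrite collapses withDeletedColumnAt(i).withNewColumn(i, ...) into a single withReplacedColumnVector(i, ...) call, halving the copy-on-write work per variant column. The difference, sketched over plain Scala collections (illustrative only; the real code copies schema fields and column vectors):

// One-step replacement: a single copy of the backing sequence.
def replaced[A](xs: Vector[A], i: Int, x: A): Vector[A] =
  xs.updated(i, x)

// Old two-step shape: delete then insert, building an intermediate copy.
def deletedThenInserted[A](xs: Vector[A], i: Int, x: A): Vector[A] = {
  val without = xs.patch(i, Nil, 1)  // first copy, column removed
  without.patch(i, Seq(x), 0)        // second copy, column re-inserted
}

// Both yield the same result; the one-step form builds no intermediate:
assert(replaced(Vector(1, 2, 3), 1, 9) == deletedThenInserted(Vector(1, 2, 3), 1, 9))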

kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/data/DefaultColumnarBatch.java

Lines changed: 13 additions & 0 deletions
@@ -109,6 +109,19 @@ public ColumnarBatch withNewSchema(StructType newSchema) {
             size, newSchema, columnVectors.toArray(new ColumnVector[0]));
     }
 
+    @Override
+    public ColumnarBatch withReplacedColumnVector(int ordinal, StructField newColumnSchema,
+            ColumnVector newVector) {
+        ArrayList<StructField> newStructFields = new ArrayList<>(schema.fields());
+        newStructFields.set(ordinal, newColumnSchema);
+        StructType newSchema = new StructType(newStructFields);
+
+        ArrayList<ColumnVector> newColumnVectors = new ArrayList<>(columnVectors);
+        newColumnVectors.set(ordinal, newVector);
+        return new DefaultColumnarBatch(
+            size, newSchema, newColumnVectors.toArray(new ColumnVector[0]));
+    }
+
     @Override
     public int getSize() {
         return size;
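
The default implementation is copy-on-write: the schema field list and vector list are shallow-copied, the entry at the given ordinal is swapped, and a new batch is returned, so untouched vectors are shared rather than duplicated. A test-style sketch of that contract, assuming `batch`, `newField`, and `newVector` are in scope and that getColumnVector returns the stored vector:

// Sketch of the copy-on-write contract of withReplacedColumnVector.
val replaced = batch.withReplacedColumnVector(0, newField, newVector)

assert(replaced ne batch)                                        // fresh batch object
assert(replaced.getColumnVector(0) eq newVector)                 // swapped at ordinal 0
assert(replaced.getColumnVector(1) eq batch.getColumnVector(1))  // others shared
assert(replaced.getSchema.at(0) == newField)                     // schema updated too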

kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetFileReaderSuite.scala

Lines changed: 0 additions & 1 deletion
@@ -16,7 +16,6 @@
 package io.delta.kernel.defaults.internal.parquet
 
 import java.math.BigDecimal
-
 import io.delta.golden.GoldenTableUtils.goldenTableFile
 import io.delta.kernel.defaults.utils.{ExpressionTestUtils, TestRow}
 import io.delta.kernel.test.VectorTestUtils
