Commit 17ba8c3: add tests
Parent: 651b6e5

2 files changed: 59 additions & 22 deletions

kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetColumnWriters.java

Lines changed: 2 additions & 3 deletions
@@ -32,7 +32,6 @@
 import static io.delta.kernel.internal.util.Preconditions.checkArgument;
 
 import io.delta.kernel.defaults.internal.DefaultKernelUtils;
-import io.delta.kernel.defaults.internal.data.vector.DefaultVariantVector;
 import static io.delta.kernel.defaults.internal.parquet.ParquetSchemaUtils.MAX_BYTES_PER_PRECISION;
 
 /**
@@ -487,12 +486,12 @@ static class VariantWriter extends ColumnWriter {
             valueWriter = new BinaryWriter(
                 "value",
                 0,
-                ((DefaultVariantVector) variantColumnVector).getValueVector()
+                variantColumnVector.getChild(0)
             );
             metadataWriter = new BinaryWriter(
                 "metadata",
                 1,
-                ((DefaultVariantVector) variantColumnVector).getMetadataVector()
+                variantColumnVector.getChild(1)
             );
         }
 
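For context, a minimal Scala sketch of the access pattern the change above switches to: instead of casting to the DefaultVariantVector implementation, the writer reaches the variant's sub-columns through the generic ColumnVector.getChild accessor, where ordinal 0 holds the binary value and ordinal 1 the binary metadata. The helper name below is made up for illustration.

    import io.delta.kernel.data.ColumnVector

    // Hypothetical helper mirroring what VariantWriter does above: read the two
    // binary sub-vectors of a variant column through the generic child accessor.
    def variantSubVectors(variantColumnVector: ColumnVector): (ColumnVector, ColumnVector) = {
      val valueVector = variantColumnVector.getChild(0)    // encoded "value" bytes
      val metadataVector = variantColumnVector.getChild(1) // encoded "metadata" bytes
      (valueVector, metadataVector)
    }

Going through getChild keeps the writer independent of any particular ColumnVector implementation, which is why the DefaultVariantVector import can be dropped.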

kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetFileWriterSuite.scala

Lines changed: 57 additions & 19 deletions
@@ -17,6 +17,8 @@ package io.delta.kernel.defaults.internal.parquet
 
 import java.lang.{Double => DoubleJ, Float => FloatJ}
 
+import org.apache.spark.sql.DataFrame
+
 import io.delta.golden.GoldenTableUtils.{goldenTableFile, goldenTablePath}
 import io.delta.kernel.data.{ColumnarBatch, FilteredColumnarBatch}
 import io.delta.kernel.defaults.internal.DefaultKernelUtils
@@ -189,28 +191,64 @@ class ParquetFileWriterSuite extends AnyFunSuite
     }
   }
 
-  test("write variant") {
-    withTempDir { tempPath =>
-      val variantFilePath = goldenTableFile("variantbasic").getAbsolutePath
-      val schema = tableSchema(variantFilePath)
-
-      val physicalSchema = if (hasColumnMappingId(variantFilePath)) {
-        convertToPhysicalSchema(schema, schema, ColumnMapping.COLUMN_MAPPING_MODE_ID)
-      } else {
-        schema
+  def testWrite(testName: String)(df: => DataFrame): Unit = {
+    test(testName) {
+      withTable("test_table") {
+        withTempDir { writeDir =>
+          df.write
+            .format("delta")
+            .mode("overwrite")
+            .saveAsTable("test_table")
+          val filePath = spark.sql("describe table extended `test_table`")
+            .where("col_name = 'Location'")
+            .collect()(0)
+            .getString(1)
+            .replace("file:", "")
+
+          val schema = tableSchema(filePath)
+
+          val physicalSchema = if (hasColumnMappingId(filePath)) {
+            convertToPhysicalSchema(schema, schema, ColumnMapping.COLUMN_MAPPING_MODE_ID)
+          } else {
+            schema
+          }
+          val readData = readParquetUsingKernelAsColumnarBatches(filePath, physicalSchema)
+            .map(_.toFiltered(Option.empty[Predicate]))
+          val writePath = writeDir.getAbsolutePath
+          val writeOutput = writeToParquetUsingKernel(readData, writePath)
+          // TODO(richardc-db): Test without read from Spark because Spark's Parquet reader relies
+          // on spark-produced footer metadata to properly read variant types.
+          verifyContentUsingKernelReader(writePath, readData)
+        }
       }
-      val readData = readParquetUsingKernelAsColumnarBatches(variantFilePath, physicalSchema)
-        // Convert the schema of the data to the physical schema with field ids
-        .map(_.withNewSchema(physicalSchema))
-        // convert the data to filtered columnar batches
-        .map(_.toFiltered(Option.empty[Predicate]))
+    }
+  }
 
-      val writeOutput = writeToParquetUsingKernel(readData, tempPath.getAbsolutePath)
+  testWrite("basic write variant") {
+    spark.range(0, 10, 1, 1).selectExpr(
+      "parse_json(cast(id as string)) as basic_v",
+      "named_struct('v', parse_json(cast(id as string))) as struct_v",
+      """array(
+        parse_json(cast(id as string)),
+        parse_json(cast(id as string)),
+        parse_json(cast(id as string))
+      ) as array_v""",
+      "map('test', parse_json(cast(id as string))) as map_value_v",
+      "map(parse_json(cast(id as string)), parse_json(cast(id as string))) as map_key_v"
+    )
+  }
 
-      val readWrittenData =
-        readParquetUsingKernelAsColumnarBatches(tempPath.getAbsolutePath, physicalSchema)
-      // TODO(r.chen): Finish this and make assertions.
-    }
+  testWrite("basic write null variant") {
+    spark.range(0, 10, 1, 1).selectExpr(
+      "cast(null as variant) basic_v",
+      "named_struct('v', cast(null as variant)) as struct_v",
+      """array(
+        parse_json(cast(id as string)),
+        parse_json(cast(id as string)),
+        null
+      ) as array_v""",
+      "map('test', cast(null as variant)) as map_value_v"
+    )
   }
 
   test("columnar batches containing different schema") {
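As a usage illustration only, further cases could be added through the new testWrite helper in the same style; the test name, column name, and expression below are hypothetical and not part of this commit:

    // Hypothetical extra case (not in this commit): a variant nested two structs deep.
    testWrite("write variant nested in struct of struct") {
      spark.range(0, 10, 1, 1).selectExpr(
        "named_struct('outer', named_struct('inner', parse_json(cast(id as string)))) as nested_struct_v"
      )
    }

Any such case exercises the Kernel's Parquet write path for variant sub-columns, since the helper round-trips whatever schema the supplied DataFrame produces through a Kernel read, a Kernel write, and a content check.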
