@@ -17,6 +17,8 @@ package io.delta.kernel.defaults.internal.parquet
 
 import java.lang.{Double => DoubleJ, Float => FloatJ}
 
+import org.apache.spark.sql.DataFrame
+
 import io.delta.golden.GoldenTableUtils.{goldenTableFile, goldenTablePath}
 import io.delta.kernel.data.{ColumnarBatch, FilteredColumnarBatch}
 import io.delta.kernel.defaults.internal.DefaultKernelUtils
@@ -189,28 +191,64 @@ class ParquetFileWriterSuite extends AnyFunSuite
     }
   }
 
-  test("write variant") {
-    withTempDir { tempPath =>
-      val variantFilePath = goldenTableFile("variantbasic").getAbsolutePath
-      val schema = tableSchema(variantFilePath)
-
-      val physicalSchema = if (hasColumnMappingId(variantFilePath)) {
-        convertToPhysicalSchema(schema, schema, ColumnMapping.COLUMN_MAPPING_MODE_ID)
-      } else {
-        schema
+  def testWrite(testName: String)(df: => DataFrame): Unit = {
+    test(testName) {
+      withTable("test_table") {
+        withTempDir { writeDir =>
+          df.write
+            .format("delta")
+            .mode("overwrite")
+            .saveAsTable("test_table")
+          val filePath = spark.sql("describe table extended `test_table`")
+            .where("col_name = 'Location'")
+            .collect()(0)
+            .getString(1)
+            .replace("file:", "")
+
+          val schema = tableSchema(filePath)
+
+          val physicalSchema = if (hasColumnMappingId(filePath)) {
+            convertToPhysicalSchema(schema, schema, ColumnMapping.COLUMN_MAPPING_MODE_ID)
+          } else {
+            schema
+          }
+          val readData = readParquetUsingKernelAsColumnarBatches(filePath, physicalSchema)
+            .map(_.toFiltered(Option.empty[Predicate]))
+          val writePath = writeDir.getAbsolutePath
+          val writeOutput = writeToParquetUsingKernel(readData, writePath)
+          // TODO(richardc-db): Test without read from Spark because Spark's Parquet reader relies
+          // on spark-produced footer metadata to properly read variant types.
+          verifyContentUsingKernelReader(writePath, readData)
+        }
       }
-      val readData = readParquetUsingKernelAsColumnarBatches(variantFilePath, physicalSchema)
-        // Convert the schema of the data to the physical schema with field ids
-        .map(_.withNewSchema(physicalSchema))
-        // convert the data to filtered columnar batches
-        .map(_.toFiltered(Option.empty[Predicate]))
+    }
+  }
 
-      val writeOutput = writeToParquetUsingKernel(readData, tempPath.getAbsolutePath)
+  testWrite("basic write variant") {
+    spark.range(0, 10, 1, 1).selectExpr(
+      "parse_json(cast(id as string)) as basic_v",
+      "named_struct('v', parse_json(cast(id as string))) as struct_v",
+      """array(
+        parse_json(cast(id as string)),
+        parse_json(cast(id as string)),
+        parse_json(cast(id as string))
+      ) as array_v""",
+      "map('test', parse_json(cast(id as string))) as map_value_v",
+      "map(parse_json(cast(id as string)), parse_json(cast(id as string))) as map_key_v"
+    )
+  }
 
-      val readWrittenData =
-        readParquetUsingKernelAsColumnarBatches(tempPath.getAbsolutePath, physicalSchema)
-      // TODO(r.chen): Finish this and make assertions.
-    }
+  testWrite("basic write null variant") {
+    spark.range(0, 10, 1, 1).selectExpr(
+      "cast(null as variant) basic_v",
+      "named_struct('v', cast(null as variant)) as struct_v",
+      """array(
+        parse_json(cast(id as string)),
+        parse_json(cast(id as string)),
+        null
+      ) as array_v""",
+      "map('test', cast(null as variant)) as map_value_v"
+    )
   }
 
   test("columnar batches containing different schema") {