
Commit 3b017f5

Commit message: init
1 parent: 843995c

File tree: 2 files changed (+54 / -0 lines)

kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetColumnWriters.java
(30 additions, 0 deletions)
```diff
@@ -32,6 +32,7 @@
 import static io.delta.kernel.internal.util.Preconditions.checkArgument;

 import io.delta.kernel.defaults.internal.DefaultKernelUtils;
+import io.delta.kernel.defaults.internal.data.vector.DefaultVariantVector;
 import static io.delta.kernel.defaults.internal.parquet.ParquetSchemaUtils.MAX_BYTES_PER_PRECISION;

 /**
```
```diff
@@ -142,6 +143,8 @@ private static ColumnWriter createColumnWriter(
             return new MapWriter(colName, fieldIndex, columnVector);
         } else if (dataType instanceof StructType) {
             return new StructWriter(colName, fieldIndex, columnVector);
+        } else if (dataType instanceof VariantType) {
+            return new VariantWriter(colName, fieldIndex, columnVector);
         }

         throw new IllegalArgumentException("Unsupported column vector type: " + dataType);
```
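For orientation: the new `VariantType` branch routes variant columns to the `VariantWriter` defined below, which writes each variant as a two-field binary group. Here is a minimal sketch of that group shape using parquet-mr's `Types` builder. The field names and ordering ("value" at index 0, "metadata" at index 1) come from the writer itself; the column name and repetition levels are illustrative assumptions, not taken from this commit:

```java
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.Types;

import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;

// Sketch only: the two-field group shape VariantWriter's output corresponds to.
// Repetition levels are assumptions.
static GroupType variantGroupShape(String columnName) {
    return Types.buildGroup(OPTIONAL)
        .optional(BINARY).named("value")     // variant value bytes, field index 0
        .optional(BINARY).named("metadata")  // variant metadata bytes, field index 1
        .named(columnName);
}
```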
```diff
@@ -474,4 +477,31 @@ void writeNonNullRowValue(RecordConsumer recordConsumer, int rowId) {
             recordConsumer.endGroup();
         }
     }
+
+    static class VariantWriter extends ColumnWriter {
+        private ColumnWriter valueWriter;
+        private ColumnWriter metadataWriter;
+
+        VariantWriter(String name, int fieldId, ColumnVector variantColumnVector) {
+            super(name, fieldId, variantColumnVector);
+            valueWriter = new BinaryWriter(
+                "value",
+                0,
+                ((DefaultVariantVector) variantColumnVector).getValueVector()
+            );
+            metadataWriter = new BinaryWriter(
+                "metadata",
+                1,
+                ((DefaultVariantVector) variantColumnVector).getMetadataVector()
+            );
+        }
+
+        @Override
+        void writeNonNullRowValue(RecordConsumer recordConsumer, int rowId) {
+            recordConsumer.startGroup();
+            valueWriter.writeRowValue(recordConsumer, rowId);
+            metadataWriter.writeRowValue(recordConsumer, rowId);
+            recordConsumer.endGroup();
+        }
+    }
 }
```
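Read as a RecordConsumer event stream, `writeNonNullRowValue` opens a group, lets each child `BinaryWriter` emit its field, and closes the group. Below is a minimal sketch of the resulting per-row event sequence, assuming each `BinaryWriter` brackets its value with `startField`/`endField` as the RecordConsumer contract requires; the byte-array parameters stand in for the contents of `DefaultVariantVector`'s value and metadata child vectors:

```java
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.RecordConsumer;

// Sketch only: the effective event stream for one non-null variant row.
static void writeOneVariantRow(
        RecordConsumer consumer, byte[] valueBytes, byte[] metadataBytes) {
    consumer.startGroup();                      // VariantWriter.writeNonNullRowValue
    consumer.startField("value", 0);            // BinaryWriter("value", 0, ...)
    consumer.addBinary(Binary.fromConstantByteArray(valueBytes));
    consumer.endField("value", 0);
    consumer.startField("metadata", 1);         // BinaryWriter("metadata", 1, ...)
    consumer.addBinary(Binary.fromConstantByteArray(metadataBytes));
    consumer.endField("metadata", 1);
    consumer.endGroup();
}
```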

kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetFileWriterSuite.scala
(24 additions, 0 deletions)
```diff
@@ -189,6 +189,30 @@ class ParquetFileWriterSuite extends AnyFunSuite
     }
   }

+  test("write variant") {
+    withTempDir { tempPath =>
+      val variantFilePath = goldenTableFile("variantbasic").getAbsolutePath
+      val schema = tableSchema(variantFilePath)
+
+      val physicalSchema = if (hasColumnMappingId(variantFilePath)) {
+        convertToPhysicalSchema(schema, schema, ColumnMapping.COLUMN_MAPPING_MODE_ID)
+      } else {
+        schema
+      }
+      val readData = readParquetUsingKernelAsColumnarBatches(variantFilePath, physicalSchema)
+        // Convert the schema of the data to the physical schema with field IDs.
+        .map(_.withNewSchema(physicalSchema))
+        // Convert the data to filtered columnar batches.
+        .map(_.toFiltered(Option.empty[Predicate]))
+
+      val writeOutput = writeToParquetUsingKernel(readData, tempPath.getAbsolutePath)
+
+      val readWrittenData =
+        readParquetUsingKernelAsColumnarBatches(tempPath.getAbsolutePath, physicalSchema)
+      // TODO(r.chen): Finish this and make assertions.
+    }
+  }
+
   test("columnar batches containing different schema") {
     withTempDir { tempPath =>
       val targetDir = tempPath.getAbsolutePath
```
