Skip to content

Commit c06180d

Browse files
committed
remove write changes and add toKernelType to read variants with differnt physical storage representations
1 parent 8071cb1 commit c06180d

File tree

2 files changed

+93
-12
lines changed

2 files changed

+93
-12
lines changed

kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetColumnReaders.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,11 +83,13 @@ public static Converter createConverter(
8383
} else if (typeFromClient instanceof TimestampNTZType) {
8484
return createTimestampNtzConverter(initialBatchSize, typeFromFile);
8585
} else if (typeFromClient instanceof VariantType) {
86+
// TODO(r.chen): Is converting the typeFromFile to the readSchea ok?
87+
// We lose the field metadata from the client.
8688
return new RowColumnReader(
8789
initialBatchSize,
88-
new StructType()
89-
.add("value", BinaryType.BINARY, false)
90-
.add("metadata", BinaryType.BINARY, false),
90+
// The physical schema representing variants can be different per file so we must
91+
// infer the read schema from the type from file.
92+
(StructType) ParquetSchemaUtils.toKernelType(typeFromFile),
9193
(GroupType) typeFromFile);
9294
}
9395

kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetSchemaUtils.java

Lines changed: 88 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static java.lang.String.format;
2121

2222
import org.apache.parquet.schema.*;
23+
import org.apache.parquet.schema.LogicalTypeAnnotation.*;
2324
import org.apache.parquet.schema.Type.Repetition;
2425
import static org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit.MICROS;
2526
import static org.apache.parquet.schema.LogicalTypeAnnotation.timestampType;
@@ -155,6 +156,93 @@ public static MessageType toParquetSchema(StructType structType) {
155156
return new MessageType("Default Kernel Schema", types);
156157
}
157158

159+
/**
160+
* Convert the given Parquet data type to a Kernel data type.
161+
*
162+
* TODO(r.chen): Test this function.
163+
*
164+
* @param type Parquet type object
165+
* @return {@link DataType} representing the Parquet type in Kernel.
166+
*/
167+
public static DataType toKernelType(Type type) {
168+
if (type.isPrimitive()) {
169+
PrimitiveType pt = type.asPrimitiveType();
170+
171+
if (pt.getOriginalType() == OriginalType.DECIMAL) {
172+
DecimalLogicalTypeAnnotation dlta =
173+
(DecimalLogicalTypeAnnotation) pt.getLogicalTypeAnnotation();
174+
return new DecimalType(dlta.getPrecision(), dlta.getScale());
175+
} else if (pt.getPrimitiveTypeName() == BOOLEAN) {
176+
return BooleanType.BOOLEAN;
177+
} else if (pt.getPrimitiveTypeName() == INT32) {
178+
if (pt.getOriginalType() == OriginalType.INT_8) {
179+
return ByteType.BYTE;
180+
} else if (pt.getOriginalType() == OriginalType.INT_16) {
181+
return ShortType.SHORT;
182+
} else if (pt.getLogicalTypeAnnotation() == LogicalTypeAnnotation.dateType()) {
183+
return DateType.DATE;
184+
}
185+
return IntegerType.INTEGER;
186+
} else if (pt.getPrimitiveTypeName() == INT64) {
187+
if (pt.getOriginalType() == OriginalType.TIMESTAMP_MICROS) {
188+
TimestampLogicalTypeAnnotation tlta =
189+
(TimestampLogicalTypeAnnotation) pt.getLogicalTypeAnnotation();
190+
return tlta.isAdjustedToUTC() ?
191+
TimestampType.TIMESTAMP : TimestampNTZType.TIMESTAMP_NTZ;
192+
}
193+
return LongType.LONG;
194+
} else if (pt.getPrimitiveTypeName() == FLOAT) {
195+
return FloatType.FLOAT;
196+
} else if (pt.getPrimitiveTypeName() == DOUBLE) {
197+
return DoubleType.DOUBLE;
198+
} else if (pt.getPrimitiveTypeName() == BINARY) {
199+
if (pt.getLogicalTypeAnnotation() == LogicalTypeAnnotation.stringType()) {
200+
return StringType.STRING;
201+
} else {
202+
return BinaryType.BINARY;
203+
}
204+
} else {
205+
throw new UnsupportedOperationException(
206+
"Converting the given Parquet data type to Kernel is not supported: " + type);
207+
}
208+
} else {
209+
if (type.getLogicalTypeAnnotation() == LogicalTypeAnnotation.listType()) {
210+
GroupType gt = (GroupType) type;
211+
Type childType = gt.getType(0);
212+
return new ArrayType(
213+
toKernelType(childType), childType.getRepetition() == OPTIONAL);
214+
} else if (type.getLogicalTypeAnnotation() == LogicalTypeAnnotation.mapType()) {
215+
GroupType gt = (GroupType) type;
216+
Type keyType = gt.getType(0);
217+
Type valueType = gt.getType(1);
218+
return new MapType(
219+
toKernelType(keyType),
220+
toKernelType(valueType),
221+
valueType.getRepetition() == OPTIONAL
222+
);
223+
} else {
224+
List<StructField> kernelFields = new ArrayList<>();
225+
GroupType gt = (GroupType) type;
226+
for (Type parquetType : gt.getFields()) {
227+
FieldMetadata.Builder metadataBuilder = FieldMetadata.builder();
228+
if (type.getId() != null) {
229+
metadataBuilder.putLong(
230+
ColumnMapping.PARQUET_FIELD_ID_KEY,
231+
(long) (type.getId().intValue())
232+
);
233+
}
234+
kernelFields.add(new StructField(
235+
parquetType.getName(),
236+
toKernelType(parquetType),
237+
parquetType.getRepetition() == OPTIONAL,
238+
metadataBuilder.build()
239+
));
240+
}
241+
return new StructType(kernelFields);
242+
}
243+
}
244+
}
245+
158246
private static List<Type> pruneFields(
159247
GroupType type, StructType deltaDataType, boolean hasFieldIds) {
160248
// prune fields including nested pruning like in pruneSchema
@@ -248,8 +336,6 @@ private static Type toParquetType(
248336
type = toParquetMapType((MapType) dataType, name, repetition);
249337
} else if (dataType instanceof StructType) {
250338
type = toParquetStructType((StructType) dataType, name, repetition);
251-
} else if (dataType instanceof VariantType) {
252-
type = toParquetVariantType(name, repetition);
253339
} else {
254340
throw new UnsupportedOperationException(
255341
"Writing given type data to Parquet is not supported: " + dataType);
@@ -311,13 +397,6 @@ private static Type toParquetStructType(StructType structType, String name,
311397
return new GroupType(repetition, name, fields);
312398
}
313399

314-
private static Type toParquetVariantType(String name, Repetition repetition) {
315-
return Types.buildGroup(repetition)
316-
.addField(toParquetType(BinaryType.BINARY, "value", REQUIRED, Optional.empty()))
317-
.addField(toParquetType(BinaryType.BINARY, "metadata", REQUIRED, Optional.empty()))
318-
.named(name);
319-
}
320-
321400
/**
322401
* Recursively checks whether the given data type has any Parquet field ids in it.
323402
*/

0 commit comments

Comments
 (0)