Skip to content

Commit 6a879b5

Browse files
committed
remove write changes and add toKernelType to read variants with differnt physical storage representations
1 parent fe955fd commit 6a879b5

File tree

2 files changed

+93
-13
lines changed

2 files changed

+93
-13
lines changed

kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetColumnReaders.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,11 +83,13 @@ public static Converter createConverter(
8383
} else if (typeFromClient instanceof TimestampNTZType) {
8484
return createTimestampNtzConverter(initialBatchSize, typeFromFile);
8585
} else if (typeFromClient instanceof VariantType) {
86+
// TODO(r.chen): Is converting the typeFromFile to the readSchea ok?
87+
// We lose the field metadata from the client.
8688
return new RowColumnReader(
8789
initialBatchSize,
88-
new StructType()
89-
.add("value", BinaryType.BINARY, false)
90-
.add("metadata", BinaryType.BINARY, false),
90+
// The physical schema representing variants can be different per file so we must
91+
// infer the read schema from the type from file.
92+
(StructType) ParquetSchemaUtils.toKernelType(typeFromFile),
9193
(GroupType) typeFromFile);
9294
}
9395

kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetSchemaUtils.java

Lines changed: 88 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import static java.lang.String.format;
2121

2222
import org.apache.parquet.schema.*;
23-
import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
23+
import org.apache.parquet.schema.LogicalTypeAnnotation.*;
2424
import org.apache.parquet.schema.Type.Repetition;
2525
import static org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit.MICROS;
2626
import static org.apache.parquet.schema.LogicalTypeAnnotation.decimalType;
@@ -157,6 +157,93 @@ public static MessageType toParquetSchema(StructType structType) {
157157
return new MessageType("Default Kernel Schema", types);
158158
}
159159

160+
/**
161+
* Convert the given Parquet data type to a Kernel data type.
162+
*
163+
* TODO(r.chen): Test this function.
164+
*
165+
* @param type Parquet type object
166+
* @return {@link DataType} representing the Parquet type in Kernel.
167+
*/
168+
public static DataType toKernelType(Type type) {
169+
if (type.isPrimitive()) {
170+
PrimitiveType pt = type.asPrimitiveType();
171+
172+
if (pt.getOriginalType() == OriginalType.DECIMAL) {
173+
DecimalLogicalTypeAnnotation dlta =
174+
(DecimalLogicalTypeAnnotation) pt.getLogicalTypeAnnotation();
175+
return new DecimalType(dlta.getPrecision(), dlta.getScale());
176+
} else if (pt.getPrimitiveTypeName() == BOOLEAN) {
177+
return BooleanType.BOOLEAN;
178+
} else if (pt.getPrimitiveTypeName() == INT32) {
179+
if (pt.getOriginalType() == OriginalType.INT_8) {
180+
return ByteType.BYTE;
181+
} else if (pt.getOriginalType() == OriginalType.INT_16) {
182+
return ShortType.SHORT;
183+
} else if (pt.getLogicalTypeAnnotation() == LogicalTypeAnnotation.dateType()) {
184+
return DateType.DATE;
185+
}
186+
return IntegerType.INTEGER;
187+
} else if (pt.getPrimitiveTypeName() == INT64) {
188+
if (pt.getOriginalType() == OriginalType.TIMESTAMP_MICROS) {
189+
TimestampLogicalTypeAnnotation tlta =
190+
(TimestampLogicalTypeAnnotation) pt.getLogicalTypeAnnotation();
191+
return tlta.isAdjustedToUTC() ?
192+
TimestampType.TIMESTAMP : TimestampNTZType.TIMESTAMP_NTZ;
193+
}
194+
return LongType.LONG;
195+
} else if (pt.getPrimitiveTypeName() == FLOAT) {
196+
return FloatType.FLOAT;
197+
} else if (pt.getPrimitiveTypeName() == DOUBLE) {
198+
return DoubleType.DOUBLE;
199+
} else if (pt.getPrimitiveTypeName() == BINARY) {
200+
if (pt.getLogicalTypeAnnotation() == LogicalTypeAnnotation.stringType()) {
201+
return StringType.STRING;
202+
} else {
203+
return BinaryType.BINARY;
204+
}
205+
} else {
206+
throw new UnsupportedOperationException(
207+
"Converting the given Parquet data type to Kernel is not supported: " + type);
208+
}
209+
} else {
210+
if (type.getLogicalTypeAnnotation() == LogicalTypeAnnotation.listType()) {
211+
GroupType gt = (GroupType) type;
212+
Type childType = gt.getType(0);
213+
return new ArrayType(
214+
toKernelType(childType), childType.getRepetition() == OPTIONAL);
215+
} else if (type.getLogicalTypeAnnotation() == LogicalTypeAnnotation.mapType()) {
216+
GroupType gt = (GroupType) type;
217+
Type keyType = gt.getType(0);
218+
Type valueType = gt.getType(1);
219+
return new MapType(
220+
toKernelType(keyType),
221+
toKernelType(valueType),
222+
valueType.getRepetition() == OPTIONAL
223+
);
224+
} else {
225+
List<StructField> kernelFields = new ArrayList<>();
226+
GroupType gt = (GroupType) type;
227+
for (Type parquetType : gt.getFields()) {
228+
FieldMetadata.Builder metadataBuilder = FieldMetadata.builder();
229+
if (type.getId() != null) {
230+
metadataBuilder.putLong(
231+
ColumnMapping.PARQUET_FIELD_ID_KEY,
232+
(long) (type.getId().intValue())
233+
);
234+
}
235+
kernelFields.add(new StructField(
236+
parquetType.getName(),
237+
toKernelType(parquetType),
238+
parquetType.getRepetition() == OPTIONAL,
239+
metadataBuilder.build()
240+
));
241+
}
242+
return new StructType(kernelFields);
243+
}
244+
}
245+
}
246+
160247
private static List<Type> pruneFields(
161248
GroupType type, StructType deltaDataType, boolean hasFieldIds) {
162249
// prune fields including nested pruning like in pruneSchema
@@ -253,8 +340,6 @@ private static Type toParquetType(
253340
type = toParquetMapType((MapType) dataType, name, repetition);
254341
} else if (dataType instanceof StructType) {
255342
type = toParquetStructType((StructType) dataType, name, repetition);
256-
} else if (dataType instanceof VariantType) {
257-
type = toParquetVariantType(name, repetition);
258343
} else {
259344
throw new UnsupportedOperationException(
260345
"Writing given type data to Parquet is not supported: " + dataType);
@@ -316,13 +401,6 @@ private static Type toParquetStructType(StructType structType, String name,
316401
return new GroupType(repetition, name, fields);
317402
}
318403

319-
private static Type toParquetVariantType(String name, Repetition repetition) {
320-
return Types.buildGroup(repetition)
321-
.addField(toParquetType(BinaryType.BINARY, "value", REQUIRED, Optional.empty()))
322-
.addField(toParquetType(BinaryType.BINARY, "metadata", REQUIRED, Optional.empty()))
323-
.named(name);
324-
}
325-
326404
/**
327405
* Recursively checks whether the given data type has any Parquet field ids in it.
328406
*/

0 commit comments

Comments
 (0)