Skip to content

Commit 2032fa3

Browse files
committed
use getschema function
1 parent 076517c commit 2032fa3

File tree

6 files changed

+142
-26
lines changed

6 files changed

+142
-26
lines changed

build.sbt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ spark / sparkVersion := getSparkVersion()
4848
connectCommon / sparkVersion := getSparkVersion()
4949
connectServer / sparkVersion := getSparkVersion()
5050
kernelDefaults / sparkVersion := getSparkVersion()
51+
storage / sparkVersion := getSparkVersion()
5152
goldenTables / sparkVersion := getSparkVersion()
5253

5354
// Dependent library versions
@@ -552,6 +553,8 @@ lazy val storage = (project in file("storage"))
552553
name := "delta-storage",
553554
commonSettings,
554555
javaOnlyReleaseSettings,
556+
crossSparkSettings(),
557+
555558
libraryDependencies ++= Seq(
556559
// User can provide any 2.x or 3.x version. We don't use any new fancy APIs. Watch out for
557560
// versions with known vulnerabilities.

kernel/kernel-api/src/main/java/io/delta/kernel/Scan.java

Lines changed: 87 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,7 @@
2323
import io.delta.kernel.data.*;
2424
import io.delta.kernel.engine.Engine;
2525
import io.delta.kernel.expressions.Predicate;
26-
import io.delta.kernel.types.StructField;
27-
import io.delta.kernel.types.StructType;
26+
import io.delta.kernel.types.*;
2827
import io.delta.kernel.utils.CloseableIterator;
2928

3029
import io.delta.kernel.internal.InternalScanFileUtils;
@@ -115,6 +114,45 @@ public interface Scan {
115114
*/
116115
Row getScanState(Engine engine);
117116

117+
/**
118+
* Uses the physical parquet schema provided by the engine to construct the physical parquet
119+
* read schema.
120+
*
121+
* For example, a read schema like "struct<v: variant>" is transformed into
122+
* "struct<v: struct<value: binary, metadata:binary>>".
123+
*/
124+
static StructType getParquetPhysicalReadSchema(
125+
StructType parquetSchema,
126+
StructType readSchema) {
127+
// Uses the physical parquet schema to transform a read schema like "struct<v: variant>"
128+
// into "struct<v: struct<value: binary, metadata:binary>>".
129+
StructType parquetSchemaToRead = new StructType();
130+
for (StructField field : readSchema.fields()) {
131+
DataType physicalDataType;
132+
if (field.getDataType() instanceof VariantType ||
133+
field.getDataType() instanceof StructType ||
134+
field.getDataType() instanceof MapType ||
135+
field.getDataType() instanceof ArrayType) {
136+
// TODO(r.chen): Get rid of the O(n^2) lookup.
137+
StructField parquetField = parquetSchema.get(field.getName());
138+
physicalDataType =
139+
getParquetPhysicalType(parquetField.getDataType(), field.getDataType());
140+
} else {
141+
physicalDataType = field.getDataType();
142+
}
143+
144+
// TODO(r.chen): Should we add in metadata that the field "isVariant"?
145+
// What would this mean for array<array<variant>> for instance?
146+
parquetSchemaToRead = parquetSchemaToRead.add(new StructField(
147+
field.getName(),
148+
physicalDataType,
149+
field.isNullable(),
150+
field.getMetadata()
151+
));
152+
}
153+
return parquetSchemaToRead;
154+
}
155+
118156
/**
119157
* Transform the physical data read from the table data file into the logical data that expected
120158
* out of the Delta table.
@@ -208,7 +246,8 @@ public FilteredColumnarBatch next() {
208246
if (ScanStateRow.getVariantFeatureEnabled(scanState)) {
209247
nextDataBatch = VariantUtils.withVariantColumns(
210248
engine.getExpressionHandler(),
211-
nextDataBatch
249+
nextDataBatch,
250+
physicalReadSchema
212251
);
213252
}
214253

@@ -239,4 +278,49 @@ public FilteredColumnarBatch next() {
239278
}
240279
};
241280
}
281+
282+
/**
283+
* If necessary, converts the read schema data type to the parquet data type.
284+
* Currently only used to convert "variant" to "struct<value: binary, metadata: binary>".
285+
*/
286+
private static DataType getParquetPhysicalType(DataType parquetType, DataType schemaType) {
287+
if (schemaType instanceof VariantType) {
288+
return parquetType;
289+
} else if (schemaType instanceof StructType) {
290+
StructType readStructType = (StructType) schemaType;
291+
StructType parquetStructType = (StructType) parquetType;
292+
assert readStructType.length() == parquetStructType.length();
293+
294+
StructType res = new StructType();
295+
for (int i = 0; i < readStructType.length(); i++) {
296+
res = res.add(
297+
readStructType.at(i).getName(),
298+
getParquetPhysicalType(
299+
parquetStructType.at(i).getDataType(),
300+
readStructType.at(i).getDataType()),
301+
readStructType.at(i).isNullable(),
302+
readStructType.at(i).getMetadata()
303+
);
304+
}
305+
return res;
306+
} else if (schemaType instanceof MapType) {
307+
MapType readMapType = (MapType) schemaType;
308+
MapType parquetMapType = (MapType) parquetType;
309+
return new MapType(
310+
getParquetPhysicalType(parquetMapType.getKeyType(), readMapType.getKeyType()),
311+
getParquetPhysicalType(parquetMapType.getValueType(), readMapType.getValueType()),
312+
readMapType.isValueContainsNull());
313+
} else if (schemaType instanceof ArrayType) {
314+
ArrayType readArrayType = (ArrayType) schemaType;
315+
ArrayType parquetArrayType = (ArrayType) parquetType;
316+
return new ArrayType(
317+
getParquetPhysicalType(
318+
parquetArrayType.getElementType(),
319+
readArrayType.getElementType()),
320+
readArrayType.containsNull()
321+
);
322+
}
323+
324+
return schemaType;
325+
}
242326
}

kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/VariantUtils.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,14 @@
2727
public class VariantUtils {
2828
public static ColumnarBatch withVariantColumns(
2929
ExpressionHandler expressionHandler,
30-
ColumnarBatch dataBatch) {
30+
ColumnarBatch dataBatch,
31+
StructType physicalReadSchema) {
3132
for (int i = 0; i < dataBatch.getSchema().length(); i++) {
32-
StructField field = dataBatch.getSchema().at(i);
33-
if (!(field.getDataType() instanceof StructType) &&
34-
!(field.getDataType() instanceof ArrayType) &&
35-
!(field.getDataType() instanceof MapType) &&
36-
(field.getDataType() != VariantType.VARIANT ||
33+
StructField kernelField = physicalReadSchema.at(i);
34+
if (!(kernelField.getDataType() instanceof StructType) &&
35+
!(kernelField.getDataType() instanceof ArrayType) &&
36+
!(kernelField.getDataType() instanceof MapType) &&
37+
(kernelField.getDataType() != VariantType.VARIANT ||
3738
dataBatch.getColumnVector(i).getDataType() == VariantType.VARIANT)) {
3839
continue;
3940
}
@@ -43,16 +44,16 @@ public static ColumnarBatch withVariantColumns(
4344
// TODO: probably better to pass in the schema as an expression argument
4445
// so the schema is enforced at the expression level. Need to pass in a literal
4546
// schema
46-
new StructType().add(field),
47+
new StructType().add(kernelField),
4748
new ScalarExpression(
4849
"variant_coalesce",
49-
Arrays.asList(new Column(field.getName()))
50+
Arrays.asList(new Column(kernelField.getName()))
5051
),
5152
VariantType.VARIANT
5253
);
5354

5455
ColumnVector variantCol = evaluator.eval(dataBatch);
55-
dataBatch = dataBatch.withReplacedColumnVector(i, field, variantCol);
56+
dataBatch = dataBatch.withReplacedColumnVector(i, kernelField, variantCol);
5657
}
5758
return dataBatch;
5859
}

kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetColumnReaders.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -83,15 +83,6 @@ public static Converter createConverter(
8383
} else if (typeFromClient instanceof TimestampNTZType) {
8484
return createTimestampConverter(initialBatchSize, typeFromFile,
8585
TimestampNTZType.TIMESTAMP_NTZ);
86-
} else if (typeFromClient instanceof VariantType) {
87-
// TODO(r.chen): Is converting the typeFromFile to the readSchema ok?
88-
// We lose the field metadata from the client.
89-
return new RowColumnReader(
90-
initialBatchSize,
91-
// The physical schema representing variants can be different per file so we must
92-
// infer the read schema from the type from file.
93-
(StructType) ParquetSchemaUtils.toKernelType(typeFromFile),
94-
(GroupType) typeFromFile);
9586
}
9687

9788
throw new UnsupportedOperationException(typeFromClient + " is not supported");

kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetFileReader.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.parquet.schema.MessageType;
3232
import static org.apache.parquet.hadoop.ParquetInputFormat.*;
3333

34+
import io.delta.kernel.Scan;
3435
import io.delta.kernel.data.ColumnarBatch;
3536
import io.delta.kernel.exceptions.KernelEngineException;
3637
import io.delta.kernel.expressions.Predicate;
@@ -181,7 +182,11 @@ public RecordMaterializer<Object> prepareForRead(
181182
Map<String, String> keyValueMetaData,
182183
MessageType fileSchema,
183184
ReadContext readContext) {
184-
rowRecordCollector = new RowRecordCollector(maxBatchSize, readSchema, fileSchema);
185+
StructType parquetSchemaToRead = Scan.getParquetPhysicalReadSchema(
186+
(StructType) ParquetSchemaUtils.toKernelType(fileSchema),
187+
readSchema);
188+
rowRecordCollector =
189+
new RowRecordCollector(maxBatchSize, parquetSchemaToRead, fileSchema);
185190
return rowRecordCollector;
186191
}
187192

kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetSchemaUtils.java

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -208,14 +208,46 @@ public static DataType toKernelType(Type type) {
208208
}
209209
} else {
210210
if (type.getLogicalTypeAnnotation() == LogicalTypeAnnotation.listType()) {
211-
GroupType gt = (GroupType) type;
211+
// Parquet's message type includes an additional group type between the array
212+
// and the child element type. For example:
213+
// optional group array_map_v (LIST) {
214+
// // The "list" field is between the array and child map type.
215+
// repeated group list {
216+
// optional group element (MAP) {
217+
// repeated group key_value {
218+
// required binary key (STRING);
219+
// required group value {
220+
// required binary value;
221+
// required binary metadata;
222+
// }
223+
// }
224+
// }
225+
// }
226+
// }
227+
GroupType gt = (GroupType) ((GroupType) type).getType(0);
212228
Type childType = gt.getType(0);
213229
return new ArrayType(
214230
toKernelType(childType), childType.getRepetition() == OPTIONAL);
215231
} else if (type.getLogicalTypeAnnotation() == LogicalTypeAnnotation.mapType()) {
216-
GroupType gt = (GroupType) type;
217-
Type keyType = gt.getType(0);
218-
Type valueType = gt.getType(1);
232+
// Parquet's message type includes an additional group type between the map type
233+
// and the key and value types. For example:
234+
// optional group array_map_v (LIST) {
235+
// repeated group list {
236+
// optional group element (MAP) {
237+
// // The "key_value" field is between the map and child types.
238+
// repeated group key_value {
239+
// required binary key (STRING);
240+
// required group value {
241+
// required binary value;
242+
// required binary metadata;
243+
// }
244+
// }
245+
// }
246+
// }
247+
// }
248+
GroupType keyValueGroup = (GroupType) ((GroupType) type).getType(0);
249+
Type keyType = keyValueGroup.getType(0);
250+
Type valueType = keyValueGroup.getType(1);
219251
return new MapType(
220252
toKernelType(keyType),
221253
toKernelType(valueType),

0 commit comments

Comments
 (0)