
Commit f02f7b8

Fix more issues
1 parent 8559d6a commit f02f7b8

21 files changed: +603 -392 lines changed

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java

Lines changed: 14 additions & 5 deletions
@@ -19,6 +19,7 @@
 package org.apache.hudi.io.storage;
 
 import org.apache.hudi.SparkAdapterSupport$;
+import org.apache.hudi.avro.AvroSchemaUtils;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.model.HoodieFileFormat;
@@ -31,6 +32,7 @@
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 
@@ -56,6 +58,7 @@
 
 import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
 import static org.apache.parquet.avro.AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS;
+import static org.apache.parquet.avro.HoodieAvroParquetSchemaConverter.getAvroSchemaConverter;
 
 public class HoodieSparkParquetReader implements HoodieSparkFileReader {
 
@@ -126,13 +129,19 @@ private ClosableIterator<InternalRow> getInternalRowIterator(Schema readerSchema
     if (requestedSchema == null) {
       requestedSchema = readerSchema;
     }
-    StructType readerStructType = HoodieInternalRowUtils.getCachedSchema(readerSchema);
-    StructType requestedStructType = HoodieInternalRowUtils.getCachedSchema(requestedSchema);
-    storage.getConf().set(ParquetReadSupport.PARQUET_READ_SCHEMA, readerStructType.json());
-    storage.getConf().set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA(), requestedStructType.json());
+
+    MessageType fileSchema = getFileSchema();
+    Schema nonNullSchema = AvroSchemaUtils.getNonNullTypeFromUnion(requestedSchema);
+    Option<MessageType> messageSchema = Option.of(getAvroSchemaConverter(storage.getConf().unwrapAs(Configuration.class)).convert(nonNullSchema));
+    Pair<StructType, StructType> readerSchemas =
+        SparkAdapterSupport$.MODULE$.sparkAdapter().getReaderSchemas(storage, readerSchema, requestedSchema, fileSchema);
+    storage.getConf().set(ParquetReadSupport.PARQUET_READ_SCHEMA, readerSchemas.getLeft().json());
+    storage.getConf().set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA(), readerSchemas.getRight().json());
     storage.getConf().set(SQLConf.PARQUET_BINARY_AS_STRING().key(), SQLConf.get().getConf(SQLConf.PARQUET_BINARY_AS_STRING()).toString());
     storage.getConf().set(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), SQLConf.get().getConf(SQLConf.PARQUET_INT96_AS_TIMESTAMP()).toString());
-    ParquetReader<InternalRow> reader = ParquetReader.<InternalRow>builder((ReadSupport) new ParquetReadSupport(), new Path(path.toUri()))
+    ParquetReader<InternalRow> reader = ParquetReader.<InternalRow>builder(
+        (ReadSupport) SparkAdapterSupport$.MODULE$.sparkAdapter().getParquetReadSupport(messageSchema),
+        new Path(path.toUri()))
         .withConf(storage.getConf().unwrapAs(Configuration.class))
         .build();
     ParquetReaderIterator<InternalRow> parquetReaderIterator = new ParquetReaderIterator<>(reader);
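A side note on the schema handling introduced above: the requested Avro schema is unwrapped from its nullable union before being converted to a Parquet MessageType. The following standalone sketch (illustrative only, not part of the commit; it assumes AvroSchemaUtils.getNonNullTypeFromUnion simply returns the non-null branch of a ["null", record] union, as its name suggests) shows the intended effect:

import org.apache.avro.Schema
import org.apache.hudi.avro.AvroSchemaUtils

object NonNullUnionSketch extends App {
  // A nullable record schema expressed as a top-level ["null", Rec] union.
  val nullableUnion = new Schema.Parser().parse(
    """["null", {"type": "record", "name": "Rec", "fields": [{"name": "id", "type": "long"}]}]""")
  // Expected to resolve to the Rec branch, i.e. the schema that would then be
  // converted to a Parquet MessageType for the requested read schema.
  val nonNull = AvroSchemaUtils.getNonNullTypeFromUnion(nullableUnion)
  println(nonNull.getFullName) // Rec
}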

hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala renamed to hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala

Lines changed: 23 additions & 15 deletions
@@ -1,34 +1,41 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
 package org.apache.spark.sql.execution.datasources.parquet
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.parquet.hadoop.metadata.FileMetaData
-import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType}
+import org.apache.spark.sql.types._
 
 object HoodieParquetFileFormatHelper {
-
   def buildImplicitSchemaChangeInfo(hadoopConf: Configuration,
                                     parquetFileMetaData: FileMetaData,
                                     requiredSchema: StructType): (java.util.Map[Integer, org.apache.hudi.common.util.collection.Pair[DataType, DataType]], StructType) = {
-    val implicitTypeChangeInfo: java.util.Map[Integer, org.apache.hudi.common.util.collection.Pair[DataType, DataType]] = new java.util.HashMap()
     val convert = new ParquetToSparkSchemaConverter(hadoopConf)
     val fileStruct = convert.convert(parquetFileMetaData.getSchema)
+    buildImplicitSchemaChangeInfo(fileStruct, requiredSchema)
+  }
+
+  def buildImplicitSchemaChangeInfo(fileStruct: StructType,
+                                    requiredSchema: StructType): (java.util.Map[Integer, org.apache.hudi.common.util.collection.Pair[DataType, DataType]], StructType) = {
+    val implicitTypeChangeInfo: java.util.Map[Integer, org.apache.hudi.common.util.collection.Pair[DataType, DataType]] = new java.util.HashMap()
+
     val fileStructMap = fileStruct.fields.map(f => (f.name, f.dataType)).toMap
     // if there are missing fields or if field's data type needs to be changed while reading, we handle it here.
     val sparkRequestStructFields = requiredSchema.map(f => {
@@ -45,6 +52,7 @@ object HoodieParquetFileFormatHelper {
   }
 
   def isDataTypeEqual(requiredType: DataType, fileType: DataType): Boolean = (requiredType, fileType) match {
+
    case (requiredType, fileType) if requiredType == fileType => true
 
    case (ArrayType(rt, _), ArrayType(ft, _)) =>
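The refactor above splits the schema-reconciliation logic away from the Hadoop/Parquet file metadata, so it can also be driven from an already-converted Catalyst schema. A minimal usage sketch of the new StructType-based overload (the example schemas are made up; the shape of the returned map is inferred from how HoodieLegacyParquetFileFormatHelper consumes it later in this commit):

import org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormatHelper
import org.apache.spark.sql.types._

object SchemaChangeInfoSketch extends App {
  // What is physically stored in the Parquet file...
  val fileStruct = StructType(Seq(
    StructField("id", IntegerType),
    StructField("price", FloatType)))
  // ...versus what the query asks for.
  val requiredSchema = StructType(Seq(
    StructField("id", IntegerType),
    StructField("price", DoubleType)))

  val (typeChangeInfo, readSchema) =
    HoodieParquetFileFormatHelper.buildImplicitSchemaChangeInfo(fileStruct, requiredSchema)

  // Field ordinal 1 ("price") differs: the pair's left side is the requested type,
  // the right side is the type found in the file.
  println(typeChangeInfo) // e.g. {1=(DoubleType,FloatType)}
  // The returned schema keeps the file's types so the Parquet reader decodes the data as written.
  println(readSchema)
}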

hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala

Lines changed: 13 additions & 3 deletions
@@ -18,11 +18,12 @@
 
 package org.apache.spark.sql.hudi
 
-import org.apache.avro.Schema
 import org.apache.hudi.client.utils.SparkRowSerDe
 import org.apache.hudi.common.table.HoodieTableMetaClient
-import org.apache.hudi.storage.StoragePath
+import org.apache.hudi.storage.{HoodieStorage, StoragePath}
 
+import org.apache.avro.Schema
+import org.apache.parquet.schema.MessageType
 import org.apache.spark.sql._
 import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSchemaConverters, HoodieAvroSerializer}
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
@@ -55,6 +56,15 @@ trait SparkAdapter extends Serializable {
 
   def isTimestampNTZType(dataType: DataType): Boolean
 
+  def getParquetReadSupport(messageSchema: org.apache.hudi.common.util.Option[MessageType]): org.apache.parquet.hadoop.api.ReadSupport[_]
+
+  def repairSchemaIfSpecified(shouldRepair: Boolean,
+                              fileSchema: MessageType,
+                              tableSchemaOpt: org.apache.hudi.common.util.Option[MessageType]): MessageType
+
+  def getReaderSchemas(storage: HoodieStorage, readerSchema: Schema, requestedSchema: Schema, fileSchema: MessageType):
+    org.apache.hudi.common.util.collection.Pair[StructType, StructType]
+
   /**
    * Creates Catalyst [[Metadata]] for Hudi's meta-fields (designating these w/
    * [[METADATA_COL_ATTR_KEY]] if available (available in Spark >= 3.2)
@@ -172,7 +182,7 @@ trait SparkAdapter extends Serializable {
   /**
   * Create instance of [[ParquetFileFormat]]
   */
-  def createLegacyHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat]
+  def createLegacyHoodieParquetFileFormat(appendPartitionValues: Boolean, tableAvroSchema: Schema): Option[ParquetFileFormat]
 
   def makeColumnarBatch(vectors: Array[ColumnVector], numRows: Int): ColumnarBatch

HoodieLegacyParquetFileFormatHelper.scala (new file, package org.apache.spark.sql.execution.datasources.parquet)

Lines changed: 121 additions & 0 deletions

@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.spark.sql.HoodieSchemaUtils
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.{ArrayTransform, Attribute, Cast, CreateNamedStruct, CreateStruct, Expression, GetStructField, LambdaFunction, Literal, MapEntries, MapFromEntries, NamedLambdaVariable, UnsafeProjection}
+import org.apache.spark.sql.types._
+
+object HoodieLegacyParquetFileFormatHelper {
+  def generateUnsafeProjection(fullSchema: Seq[Attribute],
+                               timeZoneId: Option[String],
+                               typeChangeInfos: java.util.Map[Integer, org.apache.hudi.common.util.collection.Pair[DataType, DataType]],
+                               requiredSchema: StructType,
+                               partitionSchema: StructType,
+                               schemaUtils: HoodieSchemaUtils): UnsafeProjection = {
+    val addedCastCache = scala.collection.mutable.HashMap.empty[(DataType, DataType), Boolean]
+
+    def hasUnsupportedConversion(src: DataType, dst: DataType): Boolean = {
+      addedCastCache.getOrElseUpdate((src, dst), {
+        (src, dst) match {
+          case (FloatType, DoubleType) => true
+          case (IntegerType, DecimalType()) => true
+          case (LongType, DecimalType()) => true
+          case (FloatType, DecimalType()) => true
+          case (DoubleType, DecimalType()) => true
+          case (StringType, DecimalType()) => true
+          case (StringType, DateType) => true
+          case (StructType(srcFields), StructType(dstFields)) =>
+            srcFields.zip(dstFields).exists { case (sf, df) => hasUnsupportedConversion(sf.dataType, df.dataType) }
+          case (ArrayType(sElem, _), ArrayType(dElem, _)) =>
+            hasUnsupportedConversion(sElem, dElem)
+          case (MapType(sKey, sVal, _), MapType(dKey, dVal, _)) =>
+            hasUnsupportedConversion(sKey, dKey) || hasUnsupportedConversion(sVal, dVal)
+          case _ => false
+        }
+      })
+    }
+
+    def recursivelyCastExpressions(expr: Expression, srcType: DataType, dstType: DataType): Expression = {
+      lazy val needTimeZone = Cast.needsTimeZone(srcType, dstType)
+      (srcType, dstType) match {
+        case (FloatType, DoubleType) =>
+          val toStr = Cast(expr, StringType, if (needTimeZone) timeZoneId else None)
+          Cast(toStr, dstType, if (needTimeZone) timeZoneId else None)
+        case (IntegerType | LongType | FloatType | DoubleType, dec: DecimalType) =>
+          val toStr = Cast(expr, StringType, if (needTimeZone) timeZoneId else None)
+          Cast(toStr, dec, if (needTimeZone) timeZoneId else None)
+        case (StringType, dec: DecimalType) =>
+          Cast(expr, dec, if (needTimeZone) timeZoneId else None)
+        case (StringType, DateType) =>
+          Cast(expr, DateType, if (needTimeZone) timeZoneId else None)
+        case (s: StructType, d: StructType) if hasUnsupportedConversion(s, d) =>
+          val structFields = s.fields.zip(d.fields).zipWithIndex.map {
+            case ((srcField, dstField), i) =>
+              val child = GetStructField(expr, i, Some(dstField.name))
+              recursivelyCastExpressions(child, srcField.dataType, dstField.dataType)
+          }
+          CreateNamedStruct(d.fields.zip(structFields).flatMap {
+            case (f, c) => Seq(Literal(f.name), c)
+          })
+        case (ArrayType(sElementType, containsNull), ArrayType(dElementType, _)) if hasUnsupportedConversion(sElementType, dElementType) =>
+          val lambdaVar = NamedLambdaVariable("element", sElementType, containsNull)
+          val body = recursivelyCastExpressions(lambdaVar, sElementType, dElementType)
+          val func = LambdaFunction(body, Seq(lambdaVar))
+          ArrayTransform(expr, func)
+        case (MapType(sKeyType, sValType, vnull), MapType(dKeyType, dValType, _))
+          if hasUnsupportedConversion(sKeyType, dKeyType) || hasUnsupportedConversion(sValType, dValType) =>
+          val kv = NamedLambdaVariable("kv", new StructType()
+            .add("key", sKeyType, nullable = false)
+            .add("value", sValType, nullable = vnull), nullable = false)
+          val newKey = recursivelyCastExpressions(GetStructField(kv, 0), sKeyType, dKeyType)
+          val newVal = recursivelyCastExpressions(GetStructField(kv, 1), sValType, dValType)
+          val entry = CreateStruct(Seq(newKey, newVal))
+          val func = LambdaFunction(entry, Seq(kv))
+          val transformed = ArrayTransform(MapEntries(expr), func)
+          MapFromEntries(transformed)
+        case _ =>
+          // most cases should be covered here we only need to do the recursive work for float to double
+          Cast(expr, dstType, if (needTimeZone) timeZoneId else None)
+      }
+    }
+
+    if (typeChangeInfos.isEmpty) {
+      GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+    } else {
+      // find type changed.
+      val newSchema = new StructType(requiredSchema.fields.zipWithIndex.map { case (f, i) =>
+        if (typeChangeInfos.containsKey(i)) {
+          StructField(f.name, typeChangeInfos.get(i).getRight, f.nullable, f.metadata)
+        } else f
+      })
+      val newFullSchema = schemaUtils.toAttributes(newSchema) ++ schemaUtils.toAttributes(partitionSchema)
+      val castSchema = newFullSchema.zipWithIndex.map { case (attr, i) =>
+        if (typeChangeInfos.containsKey(i)) {
+          val srcType = typeChangeInfos.get(i).getRight
+          val dstType = typeChangeInfos.get(i).getLeft
+          recursivelyCastExpressions(attr, srcType, dstType)
+        } else attr
+      }
+      GenerateUnsafeProjection.generate(castSchema, newFullSchema)
+    }
+  }
+}
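The Float-to-Double and numeric-to-Decimal branches above deliberately route the cast through a String instead of casting directly. A standalone illustration of why (plain Scala, not part of the commit): a direct widening cast exposes the float's closest binary approximation, while the string round-trip preserves the value as it is displayed.

object FloatWideningSketch extends App {
  val f = 0.1f
  // Direct widening keeps the float's binary approximation of 0.1:
  println(f.toDouble)          // 0.10000000149011612
  // Going through the decimal string representation keeps the displayed value:
  println(f.toString.toDouble) // 0.1
}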

hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkBasicSchemaEvolution.scala renamed to hudi-client/hudi-spark-client/src/parquet/scala/org/apache/spark/sql/execution/datasources/parquet/SparkBasicSchemaEvolution.scala

Lines changed: 0 additions & 1 deletion
@@ -21,7 +21,6 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import org.apache.spark.sql.types.StructType
 
-
 /**
  * Intended to be used just with HoodieSparkParquetReader to avoid any java/scala issues
  */

hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala

Lines changed: 4 additions & 2 deletions
@@ -244,7 +244,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
       case HoodieFileFormat.PARQUET =>
         // We're delegating to Spark to append partition values to every row only in cases
         // when these corresponding partition-values are not persisted w/in the data file itself
-        val parquetFileFormat = sparkAdapter.createLegacyHoodieParquetFileFormat(shouldExtractPartitionValuesFromPartitionPath).get
+        val parquetFileFormat = sparkAdapter.createLegacyHoodieParquetFileFormat(
+          shouldExtractPartitionValuesFromPartitionPath, tableAvroSchema).get
         (parquetFileFormat, LegacyHoodieParquetFileFormat.FILE_FORMAT_ID)
     }
 
@@ -552,7 +553,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
       hadoopConf = hadoopConf,
       // We're delegating to Spark to append partition values to every row only in cases
       // when these corresponding partition-values are not persisted w/in the data file itself
-      appendPartitionValues = shouldAppendPartitionValuesOverride.getOrElse(shouldExtractPartitionValuesFromPartitionPath)
+      appendPartitionValues = shouldAppendPartitionValuesOverride.getOrElse(shouldExtractPartitionValuesFromPartitionPath),
+      tableAvroSchema
     )
     // Since partition values by default are omitted, and not persisted w/in data-files by Spark,
     // data-file readers (such as [[ParquetFileFormat]]) have to inject partition values while reading

hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala

Lines changed: 3 additions & 2 deletions
@@ -52,8 +52,9 @@ object HoodieDataSourceHelper extends PredicateHelper with SparkAdapterSupport {
                                filters: Seq[Filter],
                                options: Map[String, String],
                                hadoopConf: Configuration,
-                               appendPartitionValues: Boolean = false): PartitionedFile => Iterator[InternalRow] = {
-    val parquetFileFormat: ParquetFileFormat = sparkAdapter.createLegacyHoodieParquetFileFormat(appendPartitionValues).get
+                               appendPartitionValues: Boolean = false,
+                               avroTableSchema: Schema): PartitionedFile => Iterator[InternalRow] = {
+    val parquetFileFormat: ParquetFileFormat = sparkAdapter.createLegacyHoodieParquetFileFormat(appendPartitionValues, avroTableSchema).get
     val readParquetFile: PartitionedFile => Iterator[Any] = parquetFileFormat.buildReaderWithPartitionValues(
       sparkSession = sparkSession,
       dataSchema = dataSchema,

hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala

Lines changed: 2 additions & 1 deletion
@@ -102,7 +102,8 @@ class CDCRelation(
       requiredSchema = tableStructSchema,
       filters = Nil,
       options = options,
-      hadoopConf = spark.sessionState.newHadoopConf()
+      hadoopConf = spark.sessionState.newHadoopConf(),
+      avroTableSchema = tableAvroSchema
     )
 
     val changes = cdcExtractor.extractCDCFileSplits().values().asScala.map { splits =>

hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/LegacyHoodieParquetFileFormat.scala

Lines changed: 2 additions & 2 deletions
@@ -39,7 +39,7 @@ class LegacyHoodieParquetFileFormat extends ParquetFileFormat with SparkAdapterSupport {
 
   override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
     sparkAdapter
-      .createLegacyHoodieParquetFileFormat(true).get.supportBatch(sparkSession, schema)
+      .createLegacyHoodieParquetFileFormat(true, null).get.supportBatch(sparkSession, schema)
   }
 
   override def buildReaderWithPartitionValues(sparkSession: SparkSession,
@@ -54,7 +54,7 @@ class LegacyHoodieParquetFileFormat extends ParquetFileFormat with SparkAdapterSupport {
       DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.defaultValue.toString).toBoolean
 
     sparkAdapter
-      .createLegacyHoodieParquetFileFormat(shouldExtractPartitionValuesFromPartitionPath).get
+      .createLegacyHoodieParquetFileFormat(shouldExtractPartitionValuesFromPartitionPath, null).get
       .buildReaderWithPartitionValues(sparkSession, dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf)
   }
 }
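Callers of the widened factory now supply the table's Avro schema alongside the partition-value flag. A caller-side sketch of the new call shape (the adapter and schema are passed in explicitly here; passing null for the schema mirrors what LegacyHoodieParquetFileFormat itself does above when no table schema is available):

import org.apache.avro.Schema
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.hudi.SparkAdapter

object LegacyFormatFactorySketch {
  // Builds the legacy Parquet file format the way HoodieBaseRelation now does.
  def legacyFormat(sparkAdapter: SparkAdapter, tableAvroSchema: Schema): ParquetFileFormat =
    sparkAdapter
      .createLegacyHoodieParquetFileFormat(appendPartitionValues = true, tableAvroSchema)
      .get
}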
