
Commit 8583da1

Fix more issues
1 parent 8559d6a commit 8583da1

19 files changed: +360 -282 lines changed

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java

Lines changed: 14 additions & 5 deletions
@@ -19,6 +19,7 @@
 package org.apache.hudi.io.storage;
 
 import org.apache.hudi.SparkAdapterSupport$;
+import org.apache.hudi.avro.AvroSchemaUtils;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.model.HoodieFileFormat;
@@ -31,6 +32,7 @@
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;

@@ -56,6 +58,7 @@
 
 import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
 import static org.apache.parquet.avro.AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS;
+import static org.apache.parquet.avro.HoodieAvroParquetSchemaConverter.getAvroSchemaConverter;
 
 public class HoodieSparkParquetReader implements HoodieSparkFileReader {

@@ -126,13 +129,19 @@ private ClosableIterator<InternalRow> getInternalRowIterator(Schema readerSchema
     if (requestedSchema == null) {
       requestedSchema = readerSchema;
     }
-    StructType readerStructType = HoodieInternalRowUtils.getCachedSchema(readerSchema);
-    StructType requestedStructType = HoodieInternalRowUtils.getCachedSchema(requestedSchema);
-    storage.getConf().set(ParquetReadSupport.PARQUET_READ_SCHEMA, readerStructType.json());
-    storage.getConf().set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA(), requestedStructType.json());
+
+    MessageType fileSchema = getFileSchema();
+    Schema nonNullSchema = AvroSchemaUtils.getNonNullTypeFromUnion(requestedSchema);
+    Option<MessageType> messageSchema = Option.of(getAvroSchemaConverter(storage.getConf().unwrapAs(Configuration.class)).convert(nonNullSchema));
+    Pair<StructType, StructType> readerSchemas =
+        SparkAdapterSupport$.MODULE$.sparkAdapter().getReaderSchemas(storage, readerSchema, requestedSchema, fileSchema);
+    storage.getConf().set(ParquetReadSupport.PARQUET_READ_SCHEMA, readerSchemas.getLeft().json());
+    storage.getConf().set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA(), readerSchemas.getRight().json());
     storage.getConf().set(SQLConf.PARQUET_BINARY_AS_STRING().key(), SQLConf.get().getConf(SQLConf.PARQUET_BINARY_AS_STRING()).toString());
     storage.getConf().set(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), SQLConf.get().getConf(SQLConf.PARQUET_INT96_AS_TIMESTAMP()).toString());
-    ParquetReader<InternalRow> reader = ParquetReader.<InternalRow>builder((ReadSupport) new ParquetReadSupport(), new Path(path.toUri()))
+    ParquetReader<InternalRow> reader = ParquetReader.<InternalRow>builder(
+        (ReadSupport) SparkAdapterSupport$.MODULE$.sparkAdapter().getParquetReadSupport(messageSchema),
+        new Path(path.toUri()))
         .withConf(storage.getConf().unwrapAs(Configuration.class))
         .build();
     ParquetReaderIterator<InternalRow> parquetReaderIterator = new ParquetReaderIterator<>(reader);
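For context on the change above: the reader now unwraps the nullable union from the requested Avro schema before handing it to the Parquet schema converter. A minimal standalone sketch of that unwrapping step (an illustration of what AvroSchemaUtils.getNonNullTypeFromUnion is presumably doing, not Hudi's actual implementation; the object and method names below are made up for the example):

import org.apache.avro.Schema

import scala.collection.JavaConverters._

// Nullable Avro fields are modeled as a union ["null", T]; a Parquet schema
// converter needs the non-null branch T. This helper returns that branch,
// or the schema itself when it is not a union.
object NonNullUnionSketch {
  def nonNullBranch(schema: Schema): Schema =
    if (schema.getType == Schema.Type.UNION) {
      schema.getTypes.asScala.find(_.getType != Schema.Type.NULL).getOrElse(schema)
    } else {
      schema
    }

  def main(args: Array[String]): Unit = {
    val nullableString = new Schema.Parser().parse("""["null", "string"]""")
    println(nonNullBranch(nullableString)) // prints "string"
  }
}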

hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala renamed to hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala

Lines changed: 23 additions & 15 deletions
@@ -1,34 +1,41 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
 package org.apache.spark.sql.execution.datasources.parquet
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.parquet.hadoop.metadata.FileMetaData
-import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType}
+import org.apache.spark.sql.types._
 
 object HoodieParquetFileFormatHelper {
-
   def buildImplicitSchemaChangeInfo(hadoopConf: Configuration,
                                     parquetFileMetaData: FileMetaData,
                                     requiredSchema: StructType): (java.util.Map[Integer, org.apache.hudi.common.util.collection.Pair[DataType, DataType]], StructType) = {
-    val implicitTypeChangeInfo: java.util.Map[Integer, org.apache.hudi.common.util.collection.Pair[DataType, DataType]] = new java.util.HashMap()
     val convert = new ParquetToSparkSchemaConverter(hadoopConf)
     val fileStruct = convert.convert(parquetFileMetaData.getSchema)
+    buildImplicitSchemaChangeInfo(fileStruct, requiredSchema)
+  }
+
+  def buildImplicitSchemaChangeInfo(fileStruct: StructType,
+                                    requiredSchema: StructType): (java.util.Map[Integer, org.apache.hudi.common.util.collection.Pair[DataType, DataType]], StructType) = {
+    val implicitTypeChangeInfo: java.util.Map[Integer, org.apache.hudi.common.util.collection.Pair[DataType, DataType]] = new java.util.HashMap()
+
     val fileStructMap = fileStruct.fields.map(f => (f.name, f.dataType)).toMap
     // if there are missing fields or if field's data type needs to be changed while reading, we handle it here.
    val sparkRequestStructFields = requiredSchema.map(f => {
@@ -45,6 +52,7 @@ object HoodieParquetFileFormatHelper {
   }
 
   def isDataTypeEqual(requiredType: DataType, fileType: DataType): Boolean = (requiredType, fileType) match {
+
     case (requiredType, fileType) if requiredType == fileType => true
 
     case (ArrayType(rt, _), ArrayType(ft, _)) =>
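The refactoring above splits buildImplicitSchemaChangeInfo so the comparison logic can be reused when the file's StructType is already known, instead of always being derived from the Parquet footer metadata. A simplified, self-contained sketch of the underlying idea (illustrative names only, not the Hudi helper itself):

import org.apache.spark.sql.types._

// Compare the schema found in the file with the schema the query asks for and
// record fields whose types differ, so a reader can later up-cast them
// (e.g. an int written in the file but a long requested).
object ImplicitTypeChangeSketch {
  def typeChanges(fileStruct: StructType, requiredSchema: StructType): Map[String, (DataType, DataType)] = {
    val fileTypes = fileStruct.fields.map(f => f.name -> f.dataType).toMap
    requiredSchema.fields.flatMap { f =>
      fileTypes.get(f.name) match {
        case Some(fileType) if fileType != f.dataType => Some(f.name -> (f.dataType, fileType))
        case _ => None // field missing in the file, or types already match
      }
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    val fileStruct = StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))
    val required = StructType(Seq(StructField("id", LongType), StructField("name", StringType)))
    println(typeChanges(fileStruct, required)) // Map(id -> (LongType,IntegerType))
  }
}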

hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala

Lines changed: 13 additions & 3 deletions
@@ -18,11 +18,12 @@
 
 package org.apache.spark.sql.hudi
 
-import org.apache.avro.Schema
 import org.apache.hudi.client.utils.SparkRowSerDe
 import org.apache.hudi.common.table.HoodieTableMetaClient
-import org.apache.hudi.storage.StoragePath
+import org.apache.hudi.storage.{HoodieStorage, StoragePath}
 
+import org.apache.avro.Schema
+import org.apache.parquet.schema.MessageType
 import org.apache.spark.sql._
 import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSchemaConverters, HoodieAvroSerializer}
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
@@ -55,6 +56,15 @@ trait SparkAdapter extends Serializable {
 
   def isTimestampNTZType(dataType: DataType): Boolean
 
+  def getParquetReadSupport(messageSchema: org.apache.hudi.common.util.Option[MessageType]): org.apache.parquet.hadoop.api.ReadSupport[_]
+
+  def repairSchemaIfSpecified(shouldRepair: Boolean,
+                              fileSchema: MessageType,
+                              tableSchemaOpt: org.apache.hudi.common.util.Option[MessageType]): MessageType
+
+  def getReaderSchemas(storage: HoodieStorage, readerSchema: Schema, requestedSchema: Schema, fileSchema: MessageType):
+    org.apache.hudi.common.util.collection.Pair[StructType, StructType]
+
   /**
    * Creates Catalyst [[Metadata]] for Hudi's meta-fields (designating these w/
    * [[METADATA_COL_ATTR_KEY]] if available (available in Spark >= 3.2)
@@ -172,7 +182,7 @@ trait SparkAdapter extends Serializable {
   /**
    * Create instance of [[ParquetFileFormat]]
    */
-  def createLegacyHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat]
+  def createLegacyHoodieParquetFileFormat(appendPartitionValues: Boolean, tableAvroSchema: Schema): Option[ParquetFileFormat]
 
   def makeColumnarBatch(vectors: Array[ColumnVector], numRows: Int): ColumnarBatch

hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkBasicSchemaEvolution.scala renamed to hudi-client/hudi-spark-client/src/parquet/scala/org/apache/spark/sql/execution/datasources/parquet/SparkBasicSchemaEvolution.scala

Lines changed: 0 additions & 1 deletion
@@ -21,7 +21,6 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import org.apache.spark.sql.types.StructType
 
-
 /**
  * Intended to be used just with HoodieSparkParquetReader to avoid any java/scala issues
  */

hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala

Lines changed: 4 additions & 2 deletions
@@ -244,7 +244,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
       case HoodieFileFormat.PARQUET =>
         // We're delegating to Spark to append partition values to every row only in cases
         // when these corresponding partition-values are not persisted w/in the data file itself
-        val parquetFileFormat = sparkAdapter.createLegacyHoodieParquetFileFormat(shouldExtractPartitionValuesFromPartitionPath).get
+        val parquetFileFormat = sparkAdapter.createLegacyHoodieParquetFileFormat(
+          shouldExtractPartitionValuesFromPartitionPath, tableAvroSchema).get
         (parquetFileFormat, LegacyHoodieParquetFileFormat.FILE_FORMAT_ID)
     }

@@ -552,7 +553,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
       hadoopConf = hadoopConf,
       // We're delegating to Spark to append partition values to every row only in cases
       // when these corresponding partition-values are not persisted w/in the data file itself
-      appendPartitionValues = shouldAppendPartitionValuesOverride.getOrElse(shouldExtractPartitionValuesFromPartitionPath)
+      appendPartitionValues = shouldAppendPartitionValuesOverride.getOrElse(shouldExtractPartitionValuesFromPartitionPath),
+      tableAvroSchema
     )
     // Since partition values by default are omitted, and not persisted w/in data-files by Spark,
     // data-file readers (such as [[ParquetFileFormat]]) have to inject partition values while reading

hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala

Lines changed: 3 additions & 2 deletions
@@ -52,8 +52,9 @@ object HoodieDataSourceHelper extends PredicateHelper with SparkAdapterSupport {
                               filters: Seq[Filter],
                               options: Map[String, String],
                               hadoopConf: Configuration,
-                              appendPartitionValues: Boolean = false): PartitionedFile => Iterator[InternalRow] = {
-    val parquetFileFormat: ParquetFileFormat = sparkAdapter.createLegacyHoodieParquetFileFormat(appendPartitionValues).get
+                              appendPartitionValues: Boolean = false,
+                              avroTableSchema: Schema): PartitionedFile => Iterator[InternalRow] = {
+    val parquetFileFormat: ParquetFileFormat = sparkAdapter.createLegacyHoodieParquetFileFormat(appendPartitionValues, avroTableSchema).get
     val readParquetFile: PartitionedFile => Iterator[Any] = parquetFileFormat.buildReaderWithPartitionValues(
       sparkSession = sparkSession,
       dataSchema = dataSchema,

hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala

Lines changed: 2 additions & 1 deletion
@@ -102,7 +102,8 @@ class CDCRelation(
       requiredSchema = tableStructSchema,
       filters = Nil,
       options = options,
-      hadoopConf = spark.sessionState.newHadoopConf()
+      hadoopConf = spark.sessionState.newHadoopConf(),
+      avroTableSchema = tableAvroSchema
     )
 
     val changes = cdcExtractor.extractCDCFileSplits().values().asScala.map { splits =>

hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/LegacyHoodieParquetFileFormat.scala

Lines changed: 2 additions & 2 deletions
@@ -39,7 +39,7 @@ class LegacyHoodieParquetFileFormat extends ParquetFileFormat with SparkAdapterS
 
   override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
     sparkAdapter
-      .createLegacyHoodieParquetFileFormat(true).get.supportBatch(sparkSession, schema)
+      .createLegacyHoodieParquetFileFormat(true, null).get.supportBatch(sparkSession, schema)
   }
 
   override def buildReaderWithPartitionValues(sparkSession: SparkSession,
@@ -54,7 +54,7 @@ class LegacyHoodieParquetFileFormat extends ParquetFileFormat with SparkAdapterS
       DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.defaultValue.toString).toBoolean
 
     sparkAdapter
-      .createLegacyHoodieParquetFileFormat(shouldExtractPartitionValuesFromPartitionPath).get
+      .createLegacyHoodieParquetFileFormat(shouldExtractPartitionValuesFromPartitionPath, null).get
       .buildReaderWithPartitionValues(sparkSession, dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf)
   }
 }

hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala

Lines changed: 12 additions & 13 deletions
@@ -45,16 +45,13 @@ import org.apache.hudi.hive.HiveSyncConfigHolder
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions
 import org.apache.hudi.keygen.{ComplexKeyGenerator, CustomKeyGenerator, GlobalDeleteKeyGenerator, NonpartitionedKeyGenerator, SimpleKeyGenerator, TimestampBasedKeyGenerator}
 import org.apache.hudi.metrics.{Metrics, MetricsReporterType}
-import org.apache.hudi.storage.{HoodieStorageUtils, StoragePath, StoragePathFilter}
+import org.apache.hudi.storage.{StoragePath, StoragePathFilter}
 import org.apache.hudi.table.HoodieSparkTable
 import org.apache.hudi.testutils.HoodieSparkClientTestBase
 import org.apache.hudi.util.JFunction
-import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, QuickstartUtils, ScalaAssertionSupport}
 
 import org.apache.hadoop.fs.FileSystem
 import org.apache.spark.sql.functions.{col, concat, lit, udf, when}
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
@@ -66,6 +63,7 @@ import org.junit.jupiter.api.function.Executable
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{CsvSource, EnumSource, ValueSource}
+import org.slf4j.LoggerFactory
 
 import java.net.URI
 import java.nio.file.Paths
@@ -1699,7 +1697,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
   }
 
   @ParameterizedTest
-  @CsvSource(Array("true, 6", "false, 6", "true, 8", "false, 8", "true, 9", "false, 9"))
+  @CsvSource(Array("true, 6", "false, 6"))
   def testLogicalTypesReadRepair(vectorizedReadEnabled: Boolean, tableVersion: Int): Unit = {
     // Note: for spark 3.3 and 3.4 we should fall back to nonvectorized reader
     // if that is not happening then this test will fail
@@ -1712,6 +1710,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
     }
     spark.conf.set("spark.sql.parquet.enableVectorizedReader", vectorizedReadEnabled.toString)
     spark.conf.set("spark.sql.session.timeZone", "UTC")
+    spark.conf.set("spark.sql.parquet.inferTimestampNTZ.enabled", "true")
     val tableName = "trips_logical_types_json_cow_read_v" + tableVersion
     val dataPath = "file://" + basePath + "/" + tableName
     val zipOutput = Paths.get(new URI(dataPath))
@@ -1725,15 +1724,15 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
     for (row <- rows) {
       val hash = row.get(6).asInstanceOf[String].hashCode()
       if ((hash & 1) == 0) {
-        assertEquals("2020-01-01T00:00:00.001Z", row.get(15).asInstanceOf[Timestamp].toInstant.toString)
-        assertEquals("2020-06-01T12:00:00.000001Z", row.get(16).asInstanceOf[Timestamp].toInstant.toString)
-        assertEquals("2015-05-20T12:34:56.001", row.get(17).toString)
-        assertEquals("2017-07-07T07:07:07.000001", row.get(18).toString)
+        assertEquals("2020-01-01T00:00:00.001Z", row.get(14).asInstanceOf[Timestamp].toInstant.toString)
+        assertEquals("2020-06-01T12:00:00.000001Z", row.get(15).asInstanceOf[Timestamp].toInstant.toString)
+        assertEquals("2015-05-20T12:34:56.001", row.get(16).toString)
+        assertEquals("2017-07-07T07:07:07.000001", row.get(17).toString)
       } else {
-        assertEquals("2019-12-31T23:59:59.999Z", row.get(15).asInstanceOf[Timestamp].toInstant.toString)
-        assertEquals("2020-06-01T11:59:59.999999Z", row.get(16).asInstanceOf[Timestamp].toInstant.toString)
-        assertEquals("2015-05-20T12:34:55.999", row.get(17).toString)
-        assertEquals("2017-07-07T07:07:06.999999", row.get(18).toString)
+        assertEquals("2019-12-31T23:59:59.999Z", row.get(14).asInstanceOf[Timestamp].toInstant.toString)
+        assertEquals("2020-06-01T11:59:59.999999Z", row.get(15).asInstanceOf[Timestamp].toInstant.toString)
+        assertEquals("2015-05-20T12:34:55.999", row.get(16).toString)
+        assertEquals("2017-07-07T07:07:06.999999", row.get(17).toString)
       }
     }
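The test now enables Parquet TimestampNTZ inference before reading. A rough standalone sketch of what that flag controls, assuming Spark 3.4 or later where the flag exists; the path /tmp/ts_ntz is hypothetical and stands for any Parquet file with isAdjustedToUTC=false timestamp columns:

import org.apache.spark.sql.SparkSession

object InferTimestampNtzSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("ntz-sketch").getOrCreate()

    // With inference on, Parquet timestamps that are not adjusted to UTC are read
    // back as TIMESTAMP_NTZ (no time zone) instead of the session-zone TIMESTAMP type.
    spark.conf.set("spark.sql.parquet.inferTimestampNTZ.enabled", "true")
    spark.read.parquet("/tmp/ts_ntz").printSchema()

    // With inference off, the same columns surface as plain timestamp values.
    spark.conf.set("spark.sql.parquet.inferTimestampNTZ.enabled", "false")
    spark.read.parquet("/tmp/ts_ntz").printSchema()

    spark.stop()
  }
}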

hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala

Lines changed: 26 additions & 5 deletions
@@ -18,16 +18,16 @@
 
 package org.apache.spark.sql.adapter
 
-import org.apache.avro.Schema
-import org.apache.hadoop.fs.FileStatus
-import org.apache.hadoop.fs.Path
 import org.apache.hudi.client.utils.SparkRowSerDe
 import org.apache.hudi.common.table.HoodieTableMetaClient
-import org.apache.hudi.storage.StoragePath
+import org.apache.hudi.storage.{HoodieStorage, StoragePath}
 import org.apache.hudi.{AvroConversionUtils, DefaultSource, Spark2HoodieFileScanRDD, Spark2RowSerDe}
 
 import org.apache.avro.Schema
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileStatus
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.schema.MessageType
 import org.apache.spark.sql._
 import org.apache.spark.sql.avro._
 import org.apache.spark.sql.catalyst.InternalRow
@@ -83,6 +83,19 @@ class Spark2Adapter extends SparkAdapter {
     dataType.getClass.getSimpleName.startsWith("TimestampNTZType")
   }
 
+  override def getParquetReadSupport(messageScheme: org.apache.hudi.common.util.Option[MessageType]):
+    org.apache.parquet.hadoop.api.ReadSupport[_] = {
+    // ParquetReadSupport is package-private in Spark 2.4, so we use reflection to instantiate it
+    val clazz = Class.forName("org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport")
+    clazz.getDeclaredConstructor().newInstance().asInstanceOf[org.apache.parquet.hadoop.api.ReadSupport[_]]
+  }
+
+  override def repairSchemaIfSpecified(shouldRepair: Boolean,
+                                       fileSchema: MessageType,
+                                       tableSchemaOpt: org.apache.hudi.common.util.Option[MessageType]): MessageType = {
+    fileSchema
+  }
+
   override def getCatalystPlanUtils: HoodieCatalystPlansUtils = HoodieSpark2CatalystPlanUtils
 
   override def getCatalystExpressionUtils: HoodieCatalystExpressionUtils = HoodieSpark2CatalystExpressionUtils
@@ -153,7 +166,7 @@ class Spark2Adapter extends SparkAdapter {
     partitions.toSeq
   }
 
-  override def createLegacyHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] = {
+  override def createLegacyHoodieParquetFileFormat(appendPartitionValues: Boolean, tableAvroSchema: Schema): Option[ParquetFileFormat] = {
     Some(new Spark24LegacyHoodieParquetFileFormat(appendPartitionValues))
   }

@@ -216,4 +229,12 @@ class Spark2Adapter extends SparkAdapter {
     batch.setNumRows(numRows)
     batch
   }
+
+  override def getReaderSchemas(storage: HoodieStorage, readerSchema: Schema, requestedSchema: Schema, fileSchema: MessageType):
+    org.apache.hudi.common.util.collection.Pair[StructType, StructType] = {
+    org.apache.hudi.common.util.collection.Pair.of(
+      HoodieInternalRowUtils.getCachedSchema(readerSchema),
+      HoodieInternalRowUtils.getCachedSchema(requestedSchema)
+    )
+  }
 }
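On the Spark2Adapter change above: looking a class up by its fully qualified name and calling its no-arg constructor is a standard JVM reflection pattern. A tiny generic illustration of that pattern, using java.util.ArrayList as a stand-in target rather than Hudi or Spark code:

object ReflectiveInstantiationSketch {
  // Looks a class up by name and invokes its no-arg constructor; constructors that
  // are not accessible from the caller would additionally need setAccessible(true).
  def newInstanceOf[T](className: String): T = {
    val clazz = Class.forName(className)
    clazz.getDeclaredConstructor().newInstance().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    val list = newInstanceOf[java.util.List[String]]("java.util.ArrayList")
    list.add("instantiated via reflection")
    println(list) // [instantiated via reflection]
  }
}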
