
Commit 79c4a88

Fix cherry-pick error

1 parent b891bf2 commit 79c4a88

File tree: 34 files changed, +685 -520 lines changed

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java

Lines changed: 1 addition & 65 deletions
@@ -46,6 +46,7 @@
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -108,7 +109,6 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static org.apache.hudi.avro.AvroSchemaUtils.getNonNullTypeFromUnion;
 import static org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy.EAGER;
 import static org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy.LAZY;
 import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_METADATA_PARTITIONS;
@@ -891,70 +891,6 @@ private void validateSchema() throws HoodieUpsertException, HoodieInsertException
     }
   }
 
-  /**
-   * Validates that columns with secondary indexes are not evolved in an incompatible way.
-   *
-   * @param tableSchema the current table schema
-   * @param writerSchema the new writer schema
-   * @param indexMetadata the index metadata containing all index definitions
-   * @throws SchemaCompatibilityException if a secondary index column has incompatible evolution
-   */
-  static void validateSecondaryIndexSchemaEvolution(
-      Schema tableSchema,
-      Schema writerSchema,
-      HoodieIndexMetadata indexMetadata) throws SchemaCompatibilityException {
-
-    // Filter for secondary index definitions
-    List<HoodieIndexDefinition> secondaryIndexDefs = indexMetadata.getIndexDefinitions().values().stream()
-        .filter(indexDef -> MetadataPartitionType.fromPartitionPath(indexDef.getIndexName()).equals(MetadataPartitionType.SECONDARY_INDEX))
-        .collect(Collectors.toList());
-
-    if (secondaryIndexDefs.isEmpty()) {
-      return;
-    }
-
-    // Create a map from source field to index name for efficient lookup
-    Map<String, String> columnToIndexName = new HashMap<>();
-    for (HoodieIndexDefinition indexDef : secondaryIndexDefs) {
-      String indexName = indexDef.getIndexName();
-      for (String sourceField : indexDef.getSourceFields()) {
-        // Note: If a column is part of multiple indexes, this will use the last one
-        // This is fine since we just need any index name for error reporting
-        columnToIndexName.put(sourceField, indexName);
-      }
-    }
-
-    // Check each indexed column for schema evolution
-    for (Map.Entry<String, String> entry : columnToIndexName.entrySet()) {
-      String columnName = entry.getKey();
-      String indexName = entry.getValue();
-
-      Schema.Field tableField = tableSchema.getField(columnName);
-
-      if (tableField == null) {
-        // This shouldn't happen as indexed columns should exist in table schema
-        LOG.warn("Secondary index '{}' references non-existent column: {}", indexName, columnName);
-        continue;
-      }
-
-      // Use AvroSchemaCompatibility's field lookup logic to handle aliases
-      Schema.Field writerField = AvroSchemaCompatibility.lookupWriterField(writerSchema, tableField);
-
-      if (writerField != null && !tableField.schema().equals(writerField.schema())) {
-        // Check if this is just making the field nullable/non-nullable, which is safe from SI perspective
-        if (getNonNullTypeFromUnion(tableField.schema()).equals(getNonNullTypeFromUnion(writerField.schema()))) {
-          continue;
-        }
-
-        String errorMessage = String.format(
-            "Column '%s' has secondary index '%s' and cannot evolve from schema '%s' to '%s'. "
-                + "Please drop the secondary index before changing the column type.",
-            columnName, indexName, tableField.schema(), writerField.schema());
-        throw new SchemaCompatibilityException(errorMessage);
-      }
-    }
-  }
-
   public void validateUpsertSchema() throws HoodieUpsertException {
     if (isMetadataTable) {
       return;
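
Note on the removed check above: it only tolerated an evolution of a secondary-index column when the change was purely about nullability, which it decided by comparing the non-null branches of the old and new Avro schemas. A minimal, self-contained Scala sketch of that comparison follows; the nonNullBranch helper is a hypothetical stand-in for AvroSchemaUtils.getNonNullTypeFromUnion, and the example schemas are invented for illustration.

import org.apache.avro.Schema

import scala.collection.JavaConverters._

object SecondaryIndexEvolutionSketch {

  // Hypothetical stand-in for AvroSchemaUtils.getNonNullTypeFromUnion:
  // for a union such as ["null", "long"], return the non-null branch; otherwise return the schema itself.
  def nonNullBranch(schema: Schema): Schema =
    if (schema.getType == Schema.Type.UNION) {
      schema.getTypes.asScala.find(_.getType != Schema.Type.NULL).getOrElse(schema)
    } else {
      schema
    }

  // The removed validation accepted a change to an indexed column only when the
  // non-null branches still matched, i.e. the change merely added or dropped nullability.
  def isNullabilityOnlyChange(tableFieldSchema: Schema, writerFieldSchema: Schema): Boolean =
    nonNullBranch(tableFieldSchema) == nonNullBranch(writerFieldSchema)

  def main(args: Array[String]): Unit = {
    val longType = Schema.create(Schema.Type.LONG)
    val nullableLong = Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.LONG))
    val stringType = Schema.create(Schema.Type.STRING)

    println(isNullabilityOnlyChange(longType, nullableLong)) // true: long -> nullable long would have been allowed
    println(isNullabilityOnlyChange(longType, stringType))   // false: would have triggered SchemaCompatibilityException
  }
}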

hudi-client/hudi-spark-client/pom.xml

Lines changed: 20 additions & 0 deletions
@@ -281,6 +281,26 @@
         <groupId>org.apache.rat</groupId>
         <artifactId>apache-rat-plugin</artifactId>
       </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <version>3.5.0</version>
+        <executions>
+          <execution>
+            <id>add-spark32plus-parquet-sources</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>add-source</goal>
+            </goals>
+            <configuration>
+              <skipAddSource>${spark31orEarlier}</skipAddSource>
+              <sources>
+                <source>src/parquet/scala</source>
+              </sources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
     </plugins>
 
     <resources>

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java

Lines changed: 1 addition & 1 deletion
@@ -177,7 +177,7 @@ private StructType convertToStruct(MessageType messageType) {
 
   @Override
   public void close() {
-    readerIterators.forEach(ParquetReaderIterator::close);
+    readerIterators.forEach(it -> it.close());
   }
 
   @Override

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java

Lines changed: 1 addition & 14 deletions
@@ -18,32 +18,19 @@
 
 package org.apache.hudi.io.storage.row;
 
-import org.apache.avro.LogicalTypes;
-import org.apache.avro.Schema;
-import org.apache.hadoop.conf.Configuration;
-
-import org.apache.hudi.SparkAdapterSupport$;
 import org.apache.hudi.avro.HoodieBloomFilterWriteSupport;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.hadoop.api.WriteSupport;
-import org.apache.parquet.schema.GroupType;
-import org.apache.parquet.schema.LogicalTypeAnnotation;
-import org.apache.parquet.schema.Type;
-import org.apache.parquet.schema.Types;
-import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils;
 import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;
-import org.apache.spark.sql.types.DataTypes;
-import org.apache.spark.sql.types.Decimal;
-import org.apache.spark.sql.types.Metadata;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.unsafe.types.UTF8String;
 
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;

hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala

Lines changed: 4 additions & 8 deletions
@@ -19,19 +19,15 @@
 package org.apache.spark.sql.hudi
 
 import org.apache.avro.Schema
-import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hudi.client.utils.SparkRowSerDe
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.storage.StoragePath
 
-import org.apache.avro.Schema
-import org.apache.hadoop.conf.Configuration
 import org.apache.spark.sql._
 import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSchemaConverters, HoodieAvroSerializer}
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, InterpretedPredicate}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, InterpretedPredicate}
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
@@ -53,7 +49,7 @@ import java.util.{Locale, TimeZone}
 trait SparkAdapter extends Serializable {
 
   /**
-   * Checks whether provided instance of [[InternalRow]] is actually an instance of [[ColumnarBatchRow]]
+   * Checks whether provided instance of [[InternalRow]] is actually an instance of [[org.apache.spark.sql.vectorized.ColumnarBatchRow]]
    */
   def isColumnarBatchRow(r: InternalRow): Boolean
 
@@ -72,7 +68,7 @@
 
   /**
    * Returns an instance of [[HoodieCatalogUtils]] providing for common utils operating on Spark's
-   * [[TableCatalog]]s
+   * [[org.apache.spark.sql.connector.catalog.TableCatalog]]s
    */
   def getCatalogUtils: HoodieCatalogUtils
 
@@ -207,7 +203,7 @@
                       metadataColumns: Seq[AttributeReference] = Seq.empty): FileScanRDD
 
   /**
-   * Extract condition in [[DeleteFromTable]]
+   * Extract condition in [[org.apache.spark.sql.catalyst.plans.logical.DeleteFromTable]]
    * SPARK-38626 condition is no longer Option in Spark 3.3
    */
   def extractDeleteCondition(deleteFromTable: Command): Expression
HoodieParquetReadSupport.scala (new file)

Lines changed: 114 additions & 0 deletions

@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hudi.SparkAdapterSupport
+import org.apache.hudi.common.util.ValidationUtils
+import org.apache.parquet.hadoop.api.InitContext
+import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
+import org.apache.parquet.schema.{GroupType, MessageType, Type, Types}
+import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
+
+import java.time.ZoneId
+import scala.collection.JavaConverters._
+
+class HoodieParquetReadSupport(
+    convertTz: Option[ZoneId],
+    enableVectorizedReader: Boolean,
+    val enableTimestampFieldRepair: Boolean,
+    datetimeRebaseSpec: RebaseSpec,
+    int96RebaseSpec: RebaseSpec,
+    tableSchemaOpt: org.apache.hudi.common.util.Option[MessageType] = org.apache.hudi.common.util.Option.empty())
+  extends ParquetReadSupport(convertTz, enableVectorizedReader, datetimeRebaseSpec, int96RebaseSpec) with SparkAdapterSupport {
+
+  override def init(context: InitContext): ReadContext = {
+    val readContext = super.init(context)
+    // repair is needed here because this is the schema that is used by the reader to decide what
+    // conversions are necessary
+    val requestedParquetSchema = if (enableTimestampFieldRepair) {
+      HoodieParquetReadSupport.getRepairedSchema(readContext.getRequestedSchema, tableSchemaOpt)
+    } else {
+      readContext.getRequestedSchema
+    }
+    val trimmedParquetSchema = HoodieParquetReadSupport.trimParquetSchema(requestedParquetSchema, context.getFileSchema)
+    new ReadContext(trimmedParquetSchema, readContext.getReadSupportMetadata)
+  }
+}
+
+object HoodieParquetReadSupport {
+  /**
+   * Removes any fields from the parquet schema that do not have any child fields in the actual file schema after the
+   * schema is trimmed down to the requested fields. This can happen when the table schema evolves and only a subset of
+   * the nested fields are required by the query.
+   *
+   * @param requestedSchema the initial parquet schema requested by Spark
+   * @param fileSchema the actual parquet schema of the file
+   * @return a potentially updated schema with empty struct fields removed
+   */
+  def trimParquetSchema(requestedSchema: MessageType, fileSchema: MessageType): MessageType = {
+    val trimmedFields = requestedSchema.getFields.asScala.map(field => {
+      if (fileSchema.containsField(field.getName)) {
+        trimParquetType(field, fileSchema.asGroupType().getType(field.getName))
+      } else {
+        Some(field)
+      }
+    }).filter(_.isDefined).map(_.get).toArray[Type]
+    Types.buildMessage().addFields(trimmedFields: _*).named(requestedSchema.getName)
+  }
+
+  private def trimParquetType(requestedType: Type, fileType: Type): Option[Type] = {
+    if (requestedType.equals(fileType)) {
+      Some(requestedType)
+    } else {
+      requestedType match {
+        case groupType: GroupType =>
+          ValidationUtils.checkState(!fileType.isPrimitive,
+            "Group type provided by requested schema but existing type in the file is a primitive")
+          val fileTypeGroup = fileType.asGroupType()
+          var hasMatchingField = false
+          val fields = groupType.getFields.asScala.map(field => {
+            if (fileTypeGroup.containsField(field.getName)) {
+              hasMatchingField = true
+              trimParquetType(field, fileType.asGroupType().getType(field.getName))
+            } else {
+              Some(field)
+            }
+          }).filter(_.isDefined).map(_.get).asJava
+          if (hasMatchingField && !fields.isEmpty) {
+            Some(groupType.withNewFields(fields))
+          } else {
+            None
+          }
+        case _ => Some(requestedType)
+      }
+    }
+  }
+
+  def getRepairedSchema(fileSchema: MessageType, tableSchema: org.apache.hudi.common.util.Option[MessageType]): MessageType = {
+    try {
+      val schemaRepairClass = Class.forName("org.apache.parquet.schema.SchemaRepair")
+      val repairMethod = schemaRepairClass.getMethod(
+        "repairLogicalTypes", classOf[MessageType], classOf[org.apache.hudi.common.util.Option[MessageType]])
+      repairMethod.invoke(null, fileSchema, tableSchema).asInstanceOf[MessageType]
+    } catch {
+      case _: Exception => fileSchema
+    }
+  }
+}
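
To illustrate the trimming behaviour described in the Scaladoc above: if a requested nested group has no child that still exists in the file, the whole group is dropped from the requested schema. (getRepairedSchema is looked up reflectively, presumably to avoid a hard compile-time dependency on a SchemaRepair class that may not be present in every Parquet build on the classpath.) Below is a small sketch of trimParquetSchema using the standard Parquet Types builder; the column names are invented for the example.

import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.{MessageType, Types}
import org.apache.spark.sql.execution.datasources.parquet.HoodieParquetReadSupport

object TrimParquetSchemaSketch {
  def main(args: Array[String]): Unit = {
    // Requested schema: top-level "id" plus a nested group "extra" that only asks for "note".
    val requested: MessageType = Types.buildMessage()
      .addField(Types.optional(PrimitiveTypeName.INT64).named("id"))
      .addField(Types.optionalGroup()
        .addField(Types.optional(PrimitiveTypeName.BINARY).named("note"))
        .named("extra"))
      .named("spark_schema")

    // File schema: "extra" exists but contains a different child, so "note" has no match in the file.
    val fileSchema: MessageType = Types.buildMessage()
      .addField(Types.optional(PrimitiveTypeName.INT64).named("id"))
      .addField(Types.optionalGroup()
        .addField(Types.optional(PrimitiveTypeName.INT32).named("flag"))
        .named("extra"))
      .named("spark_schema")

    // No requested child of "extra" matches the file, so the empty group is removed
    // and only "id" survives in the trimmed request.
    val trimmed = HoodieParquetReadSupport.trimParquetSchema(requested, fileSchema)
    println(trimmed)
  }
}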

hudi-common/pom.xml

Lines changed: 33 additions & 0 deletions
@@ -54,6 +54,39 @@
           <skip>false</skip>
         </configuration>
       </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <version>3.5.0</version>
+        <executions>
+          <execution>
+            <id>add-spark34plus-avro-sources</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>add-source</goal>
+            </goals>
+            <configuration>
+              <skipAddSource>${spark33orEarlier}</skipAddSource>
+              <sources>
+                <source>src/avro/java</source>
+              </sources>
+            </configuration>
+          </execution>
+          <execution>
+            <id>add-spark34plus-avro-test-sources</id>
+            <phase>generate-test-sources</phase>
+            <goals>
+              <goal>add-test-source</goal>
+            </goals>
+            <configuration>
+              <skipAddTestSource>${spark33orEarlier}</skipAddTestSource>
+              <sources>
+                <source>src/avro/test/java</source>
+              </sources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
       <plugin>
         <groupId>org.apache.rat</groupId>
         <artifactId>apache-rat-plugin</artifactId>
