
Commit cab988c

jonvex, nsivabalan, Vamsi, yihua, and linliu-code committed
fix(ingest): Repair affected logical timestamp milli tables (#14161)
Co-authored-by: Jonathan Vexler <=>
Co-authored-by: sivabalan <[email protected]>
Co-authored-by: Vamsi <[email protected]>
Co-authored-by: Y Ethan Guo <[email protected]>
Co-authored-by: Lin Liu <[email protected]>
1 parent f11bef6 commit cab988c

File tree
99 files changed: +5326 -271 lines


hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java

Lines changed: 2 additions & 2 deletions
@@ -117,7 +117,7 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema;
+import static org.apache.hudi.avro.AvroSchemaUtils.getNonNullTypeFromUnion;
 import static org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy.EAGER;
 import static org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy.LAZY;
 import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_METADATA_PARTITIONS;
@@ -1019,7 +1019,7 @@ static void validateSecondaryIndexSchemaEvolution(
 
       if (writerField != null && !tableField.schema().equals(writerField.schema())) {
         // Check if this is just making the field nullable/non-nullable, which is safe from SI perspective
-        if (resolveNullableSchema(tableField.schema()).equals(resolveNullableSchema(writerField.schema()))) {
+        if (getNonNullTypeFromUnion(tableField.schema()).equals(getNonNullTypeFromUnion(writerField.schema()))) {
          continue;
        }
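For context, the swap from resolveNullableSchema to getNonNullTypeFromUnion keeps the secondary-index check comparing the non-null branch of each field's union schema, so a field that merely became nullable is still treated as unchanged. A minimal, self-contained sketch of that unwrapping using only the Avro Schema API (the nonNullType helper below is an illustrative stand-in, not the Hudi method itself):

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class NullableUnionSketch {
  // Illustrative stand-in only: returns the single non-null branch of a ["null", T]
  // union, or the schema itself when it is not a union.
  static Schema nonNullType(Schema schema) {
    if (schema.getType() != Schema.Type.UNION) {
      return schema;
    }
    return schema.getTypes().stream()
        .filter(s -> s.getType() != Schema.Type.NULL)
        .findFirst()
        .orElseThrow(() -> new IllegalArgumentException("union has no non-null branch"));
  }

  public static void main(String[] args) {
    Schema nullableLong = SchemaBuilder.unionOf().nullType().and().longType().endUnion();
    Schema plainLong = Schema.create(Schema.Type.LONG);
    // Comparing the unwrapped types treats nullable vs. non-nullable as equivalent,
    // which is what the secondary-index evolution check relies on.
    System.out.println(nonNullType(nullableLong).equals(nonNullType(plainLong))); // true
  }
}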

hudi-client/hudi-java-client/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderOnHive.java

Lines changed: 1 addition & 1 deletion
@@ -94,7 +94,7 @@ public StorageConfiguration<?> getStorageConf() {
 
   @Override
   public HoodieReaderContext<ArrayWritable> getHoodieReaderContext(String tablePath, Schema avroSchema, StorageConfiguration<?> storageConf, HoodieTableMetaClient metaClient) {
-    HoodieFileGroupReaderBasedRecordReader.HiveReaderCreator readerCreator = (inputSplit, jobConf) -> new MapredParquetInputFormat().getRecordReader(inputSplit, jobConf, null);
+    HoodieFileGroupReaderBasedRecordReader.HiveReaderCreator readerCreator = (inputSplit, jobConf, dataSchema) -> new MapredParquetInputFormat().getRecordReader(inputSplit, jobConf, null);
     JobConf jobConf = new JobConf(storageConf.unwrapAs(Configuration.class));
     setupJobconf(jobConf, avroSchema);
     return new HiveHoodieReaderContext(readerCreator,

hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/ArrayWritableTestUtil.java

Lines changed: 6 additions & 1 deletion
@@ -23,6 +23,7 @@
 import org.apache.avro.Schema;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.io.TimestampWritable;
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.BytesWritable;
@@ -268,7 +269,11 @@ private static void assertWritablePrimaryTypeMatchesSchema(Schema schema, Writab
         break;
 
       case LONG:
-        assertInstanceOf(LongWritable.class, writable);
+        if (schema.getLogicalType() instanceof LogicalTypes.TimestampMillis) {
+          assertInstanceOf(TimestampWritable.class, writable);
+        } else {
+          assertInstanceOf(LongWritable.class, writable);
+        }
         break;
 
       case FLOAT:
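The updated assertion keys off whether the Avro LONG carries the timestamp-millis logical type, in which case Hive surfaces the value as a TimestampWritable instead of a LongWritable. A short check of that detection using the Avro API alone (the class below is illustrative, not part of the test utilities):

import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

public class TimestampMillisDetection {
  public static void main(String[] args) {
    // A LONG carrying timestamp-millis differs from a plain LONG only by its logical type.
    Schema tsMillis = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
    Schema plainLong = Schema.create(Schema.Type.LONG);

    System.out.println(tsMillis.getLogicalType() instanceof LogicalTypes.TimestampMillis); // true
    System.out.println(plainLong.getLogicalType() == null);                                // true
  }
}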

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java

Lines changed: 29 additions & 11 deletions
@@ -19,6 +19,7 @@
 package org.apache.hudi.io.storage;
 
 import org.apache.hudi.SparkAdapterSupport$;
+import org.apache.hudi.avro.AvroSchemaUtils;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.model.HoodieFileFormat;
@@ -40,6 +41,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.hadoop.ParquetReader;
 import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.SchemaRepair;
 import org.apache.spark.sql.HoodieInternalRowUtils;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
@@ -60,13 +62,16 @@
 
 import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
 import static org.apache.parquet.avro.AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS;
+import static org.apache.parquet.avro.HoodieAvroParquetSchemaConverter.getAvroSchemaConverter;
 
 public class HoodieSparkParquetReader implements HoodieSparkFileReader {
 
+  public static final String ENABLE_LOGICAL_TIMESTAMP_REPAIR = "spark.hudi.logicalTimestampField.repair.enable";
   private final StoragePath path;
   private final HoodieStorage storage;
   private final FileFormatUtils parquetUtils;
   private final List<ClosableIterator> readerIterators = new ArrayList<>();
+  private Option<MessageType> fileSchemaOption = Option.empty();
   private Option<StructType> structTypeOption = Option.empty();
   private Option<Schema> schemaOption = Option.empty();
 
@@ -116,19 +121,20 @@ public ClosableIterator<String> getRecordKeyIterator() throws IOException {
   }
 
   public ClosableIterator<UnsafeRow> getUnsafeRowIterator(Schema requestedSchema) throws IOException {
-    return getUnsafeRowIterator(HoodieInternalRowUtils.getCachedSchema(requestedSchema));
-  }
-
-  public ClosableIterator<UnsafeRow> getUnsafeRowIterator(StructType requestedSchema) throws IOException {
-    SparkBasicSchemaEvolution evolution = new SparkBasicSchemaEvolution(getStructSchema(), requestedSchema, SQLConf.get().sessionLocalTimeZone());
+    Schema nonNullSchema = AvroSchemaUtils.getNonNullTypeFromUnion(requestedSchema);
+    StructType structSchema = HoodieInternalRowUtils.getCachedSchema(nonNullSchema);
+    Option<MessageType> messageSchema = Option.of(getAvroSchemaConverter(storage.getConf().unwrapAs(Configuration.class)).convert(nonNullSchema));
+    boolean enableTimestampFieldRepair = storage.getConf().getBoolean(ENABLE_LOGICAL_TIMESTAMP_REPAIR, true);
+    StructType dataStructType = convertToStruct(enableTimestampFieldRepair ? SchemaRepair.repairLogicalTypes(getFileSchema(), messageSchema) : getFileSchema());
+    SparkBasicSchemaEvolution evolution = new SparkBasicSchemaEvolution(dataStructType, structSchema, SQLConf.get().sessionLocalTimeZone());
     String readSchemaJson = evolution.getRequestSchema().json();
     storage.getConf().set(ParquetReadSupport.PARQUET_READ_SCHEMA, readSchemaJson);
     storage.getConf().set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA(), readSchemaJson);
     storage.getConf().set(SQLConf.PARQUET_BINARY_AS_STRING().key(), SQLConf.get().getConf(SQLConf.PARQUET_BINARY_AS_STRING()).toString());
     storage.getConf().set(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), SQLConf.get().getConf(SQLConf.PARQUET_INT96_AS_TIMESTAMP()).toString());
-    ParquetReader<InternalRow> reader = ParquetReader.builder(new HoodieParquetReadSupport(Option$.MODULE$.empty(), true,
+    ParquetReader<InternalRow> reader = ParquetReader.builder(new HoodieParquetReadSupport(Option$.MODULE$.empty(), true, true,
         SparkAdapterSupport$.MODULE$.sparkAdapter().getRebaseSpec("CORRECTED"),
-        SparkAdapterSupport$.MODULE$.sparkAdapter().getRebaseSpec("LEGACY")),
+        SparkAdapterSupport$.MODULE$.sparkAdapter().getRebaseSpec("LEGACY"), messageSchema),
         new Path(path.toUri()))
         .withConf(storage.getConf().unwrapAs(Configuration.class))
         .build();
@@ -139,15 +145,22 @@ public ClosableIterator<UnsafeRow> getUnsafeRowIterator(StructType requestedSche
     return projectedIterator;
   }
 
+  private MessageType getFileSchema() {
+    if (fileSchemaOption.isEmpty()) {
+      MessageType messageType = ((ParquetUtils) parquetUtils).readSchema(storage, path);
+      fileSchemaOption = Option.of(messageType);
+    }
+    return fileSchemaOption.get();
+  }
+
   @Override
   public Schema getSchema() {
     if (schemaOption.isEmpty()) {
       // Some types in avro are not compatible with parquet.
       // Avro only supports representing Decimals as fixed byte array
       // and therefore if we convert to Avro directly we'll lose logical type-info.
-      MessageType messageType = ((ParquetUtils) parquetUtils).readSchema(storage, path);
-      StructType structType = new ParquetToSparkSchemaConverter(storage.getConf().unwrapAs(Configuration.class)).convert(messageType);
-      structTypeOption = Option.of(structType);
+      MessageType messageType = getFileSchema();
+      StructType structType = getStructSchema();
       schemaOption = Option.of(SparkAdapterSupport$.MODULE$.sparkAdapter()
          .getAvroSchemaConverters()
          .toAvroType(structType, true, messageType.getName(), StringUtils.EMPTY_STRING));
@@ -157,11 +170,16 @@ public Schema getSchema() {
 
   protected StructType getStructSchema() {
     if (structTypeOption.isEmpty()) {
-      getSchema();
+      MessageType messageType = getFileSchema();
+      structTypeOption = Option.of(convertToStruct(messageType));
     }
     return structTypeOption.get();
   }
 
+  private StructType convertToStruct(MessageType messageType) {
+    return new ParquetToSparkSchemaConverter(storage.getConf().unwrapAs(Configuration.class)).convert(messageType);
+  }
+
   @Override
   public void close() {
     readerIterators.forEach(ClosableIterator::close);
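The repair path above is gated by the new spark.hudi.logicalTimestampField.repair.enable key, read from the Hadoop configuration with a default of true before the file schema is handed to SchemaRepair. A minimal sketch of toggling that key on a plain Hadoop Configuration (the property name comes from this commit; everything else is stock Hadoop API, and the surrounding Hudi wiring is omitted):

import org.apache.hadoop.conf.Configuration;

public class LogicalTimestampRepairToggle {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // Unset, the flag defaults to true, so logical timestamp types are repaired on read.
    boolean repairEnabled = conf.getBoolean("spark.hudi.logicalTimestampField.repair.enable", true);
    System.out.println("repair enabled: " + repairEnabled); // true

    // Explicitly opt out to read the Parquet file schema exactly as it was written.
    conf.setBoolean("spark.hudi.logicalTimestampField.repair.enable", false);
    System.out.println("repair enabled: " + conf.getBoolean("spark.hudi.logicalTimestampField.repair.enable", true)); // false
  }
}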

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java

Lines changed: 3 additions & 3 deletions
@@ -76,7 +76,7 @@
 import scala.Enumeration;
 import scala.Function1;
 
-import static org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema;
+import static org.apache.hudi.avro.AvroSchemaUtils.getNonNullTypeFromUnion;
 import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_FIELD_ID_WRITE_ENABLED;
 import static org.apache.hudi.config.HoodieWriteConfig.ALLOW_OPERATION_METADATA_FIELD;
 import static org.apache.hudi.config.HoodieWriteConfig.AVRO_SCHEMA_STRING;
@@ -226,7 +226,7 @@ private void writeFields(InternalRow row, StructType schema, ValueWriter[] field
   }
 
   private ValueWriter makeWriter(Schema avroSchema, DataType dataType) {
-    Schema resolvedSchema = avroSchema == null ? null : resolveNullableSchema(avroSchema);
+    Schema resolvedSchema = avroSchema == null ? null : getNonNullTypeFromUnion(avroSchema);
     LogicalType logicalType = resolvedSchema != null ? resolvedSchema.getLogicalType() : null;
 
     if (dataType == DataTypes.BooleanType) {
@@ -429,7 +429,7 @@ private Type convertField(Schema avroFieldSchema, StructField structField) {
   }
 
   private Type convertField(Schema avroFieldSchema, StructField structField, Type.Repetition repetition) {
-    Schema resolvedSchema = avroFieldSchema == null ? null : resolveNullableSchema(avroFieldSchema);
+    Schema resolvedSchema = avroFieldSchema == null ? null : getNonNullTypeFromUnion(avroFieldSchema);
     LogicalType logicalType = resolvedSchema != null ? resolvedSchema.getLogicalType() : null;
 
     DataType dataType = structField.dataType();

hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala

Lines changed: 1 addition & 1 deletion
@@ -101,7 +101,7 @@ object AvroConversionUtils {
                       recordNamespace: String): Row => GenericRecord = {
     val serde = getCatalystRowSerDe(sourceSqlType)
     val avroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(sourceSqlType, structName, recordNamespace)
-    val nullable = AvroSchemaUtils.resolveNullableSchema(avroSchema) != avroSchema
+    val nullable = AvroSchemaUtils.getNonNullTypeFromUnion(avroSchema) != avroSchema
 
     val converter = AvroConversionUtils.createInternalRowToAvroConverter(sourceSqlType, avroSchema, nullable)

hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala

Lines changed: 2 additions & 2 deletions
@@ -91,7 +91,7 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport wi
     // making Spark deserialize its internal representation [[InternalRow]] into [[Row]] for subsequent conversion
     // (and back)
     val sameSchema = writerAvroSchema.equals(readerAvroSchema)
-    val nullable = AvroSchemaUtils.resolveNullableSchema(writerAvroSchema) != writerAvroSchema
+    val nullable = AvroSchemaUtils.getNonNullTypeFromUnion(writerAvroSchema) != writerAvroSchema
 
     // NOTE: We have to serialize Avro schema, and then subsequently parse it on the executor node, since Spark
     // serializer is not able to digest it
@@ -160,7 +160,7 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport wi
     // making Spark deserialize its internal representation [[InternalRow]] into [[Row]] for subsequent conversion
     // (and back)
     val sameSchema = writerAvroSchema.equals(readerAvroSchema)
-    val nullable = AvroSchemaUtils.resolveNullableSchema(writerAvroSchema) != writerAvroSchema
+    val nullable = AvroSchemaUtils.getNonNullTypeFromUnion(writerAvroSchema) != writerAvroSchema
 
     // NOTE: We have to serialize Avro schema, and then subsequently parse it on the executor node, since Spark
     // serializer is not able to digest it

hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala

Lines changed: 16 additions & 4 deletions
@@ -33,6 +33,9 @@ import org.apache.hudi.common.util.collection.{CachingIterator, ClosableIterator
 import org.apache.hudi.io.storage.{HoodieSparkFileReaderFactory, HoodieSparkParquetReader}
 import org.apache.hudi.storage.{HoodieStorage, StorageConfiguration, StoragePath}
 import org.apache.hudi.util.CloseableInternalRowIterator
+
+import org.apache.parquet.avro.AvroSchemaConverter
+import org.apache.parquet.avro.HoodieAvroParquetSchemaConverter.getAvroSchemaConverter
 import org.apache.spark.sql.HoodieInternalRowUtils
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.JoinedRow
@@ -68,26 +71,35 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader: SparkColumnarFileR
   override def getFileRecordIterator(filePath: StoragePath,
                                      start: Long,
                                      length: Long,
-                                     dataSchema: Schema,
+                                     dataSchema: Schema, // dataSchema refers to table schema in most cases(non log file reads).
                                      requiredSchema: Schema,
                                      storage: HoodieStorage): ClosableIterator[InternalRow] = {
     val hasRowIndexField = AvroSchemaUtils.containsFieldInSchema(requiredSchema, ROW_INDEX_TEMPORARY_COLUMN_NAME)
     if (hasRowIndexField) {
       assert(getRecordContext.supportsParquetRowIndex())
     }
-    val structType = HoodieInternalRowUtils.getCachedSchema(requiredSchema)
     if (FSUtils.isLogFile(filePath)) {
       new HoodieSparkFileReaderFactory(storage).newParquetFileReader(filePath)
-        .asInstanceOf[HoodieSparkParquetReader].getUnsafeRowIterator(structType).asInstanceOf[ClosableIterator[InternalRow]]
+        .asInstanceOf[HoodieSparkParquetReader].getUnsafeRowIterator(requiredSchema).asInstanceOf[ClosableIterator[InternalRow]]
     } else {
+      val structType = HoodieInternalRowUtils.getCachedSchema(requiredSchema)
       // partition value is empty because the spark parquet reader will append the partition columns to
      // each row if they are given. That is the only usage of the partition values in the reader.
       val fileInfo = sparkAdapter.getSparkPartitionedFileUtils
         .createPartitionedFile(InternalRow.empty, filePath, start, length)
       val (readSchema, readFilters) = getSchemaAndFiltersForRead(structType, hasRowIndexField)
+
+      // Convert Avro dataSchema to Parquet MessageType for timestamp precision conversion
+      val tableSchemaOpt = if (dataSchema != null) {
+        val hadoopConf = storage.getConf.unwrapAs(classOf[Configuration])
+        val parquetSchema = getAvroSchemaConverter(hadoopConf).convert(dataSchema)
+        org.apache.hudi.common.util.Option.of(parquetSchema)
+      } else {
+        org.apache.hudi.common.util.Option.empty[org.apache.parquet.schema.MessageType]()
+      }
       new CloseableInternalRowIterator(baseFileReader.read(fileInfo,
         readSchema, StructType(Seq.empty), getSchemaHandler.getInternalSchemaOpt,
-        readFilters, storage.getConf.asInstanceOf[StorageConfiguration[Configuration]]))
+        readFilters, storage.getConf.asInstanceOf[StorageConfiguration[Configuration]], tableSchemaOpt))
     }
   }
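In the non-log branch, the Avro dataSchema is now converted to a Parquet MessageType and passed along so the base file reader can reconcile timestamp precision against the table schema. The commit routes this through Hudi's HoodieAvroParquetSchemaConverter; the sketch below shows the same Avro-to-Parquet schema conversion with the stock parquet-avro converter, as an assumption about equivalent behavior rather than the exact class used here:

import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.schema.MessageType;

public class AvroToParquetSchema {
  public static void main(String[] args) {
    // A record with a single timestamp-millis field, as a table schema might declare it.
    Schema avroSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"rec\",\"fields\":["
            + "{\"name\":\"ts\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}}]}");

    // The conversion keeps the logical type, so the resulting MessageType
    // annotates ts as a millisecond timestamp rather than a bare int64.
    MessageType parquetSchema = new AvroSchemaConverter(new Configuration()).convert(avroSchema);
    System.out.println(parquetSchema);
  }
}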

hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/SparkColumnarFileReader.scala

Lines changed: 4 additions & 1 deletion
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hudi.common.util
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.storage.StorageConfiguration
+import org.apache.parquet.schema.MessageType
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
@@ -37,12 +38,14 @@ trait SparkColumnarFileReader extends Serializable {
   * @param internalSchemaOpt option of internal schema for schema.on.read
   * @param filters filters for data skipping. Not guaranteed to be used; the spark plan will also apply the filters.
   * @param storageConf the hadoop conf
+  * @param tableSchemaOpt option of table schema for timestamp precision conversion
   * @return iterator of rows read from the file output type says [[InternalRow]] but could be [[ColumnarBatch]]
   */
  def read(file: PartitionedFile,
           requiredSchema: StructType,
           partitionSchema: StructType,
           internalSchemaOpt: util.Option[InternalSchema],
           filters: Seq[Filter],
-          storageConf: StorageConfiguration[Configuration]): Iterator[InternalRow]
+          storageConf: StorageConfiguration[Configuration],
+          tableSchemaOpt: util.Option[MessageType] = util.Option.empty()): Iterator[InternalRow]
 }

hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala

Lines changed: 4 additions & 1 deletion
@@ -24,7 +24,7 @@ import org.apache.parquet.hadoop.metadata.FileMetaData
 import org.apache.spark.sql.HoodieSchemaUtils
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.expressions.{ArrayTransform, Attribute, Cast, CreateNamedStruct, CreateStruct, Expression, GetStructField, LambdaFunction, Literal, MapEntries, MapFromEntries, NamedLambdaVariable, UnsafeProjection}
-import org.apache.spark.sql.types.{ArrayType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, StringType, StructField, StructType}
+import org.apache.spark.sql.types.{ArrayType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, StringType, StructField, StructType, TimestampNTZType}
 
 object HoodieParquetFileFormatHelper {
 
@@ -58,6 +58,9 @@ object HoodieParquetFileFormatHelper {
   def isDataTypeEqual(requiredType: DataType, fileType: DataType): Boolean = (requiredType, fileType) match {
     case (requiredType, fileType) if requiredType == fileType => true
 
+    // prevent illegal cast
+    case (TimestampNTZType, LongType) => true
+
     case (ArrayType(rt, _), ArrayType(ft, _)) =>
       // Do not care about nullability as schema evolution require fields to be nullable
       isDataTypeEqual(rt, ft)