Changes from all commits
33 commits
3f9b9dd
Fix logical timestamp issue
linliu-code Nov 21, 2025
6eb4417
Disable spark2.4 for now
linliu-code Nov 21, 2025
fc00c9f
Fix CI issue
linliu-code Nov 21, 2025
b56c594
Disable spark-scala tests for spark2.x to 3.3
linliu-code Nov 21, 2025
cea2e0b
Remove the unnecessary file
linliu-code Nov 21, 2025
52c59d4
Handle AvroSchemaConverterWithTimestampNTZ
linliu-code Nov 24, 2025
00d784f
Fix validation and integration test failures
linliu-code Nov 25, 2025
26712be
Remove support from spark3.2
linliu-code Nov 25, 2025
49b5250
Disable NTZ convert and see what happens
linliu-code Nov 25, 2025
d59a25b
Fix the CI issues by using maven flags
linliu-code Nov 26, 2025
f1910be
Skip compiling for spark <= 3.1
linliu-code Nov 26, 2025
56c4aee
Remove spark3.2 for NTZ support
linliu-code Nov 27, 2025
2e23bf3
fix(ingest): Repair affected logical timestamp milli tables (#14161)
jonvex Nov 11, 2025
a09cbf6
Fix cherry-pick error
linliu-code Dec 15, 2025
28efea5
Add more changes
linliu-code Dec 16, 2025
1199934
Fix more issues
linliu-code Dec 22, 2025
bdf5261
[HUDI-8235] Adding support for EPOCHMICROSECONDS in TimestampBasedAvr…
sydneybeal Sep 22, 2024
a6a0942
Use spark3.5 for Azure test
linliu-code Dec 30, 2025
820d136
Fix more CI issues again
linliu-code Dec 30, 2025
711876a
Resolve java17 issues
linliu-code Jan 3, 2026
5ef643e
Fix Azure CI and integration tests
linliu-code Jan 3, 2026
ed4eeff
Fix more CI tests
linliu-code Jan 9, 2026
fcc5cda
Resolve a few dependency issues
linliu-code Jan 9, 2026
65f6fdd
Add Janino dependency
linliu-code Jan 9, 2026
cc1e856
Fix compiling error
linliu-code Jan 12, 2026
1b966a1
Fix incremental queries
linliu-code Jan 26, 2026
38edcb4
Fix the data skipping bug
linliu-code Jan 28, 2026
61fae65
Pass schema from option instead of global configuration for thread sa…
linliu-code Jan 28, 2026
833aac2
Address partial comments
linliu-code Jan 28, 2026
b16342b
Address more comments
linliu-code Jan 29, 2026
0305c5c
Addressed comments
linliu-code Jan 30, 2026
7a664d3
address wiring comments
linliu-code Jan 30, 2026
235f750
Fix hive related tests
Feb 2, 2026
25 changes: 20 additions & 5 deletions .github/workflows/bot.yml
@@ -268,25 +268,33 @@ jobs:
distribution: 'temurin'
architecture: x64
cache: maven
- name: Verify Java 17 version
run: |
echo "JAVA_HOME: $JAVA_HOME"
java -version
which java
- name: Quickstart Test
env:
SCALA_PROFILE: ${{ matrix.scalaProfile }}
SPARK_PROFILE: ${{ matrix.sparkProfile }}
run:
run: |
export PATH="$JAVA_HOME/bin:$PATH"
mvn test -Punit-tests -Pjava17 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -DwildcardSuites=skipScalaTests -DfailIfNoTests=false -pl hudi-examples/hudi-examples-spark $MVN_ARGS
- name: Java UT - Common & Spark
env:
SCALA_PROFILE: ${{ matrix.scalaProfile }}
SPARK_PROFILE: ${{ matrix.sparkProfile }}
SPARK_MODULES: ${{ matrix.sparkModules }}
run:
run: |
export PATH="$JAVA_HOME/bin:$PATH"
mvn test -Punit-tests -Pjava17 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -DwildcardSuites=skipScalaTests -DfailIfNoTests=false -pl "hudi-common,$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS
- name: Java FT - Spark
env:
SCALA_PROFILE: ${{ matrix.scalaProfile }}
SPARK_PROFILE: ${{ matrix.sparkProfile }}
SPARK_MODULES: ${{ matrix.sparkModules }}
run:
run: |
export PATH="$JAVA_HOME/bin:$PATH"
mvn test -Pfunctional-tests -Pjava17 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -pl "$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS

test-spark-java17-scala-tests:
@@ -323,19 +331,26 @@ jobs:
distribution: 'temurin'
architecture: x64
cache: maven
- name: Verify Java 17 version
run: |
echo "JAVA_HOME: $JAVA_HOME"
java -version
which java
- name: Scala UT - Common & Spark
env:
SCALA_PROFILE: ${{ matrix.scalaProfile }}
SPARK_PROFILE: ${{ matrix.sparkProfile }}
SPARK_MODULES: ${{ matrix.sparkModules }}
run:
run: |
export PATH="$JAVA_HOME/bin:$PATH"
mvn test -Punit-tests -Pjava17 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -Dtest=skipJavaTests -DfailIfNoTests=false -pl "hudi-common,$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS
- name: Scala FT - Spark
env:
SCALA_PROFILE: ${{ matrix.scalaProfile }}
SPARK_PROFILE: ${{ matrix.sparkProfile }}
SPARK_MODULES: ${{ matrix.sparkModules }}
run:
run: |
export PATH="$JAVA_HOME/bin:$PATH"
mvn test -Pfunctional-tests -Pjava17 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -Dtest=skipJavaTests -DfailIfNoTests=false -pl "$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS

test-spark-java11-17-java-tests:
6 changes: 3 additions & 3 deletions azure-pipelines-20230430.yml
@@ -59,7 +59,7 @@ parameters:
default:
- 'hudi-spark-datasource'
- 'hudi-spark-datasource/hudi-spark'
- 'hudi-spark-datasource/hudi-spark3.2.x'
- 'hudi-spark-datasource/hudi-spark3.5.x'
- 'hudi-spark-datasource/hudi-spark3.2plus-common'
- 'hudi-spark-datasource/hudi-spark3-common'
- 'hudi-spark-datasource/hudi-spark-common'
@@ -85,7 +85,7 @@ parameters:
- '!hudi-flink-datasource/hudi-flink1.18.x'
- '!hudi-spark-datasource'
- '!hudi-spark-datasource/hudi-spark'
- '!hudi-spark-datasource/hudi-spark3.2.x'
- '!hudi-spark-datasource/hudi-spark3.5.x'
- '!hudi-spark-datasource/hudi-spark3.2plus-common'
- '!hudi-spark-datasource/hudi-spark3-common'
- '!hudi-spark-datasource/hudi-spark-common'
@@ -121,7 +121,7 @@ parameters:
- 'org.apache.spark.sql.hudi.dml'

variables:
BUILD_PROFILES: '-Dscala-2.12 -Dspark3.2 -Dflink1.18'
BUILD_PROFILES: '-Dscala-2.12 -Dspark3.5 -Dflink1.18'
PLUGIN_OPTS: '-Dcheckstyle.skip=true -Drat.skip=true -Djacoco.skip=true -ntp -B -V -Pwarn-log -Dorg.slf4j.simpleLogger.log.org.apache.maven.plugins.shade=warn -Dorg.slf4j.simpleLogger.log.org.apache.maven.plugins.dependency=warn'
MVN_OPTS_INSTALL: '-T 3 -Phudi-platform-service -DskipTests $(BUILD_PROFILES) $(PLUGIN_OPTS) -Dmaven.wagon.httpconnectionManager.ttlSeconds=25 -Dmaven.wagon.http.retryHandler.count=5'
MVN_OPTS_TEST: '-fae -Pwarn-log $(BUILD_PROFILES) $(PLUGIN_OPTS)'
46 changes: 46 additions & 0 deletions hudi-client/hudi-client-common/pom.xml
@@ -52,6 +52,48 @@
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-timeline-service</artifactId>
<version>${project.version}</version>
<!-- Exclude Jetty from timeline-service to use our managed version -->
<exclusions>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>*</artifactId>
Contributor: why do we need this change?

Collaborator (Author): Introduced to resolve some conflicts. Will check whether we can avoid this, or whether it is due to some flakiness.

</exclusion>
</exclusions>
</dependency>

<!-- Jetty: Explicitly declare all Jetty dependencies to ensure version alignment -->
<!-- This is critical when running in Spark/Hadoop environments that may have older Jetty versions -->
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-http</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-io</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-util</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-webapp</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-xml</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-security</artifactId>
</dependency>

<dependency>
@@ -172,6 +214,10 @@
<groupId>org.pentaho</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
Contributor: can you help me understand the necessity of this code change?

Collaborator (Author): They are due to some dependency conflicts, most likely because we use Spark 3.5 for the Azure CI. I can remove these dependency changes to see which compilations or tests fail.

<groupId>org.codehaus.janino</groupId>
<artifactId>janino</artifactId>
</exclusion>
</exclusions>
</dependency>

@@ -19,6 +19,7 @@

package org.apache.hudi.io;

import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieLogFile;
@@ -68,10 +69,13 @@ public HoodieMergedReadHandle(HoodieWriteConfig config,
Pair<String, String> partitionPathFileIDPair,
Option<FileSlice> fileSliceOption) {
super(config, instantTime, hoodieTable, partitionPathFileIDPair);
readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()), config.allowOperationMetadataField());
Schema originalReaderSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()), config.allowOperationMetadataField());
// config.getSchema() is not canonicalized, while config.getWriteSchema() is canonicalized, so we have to use the canonicalized schema to read the existing data.
baseFileReaderSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getWriteSchema()), config.allowOperationMetadataField());
fileSliceOpt = fileSliceOption.isPresent() ? fileSliceOption : getLatestFileSlice();
// Repair the reader schema, assuming the writer schema is correct; if it is not, no repair happens.
readerSchema = AvroSchemaUtils.getRepairedSchema(originalReaderSchema, baseFileReaderSchema);
}

public List<HoodieRecord<T>> getMergedRecords() {
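The intent of the repair step above can be illustrated with a small, self-contained sketch. This is an assumption-based example, not the actual implementation: it supposes that AvroSchemaUtils.getRepairedSchema(reader, writer) restores a logical-type annotation (for example timestamp-millis) that the reader schema lost but the canonicalized writer schema still carries; the record and field names are hypothetical.

import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.hudi.avro.AvroSchemaUtils;

public class RepairedSchemaSketch {
  public static void main(String[] args) {
    // Reader schema as parsed from config.getSchema(): "ts" is a plain long,
    // i.e. the timestamp-millis logical type has been dropped (hypothetical field).
    Schema readerSchema = SchemaBuilder.record("rec").fields()
        .requiredLong("ts")
        .endRecord();

    // Writer schema (canonicalized via config.getWriteSchema()): "ts" keeps timestamp-millis.
    Schema tsMillis = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
    Schema writerSchema = SchemaBuilder.record("rec").fields()
        .name("ts").type(tsMillis).noDefault()
        .endRecord();

    // Assumed intent: the repaired reader schema re-acquires the logical type from the
    // writer schema so existing data is read with consistent timestamp semantics.
    Schema repaired = AvroSchemaUtils.getRepairedSchema(readerSchema, writerSchema);
    System.out.println(repaired.getField("ts").schema().getLogicalType());
  }
}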
@@ -41,6 +41,7 @@
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;

import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.hudi.common.config.TimestampKeyGeneratorConfig.DATE_TIME_PARSER;
@@ -54,7 +55,7 @@
*/
public class TimestampBasedAvroKeyGenerator extends SimpleAvroKeyGenerator {
public enum TimestampType implements Serializable {
UNIX_TIMESTAMP, DATE_STRING, MIXED, EPOCHMILLISECONDS, SCALAR
UNIX_TIMESTAMP, DATE_STRING, MIXED, EPOCHMILLISECONDS, EPOCHMICROSECONDS, SCALAR
}

private final TimeUnit timeUnit;
@@ -93,6 +94,9 @@ public TimestampBasedAvroKeyGenerator(TypedProperties config) throws IOException
case EPOCHMILLISECONDS:
timeUnit = MILLISECONDS;
break;
case EPOCHMICROSECONDS:
timeUnit = MICROSECONDS;
break;
case UNIX_TIMESTAMP:
timeUnit = SECONDS;
break;
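For context, a minimal sketch of how the new EPOCHMICROSECONDS type could be exercised from the key generator's point of view. The hoodie.keygen.timebased.* property keys and the column names are assumptions based on the standard timestamp-based key generator options, not part of this diff:

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator;

public class EpochMicrosKeyGenSketch {
  public static void main(String[] args) throws java.io.IOException {
    TypedProperties props = new TypedProperties();
    // Hypothetical record key and partition path columns.
    props.setProperty("hoodie.datasource.write.recordkey.field", "id");
    props.setProperty("hoodie.datasource.write.partitionpath.field", "event_ts");
    // New timestamp type: partition values are interpreted as epoch microseconds.
    props.setProperty("hoodie.keygen.timebased.timestamp.type", "EPOCHMICROSECONDS");
    props.setProperty("hoodie.keygen.timebased.output.dateformat", "yyyy/MM/dd");
    props.setProperty("hoodie.keygen.timebased.timezone", "UTC");

    TimestampBasedAvroKeyGenerator keyGen = new TimestampBasedAvroKeyGenerator(props);
    // For an Avro record whose "event_ts" is 1732147200000000L (microseconds since epoch),
    // the partition path should render as 2024/11/21 under the format above:
    // keyGen.getKey(record).getPartitionPath();
  }
}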
@@ -46,6 +46,7 @@
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -18,6 +18,7 @@

package org.apache.hudi.table.action.commit;

import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecord;
@@ -86,7 +87,8 @@ public void runMerge(HoodieTable<?, ?, ?, ?> table,
HoodieFileReader bootstrapFileReader = null;

Schema writerSchema = mergeHandle.getWriterSchemaWithMetaFields();
Schema readerSchema = baseFileReader.getSchema();
Schema readerSchema = AvroSchemaUtils.getRepairedSchema(baseFileReader.getSchema(), writerSchema);


// In case Advanced Schema Evolution is enabled we might need to rewrite currently
// persisted records to adhere to an evolved schema
24 changes: 24 additions & 0 deletions hudi-client/hudi-spark-client/pom.xml
@@ -197,6 +197,10 @@
<groupId>org.pentaho</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.janino</groupId>
<artifactId>janino</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
@@ -281,6 +285,26 @@
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>3.5.0</version>
<executions>
<execution>
<id>add-spark32plus-parquet-sources</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<skipAddSource>${spark31orEarlier}</skipAddSource>
<sources>
<source>src/parquet/scala</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>

<resources>
@@ -62,6 +62,7 @@
import org.apache.spark.sql.types.UserDefinedType;
import org.apache.spark.sql.types.VarcharType;

import java.lang.reflect.Field;
import java.sql.Date;
import java.util.ArrayList;
import java.util.Deque;
@@ -82,6 +83,21 @@ private SparkInternalSchemaConverter() {
public static final String HOODIE_TABLE_PATH = "hoodie.tablePath";
public static final String HOODIE_VALID_COMMITS_LIST = "hoodie.valid.commits.list";

/**
* Get TimestampNTZType$ using reflection, as it's only available in Spark 3.3+.
* Falls back to TimestampType$ if TimestampNTZType is not available.
*/
private static DataType getTimestampNTZType() {
try {
Class<?> timestampNTZTypeClass = Class.forName("org.apache.spark.sql.types.TimestampNTZType$");
Field moduleField = timestampNTZTypeClass.getField("MODULE$");
return (DataType) moduleField.get(null);
} catch (ClassNotFoundException | NoSuchFieldException | IllegalAccessException e) {
// TimestampNTZType is not available in this Spark version, fall back to TimestampType
return TimestampType$.MODULE$;
}
}

public static Type buildTypeFromStructType(DataType sparkType, Boolean firstVisitRoot, AtomicInteger nextId) {
if (sparkType instanceof StructType) {
StructField[] fields = ((StructType) sparkType).fields();
@@ -267,10 +283,14 @@ private static DataType constructSparkSchemaFromType(Type type) {
case DATE:
return DateType$.MODULE$;
case TIME:
case TIME_MILLIS:
throw new UnsupportedOperationException(String.format("cannot convert %s type to Spark", type));
case TIMESTAMP:
// todo support TimeStampNTZ
case TIMESTAMP_MILLIS:
return TimestampType$.MODULE$;
case LOCAL_TIMESTAMP_MILLIS:
case LOCAL_TIMESTAMP_MICROS:
return getTimestampNTZType();
case STRING:
return StringType$.MODULE$;
case UUID:
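As a side note, the same reflection probe used in getTimestampNTZType() can be run standalone to check whether the Spark version on the classpath exposes TimestampNTZType (available only in Spark 3.3+). A minimal sketch, with no Hudi dependencies assumed:

public class TimestampNtzProbe {
  public static void main(String[] args) {
    boolean ntzAvailable;
    try {
      // Present only in Spark 3.3+; older versions throw ClassNotFoundException.
      Class.forName("org.apache.spark.sql.types.TimestampNTZType$");
      ntzAvailable = true;
    } catch (ClassNotFoundException e) {
      ntzAvailable = false;
    }
    System.out.println("TimestampNTZType available: " + ntzAvailable);
  }
}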