From 9ac2ff8edb7d0549c4842be997cab788a6f0993a Mon Sep 17 00:00:00 2001 From: Rahil Chertara Date: Thu, 23 Oct 2025 10:17:56 -0700 Subject: [PATCH 01/10] Ensure MOR table works, with lance base files and avro logs file --- .../apache/hudi/util/CommonClientUtils.java | 1 + .../hudi/common/model/HoodieSparkRecord.java | 11 +- .../apache/hudi/HoodieSparkSqlWriter.scala | 3 - .../hudi/functional/TestLanceDataSource.scala | 118 ++++++++++++------ 4 files changed, 89 insertions(+), 44 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/CommonClientUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/CommonClientUtils.java index 418d603c86e40..cd95b7ba5e133 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/CommonClientUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/CommonClientUtils.java @@ -108,6 +108,7 @@ public static HoodieLogBlock.HoodieLogBlockType getLogBlockType(HoodieWriteConfi } HoodieFileFormat baseFileFormat = getBaseFileFormat(writeConfig, tableConfig); switch (getBaseFileFormat(writeConfig, tableConfig)) { + case LANCE: case PARQUET: case ORC: return HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK; diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java index 4b4ac3464374f..a83eb4b9e5658 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java @@ -22,6 +22,7 @@ import org.apache.hudi.HoodieSchemaConversionUtils; import org.apache.hudi.SparkAdapterSupport$; import org.apache.hudi.SparkFileFormatInternalRecordContext; +import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.client.model.HoodieInternalRow; import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.schema.HoodieSchemaType; @@ -327,7 +328,15 @@ public Option toIndexedRecord(HoodieSchema recordSchema @Override public ByteArrayOutputStream getAvroBytes(HoodieSchema recordSchema, Properties props) throws IOException { - throw new UnsupportedOperationException(); + // Convert Spark InternalRow to Avro GenericRecord + if (data == null) { + throw new IOException("Cannot convert null data to Avro bytes"); + } + StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema); + GenericRecord avroRecord = AvroConversionUtils + .createInternalRowToAvroConverter(structType, recordSchema.toAvroSchema(), false) + .apply(data); + return HoodieAvroUtils.avroToBytesStream(avroRecord); } @Override diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index f17d641edb11e..85d58f8d7a46c 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -516,9 +516,6 @@ class HoodieSparkSqlWriterInternal { // scalastyle:on val writeConfig = client.getConfig - if (writeConfig.getRecordMerger.getRecordType == HoodieRecordType.SPARK && tableType == MERGE_ON_READ && writeConfig.getLogDataBlockFormat.orElse(HoodieLogBlockType.AVRO_DATA_BLOCK) != 
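// (guard removed: HoodieSparkRecord#getAvroBytes is implemented above, so Spark-native records on MOR can now serialize into Avro log blocks and are no longer restricted to Parquet log blocks)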
HoodieLogBlockType.PARQUET_DATA_BLOCK) { - throw new UnsupportedOperationException(s"${writeConfig.getRecordMerger.getClass.getName} only support parquet log.") - } instantTime = client.startCommit(commitActionType) // if table has undergone upgrade, we need to reload table config tableMetaClient.reloadTableConfig() diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala index 9a634b8eec6df..2027bb66498cc 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala @@ -28,6 +28,9 @@ import org.apache.spark.sql.{SaveMode, SparkSession} import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} import org.junit.jupiter.api.condition.DisabledIfSystemProperty +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource import scala.collection.JavaConverters._ @@ -51,9 +54,10 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { spark = null } - @Test - def testBasicWriteAndRead(): Unit = { - val tableName = "test_lance_table" + @ParameterizedTest + @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ")) + def testBasicWriteAndRead(tableType: String): Unit = { + val tableName = s"test_lance_table_${tableType.toLowerCase}" val tablePath = s"$basePath/$tableName" // Create test data @@ -68,6 +72,7 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { expectedDf.write .format("hudi") .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") + .option(TABLE_TYPE.key(), tableType) .option(RECORDKEY_FIELD.key(), "id") .option(PRECOMBINE_FIELD.key(), "age") .option(TABLE_NAME.key(), tableName) @@ -87,9 +92,10 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { assertTrue(actual.except(expectedDf).isEmpty) } - @Test - def testSchemaProjection(): Unit = { - val tableName = "test_lance_projection" + @ParameterizedTest + @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ")) + def testSchemaProjection(tableType: String): Unit = { + val tableName = s"test_lance_projection_${tableType.toLowerCase}" val tablePath = s"$basePath/$tableName" // Create test data with multiple columns @@ -104,6 +110,7 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { inputDf.write .format("hudi") .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") + .option(TABLE_TYPE.key(), tableType) .option(RECORDKEY_FIELD.key(), "id") .option(PRECOMBINE_FIELD.key(), "age") .option(TABLE_NAME.key(), tableName) @@ -133,9 +140,10 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { assertTrue(actual.except(expectedDf).isEmpty) } - @Test - def testWhereClauseFiltering(): Unit = { - val tableName = "test_lance_where" + @ParameterizedTest + @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ")) + def testWhereClauseFiltering(tableType: String): Unit = { + val tableName = s"test_lance_where_${tableType.toLowerCase}" val tablePath = s"$basePath/$tableName" // Create test data @@ -152,6 +160,7 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { df.write .format("hudi") .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") + .option(TABLE_TYPE.key(), tableType) 
.option(RECORDKEY_FIELD.key(), "id") .option(PRECOMBINE_FIELD.key(), "age") .option(TABLE_NAME.key(), tableName) @@ -206,9 +215,10 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { assertTrue(filteredComplex.except(expectedFilteredComplex).isEmpty) } - @Test - def testMultipleBulkInsertsWithCommitValidation(): Unit = { - val tableName = "test_lance_multiple_inserts" + @ParameterizedTest + @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ")) + def testMultipleBulkInsertsWithCommitValidation(tableType: String): Unit = { + val tableName = s"test_lance_multiple_inserts_${tableType.toLowerCase}" val tablePath = s"$basePath/$tableName" // First insert - records 1-3 @@ -222,6 +232,7 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { df1.write .format("hudi") .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") + .option(TABLE_TYPE.key(), tableType) .option(RECORDKEY_FIELD.key(), "id") .option(PRECOMBINE_FIELD.key(), "age") .option(TABLE_NAME.key(), tableName) @@ -242,6 +253,7 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { df2.write .format("hudi") .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") + .option(TABLE_TYPE.key(), tableType) .option(RECORDKEY_FIELD.key(), "id") .option(PRECOMBINE_FIELD.key(), "age") .option(TABLE_NAME.key(), tableName) @@ -262,6 +274,7 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { df3.write .format("hudi") .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") + .option(TABLE_TYPE.key(), tableType) .option(RECORDKEY_FIELD.key(), "id") .option(PRECOMBINE_FIELD.key(), "age") .option(TABLE_NAME.key(), tableName) @@ -284,11 +297,6 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { val commits = metaClient.getCommitsTimeline.filterCompletedInstants().getInstants.asScala.toList assertEquals(3, commits.size, "Should have exactly 3 commits") - // Check that each commit is a COMMIT action (bulk_insert creates COMMIT actions) - commits.foreach { instant => - assertEquals("commit", instant.getAction, s"Instant ${instant.requestedTime()} should be a commit action") - } - // Read back all data and verify total record count val readDf = spark.read .format("hudi") @@ -312,9 +320,10 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { assertTrue(actual.except(expectedDf).isEmpty) } - @Test - def testTimeTravel(): Unit = { - val tableName = "test_lance_time_travel" + @ParameterizedTest + @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ")) + def testTimeTravel(tableType: String): Unit = { + val tableName = s"test_lance_time_travel_${tableType.toLowerCase}" val tablePath = s"$basePath/$tableName" // First insert - records 1-3 @@ -328,12 +337,12 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { df1.write .format("hudi") .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") + .option(TABLE_TYPE.key(), tableType) .option(RECORDKEY_FIELD.key(), "id") .option(PRECOMBINE_FIELD.key(), "age") .option(TABLE_NAME.key(), tableName) .option(HoodieWriteConfig.TBL_NAME.key(), tableName) .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName) - .option(OPERATION.key(), "bulk_insert") .mode(SaveMode.Overwrite) .save(tablePath) @@ -348,12 +357,12 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { df2.write .format("hudi") .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") + .option(TABLE_TYPE.key(), tableType) .option(RECORDKEY_FIELD.key(), "id") .option(PRECOMBINE_FIELD.key(), "age") 
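// precombine on "age": when two incoming records share a record key, the one with the larger age value is expected to win the merge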
.option(TABLE_NAME.key(), tableName) .option(HoodieWriteConfig.TBL_NAME.key(), tableName) .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName) - .option(OPERATION.key(), "bulk_insert") .mode(SaveMode.Append) .save(tablePath) @@ -378,12 +387,12 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { df3.write .format("hudi") .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") + .option(TABLE_TYPE.key(), tableType) .option(RECORDKEY_FIELD.key(), "id") .option(PRECOMBINE_FIELD.key(), "age") .option(TABLE_NAME.key(), tableName) .option(HoodieWriteConfig.TBL_NAME.key(), tableName) .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName) - .option(OPERATION.key(), "bulk_insert") .mode(SaveMode.Append) .save(tablePath) @@ -408,9 +417,10 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { assertTrue(actual.except(expectedDf).isEmpty) } - @Test - def testMultipleRegularInsertsWithCommitValidation(): Unit = { - val tableName = "test_lance_regular_inserts" + @ParameterizedTest + @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ")) + def testMultipleRegularInsertsWithCommitValidation(tableType: String): Unit = { + val tableName = s"test_lance_regular_inserts_${tableType.toLowerCase}" val tablePath = s"$basePath/$tableName" // First insert - records 1-3 using regular insert @@ -424,6 +434,7 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { df1.write .format("hudi") .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") + .option(TABLE_TYPE.key(), tableType) .option(RECORDKEY_FIELD.key(), "id") .option(PRECOMBINE_FIELD.key(), "age") .option(TABLE_NAME.key(), tableName) @@ -444,6 +455,7 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { df2.write .format("hudi") .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") + .option(TABLE_TYPE.key(), tableType) .option(RECORDKEY_FIELD.key(), "id") .option(PRECOMBINE_FIELD.key(), "age") .option(TABLE_NAME.key(), tableName) @@ -466,9 +478,11 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { val commits = metaClient.getCommitsTimeline.filterCompletedInstants().getInstants.asScala.toList assertEquals(2, commits.size, "Should have exactly 2 commits") - // Check that each commit is a COMMIT action (insert creates COMMIT actions) + // Verify commit action types based on table type + val expectedAction = if (tableType == "COPY_ON_WRITE") "commit" else "deltacommit" commits.foreach { instant => - assertEquals("commit", instant.getAction, s"Instant ${instant.requestedTime()} should be a commit action") + assertEquals(expectedAction, instant.getAction, + s"Instant ${instant.requestedTime()} should be a $expectedAction action for $tableType table") } // Read back all data and verify total record count @@ -491,9 +505,10 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { assertTrue(actual.except(expectedDf).isEmpty) } - @Test - def testBasicUpsertModifyExistingRow(): Unit = { - val tableName = "test_lance_upsert" + @ParameterizedTest + @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ")) + def testBasicUpsertModifyExistingRow(tableType: String): Unit = { + val tableName = s"test_lance_upsert_${tableType.toLowerCase}" val tablePath = s"$basePath/$tableName" // Initial insert - 3 records @@ -507,6 +522,7 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { df1.write .format("hudi") .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") + 
.option(TABLE_TYPE.key(), tableType) .option(RECORDKEY_FIELD.key(), "id") .option(PRECOMBINE_FIELD.key(), "age") .option(TABLE_NAME.key(), tableName) @@ -525,6 +541,7 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { df2.write .format("hudi") .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") + .option(TABLE_TYPE.key(), tableType) .option(RECORDKEY_FIELD.key(), "id") .option(PRECOMBINE_FIELD.key(), "age") .option(TABLE_NAME.key(), tableName) @@ -544,6 +561,7 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { df3.write .format("hudi") .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") + .option(TABLE_TYPE.key(), tableType) .option(RECORDKEY_FIELD.key(), "id") .option(PRECOMBINE_FIELD.key(), "age") .option(TABLE_NAME.key(), tableName) @@ -562,6 +580,14 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { val commitCount = metaClient.getCommitsTimeline.filterCompletedInstants().countInstants() assertEquals(3, commitCount, "Should have 3 completed commits (insert + 2 upserts)") + // Verify commit action types based on table type + val expectedAction = if (tableType == "COPY_ON_WRITE") "commit" else "deltacommit" + val commits = metaClient.getCommitsTimeline.filterCompletedInstants().getInstants.iterator().asScala.toList + commits.foreach { instant => + assertEquals(expectedAction, instant.getAction, + s"Instant ${instant.requestedTime()} should be a $expectedAction action for $tableType table") + } + // Read and verify data val readDf = spark.read.format("hudi").load(tablePath) val actual = readDf.select("id", "name", "age", "score") @@ -577,9 +603,10 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { assertTrue(actual.except(expectedDf).isEmpty) } - @Test - def testBasicDeleteOperation(): Unit = { - val tableName = "test_lance_delete" + @ParameterizedTest + @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ")) + def testBasicDeleteOperation(tableType: String): Unit = { + val tableName = s"test_lance_delete_${tableType.toLowerCase}" val tablePath = s"$basePath/$tableName" // Initial insert - 5 records @@ -595,6 +622,7 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { df1.write .format("hudi") .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") + .option(TABLE_TYPE.key(), tableType) .option(RECORDKEY_FIELD.key(), "id") .option(PRECOMBINE_FIELD.key(), "age") .option(TABLE_NAME.key(), tableName) @@ -615,6 +643,7 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { deleteDF.write .format("hudi") .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") + .option(TABLE_TYPE.key(), tableType) .option(RECORDKEY_FIELD.key(), "id") .option(PRECOMBINE_FIELD.key(), "age") .option(TABLE_NAME.key(), tableName) @@ -633,6 +662,14 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { val commitCount = metaClient.getCommitsTimeline.filterCompletedInstants().countInstants() assertEquals(2, commitCount, "Should have 2 completed commits (insert + delete)") + // Verify commit action types based on table type + val expectedAction = if (tableType == "COPY_ON_WRITE") "commit" else "deltacommit" + val commits = metaClient.getCommitsTimeline.filterCompletedInstants().getInstants.iterator().asScala.toList + commits.foreach { instant => + assertEquals(expectedAction, instant.getAction, + s"Instant ${instant.requestedTime()} should be a $expectedAction action for $tableType table") + } + // Read and verify data val readDf = spark.read.format("hudi").load(tablePath) val actual = readDf.select("id", 
"name", "age", "score") @@ -647,9 +684,10 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { assertTrue(actual.except(expectedDf).isEmpty) } - @Test - def testIncrementalQuery(): Unit = { - val tableName = "test_lance_incremental" + @ParameterizedTest + @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ")) + def testIncrementalQuery(tableType: String): Unit = { + val tableName = s"test_lance_incremental_${tableType.toLowerCase}" val tablePath = s"$basePath/$tableName" // First insert - records 1-3 @@ -663,12 +701,12 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { df1.write .format("hudi") .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") + .option(TABLE_TYPE.key(), tableType) .option(RECORDKEY_FIELD.key(), "id") .option(PRECOMBINE_FIELD.key(), "age") .option(TABLE_NAME.key(), tableName) .option(HoodieWriteConfig.TBL_NAME.key(), tableName) .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName) - .option(OPERATION.key(), "bulk_insert") .mode(SaveMode.Overwrite) .save(tablePath) @@ -683,12 +721,12 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { df2.write .format("hudi") .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") + .option(TABLE_TYPE.key(), tableType) .option(RECORDKEY_FIELD.key(), "id") .option(PRECOMBINE_FIELD.key(), "age") .option(TABLE_NAME.key(), tableName) .option(HoodieWriteConfig.TBL_NAME.key(), tableName) .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName) - .option(OPERATION.key(), "bulk_insert") .mode(SaveMode.Append) .save(tablePath) @@ -713,12 +751,12 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { df3.write .format("hudi") .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") + .option(TABLE_TYPE.key(), tableType) .option(RECORDKEY_FIELD.key(), "id") .option(PRECOMBINE_FIELD.key(), "age") .option(TABLE_NAME.key(), tableName) .option(HoodieWriteConfig.TBL_NAME.key(), tableName) .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName) - .option(OPERATION.key(), "bulk_insert") .mode(SaveMode.Append) .save(tablePath) From a81c6943f7fc37f46ce8f06057ed25a49079f95a Mon Sep 17 00:00:00 2001 From: Rahil Chertara Date: Fri, 2 Jan 2026 06:12:35 -0800 Subject: [PATCH 02/10] fix style --- .../scala/org/apache/hudi/functional/TestLanceDataSource.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala index 2027bb66498cc..54e74da0ac977 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala @@ -28,7 +28,6 @@ import org.apache.spark.sql.{SaveMode, SparkSession} import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} import org.junit.jupiter.api.condition.DisabledIfSystemProperty -import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource From c496518f9ac1811bc8b547ac6122d7d0fc29ea9a Mon Sep 17 00:00:00 2001 From: Rahil Chertara Date: Fri, 2 Jan 2026 09:26:43 -0800 Subject: [PATCH 03/10] minor --- 
.../org/apache/hudi/functional/TestLanceDataSource.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala index 54e74da0ac977..a5b423a465830 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala @@ -550,7 +550,7 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { .mode(SaveMode.Append) .save(tablePath) - // Second upsert - modify Alice (id=1) and insert David (id=4) + // Second upsert - modify Alice (id=1) and insert David (id=4) made it pass first upsert val records3 = Seq( (1, "Alice", 45, 98.5), // Update Alice: age 30->45, score 95.5->98.5 (4, "David", 28, 88.0) // Insert new record @@ -598,7 +598,7 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { (4, "David", 28, 88.0) )).toDF("id", "name", "age", "score") - assertTrue(expectedDf.except(actual).isEmpty) + assertTrue(expectedDf.except(actual).isEmpty) //fails here assertTrue(actual.except(expectedDf).isEmpty) } From ff1bcfdb21672c6ea9249669624ba4472143a4b0 Mon Sep 17 00:00:00 2001 From: Rahil Chertara Date: Fri, 2 Jan 2026 10:32:30 -0800 Subject: [PATCH 04/10] version downgrade in lance spark and lance core due to arrow issue for MOR --- pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 1dbfc302754fb..82c0a9bb93665 100644 --- a/pom.xml +++ b/pom.xml @@ -241,8 +241,8 @@ 2.1.1 1.1.10.7 18.3.0 - 0.39.0 - 0.0.15 + 0.38.0 + 0.0.13 lance-spark-3.5_${scala.binary.version} false From e56ed4a1bd1b7813124589360a332dc263677ad8 Mon Sep 17 00:00:00 2001 From: Rahil Chertara Date: Fri, 2 Jan 2026 10:40:27 -0800 Subject: [PATCH 05/10] retrigger ci From decc2b35bcfb006b6a76ee74372273bdd9f914a4 Mon Sep 17 00:00:00 2001 From: Timothy Brown Date: Wed, 7 Jan 2026 21:24:01 -0500 Subject: [PATCH 06/10] cleanup --- .../org/apache/hudi/functional/TestLanceDataSource.scala | 4 ++-- pom.xml | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala index a5b423a465830..180160ab752be 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala @@ -581,7 +581,7 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { // Verify commit action types based on table type val expectedAction = if (tableType == "COPY_ON_WRITE") "commit" else "deltacommit" - val commits = metaClient.getCommitsTimeline.filterCompletedInstants().getInstants.iterator().asScala.toList + val commits = metaClient.getCommitsTimeline.filterCompletedInstants().getInstants.asScala commits.foreach { instant => assertEquals(expectedAction, instant.getAction, s"Instant ${instant.requestedTime()} should be a $expectedAction action for $tableType table") @@ -663,7 +663,7 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { // Verify commit action types based on table type val expectedAction = if (tableType == "COPY_ON_WRITE") "commit" else 
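// MOR tables track writes as deltacommits (log appends) until compaction; COW writes complete directly as commits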
"deltacommit" - val commits = metaClient.getCommitsTimeline.filterCompletedInstants().getInstants.iterator().asScala.toList + val commits = metaClient.getCommitsTimeline.filterCompletedInstants().getInstants.asScala commits.foreach { instant => assertEquals(expectedAction, instant.getAction, s"Instant ${instant.requestedTime()} should be a $expectedAction action for $tableType table") diff --git a/pom.xml b/pom.xml index 82c0a9bb93665..1dbfc302754fb 100644 --- a/pom.xml +++ b/pom.xml @@ -241,8 +241,8 @@ 2.1.1 1.1.10.7 18.3.0 - 0.38.0 - 0.0.13 + 0.39.0 + 0.0.15 lance-spark-3.5_${scala.binary.version} false From b427e30089cf93530c4f0bbc638b76d83dfb9339 Mon Sep 17 00:00:00 2001 From: Timothy Brown Date: Thu, 8 Jan 2026 10:01:46 -0500 Subject: [PATCH 07/10] add compaction validation --- .../hudi/functional/TestLanceDataSource.scala | 59 +++++++++++++++---- 1 file changed, 49 insertions(+), 10 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala index 180160ab752be..ec94e64ed6730 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala @@ -19,6 +19,7 @@ package org.apache.hudi.functional import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.DefaultSparkRecordMerger +import org.apache.hudi.common.model.HoodieTableType import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} import org.apache.hudi.common.testutils.HoodieTestUtils import org.apache.hudi.config.HoodieWriteConfig @@ -29,7 +30,7 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} import org.junit.jupiter.api.condition.DisabledIfSystemProperty import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.ValueSource +import org.junit.jupiter.params.provider.{EnumSource, ValueSource} import scala.collection.JavaConverters._ @@ -505,9 +506,9 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { } @ParameterizedTest - @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ")) - def testBasicUpsertModifyExistingRow(tableType: String): Unit = { - val tableName = s"test_lance_upsert_${tableType.toLowerCase}" + @EnumSource(value = classOf[HoodieTableType]) + def testBasicUpsertModifyExistingRow(tableType: HoodieTableType): Unit = { + val tableName = s"test_lance_upsert_${tableType.name().toLowerCase}" val tablePath = s"$basePath/$tableName" // Initial insert - 3 records @@ -521,7 +522,7 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { df1.write .format("hudi") .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") - .option(TABLE_TYPE.key(), tableType) + .option(TABLE_TYPE.key(), tableType.name()) .option(RECORDKEY_FIELD.key(), "id") .option(PRECOMBINE_FIELD.key(), "age") .option(TABLE_NAME.key(), tableName) @@ -540,7 +541,7 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { df2.write .format("hudi") .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") - .option(TABLE_TYPE.key(), tableType) + .option(TABLE_TYPE.key(), tableType.name()) .option(RECORDKEY_FIELD.key(), "id") .option(PRECOMBINE_FIELD.key(), "age") .option(TABLE_NAME.key(), tableName) @@ -550,7 +551,7 @@ class TestLanceDataSource extends 
HoodieSparkClientTestBase { .mode(SaveMode.Append) .save(tablePath) - // Second upsert - modify Alice (id=1) and insert David (id=4) made it pass first upsert + // Second upsert - modify Alice (id=1) and insert David (id=4) val records3 = Seq( (1, "Alice", 45, 98.5), // Update Alice: age 30->45, score 95.5->98.5 (4, "David", 28, 88.0) // Insert new record @@ -560,7 +561,7 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { df3.write .format("hudi") .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") - .option(TABLE_TYPE.key(), tableType) + .option(TABLE_TYPE.key(), tableType.name()) .option(RECORDKEY_FIELD.key(), "id") .option(PRECOMBINE_FIELD.key(), "age") .option(TABLE_NAME.key(), tableName) @@ -580,7 +581,7 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { assertEquals(3, commitCount, "Should have 3 completed commits (insert + 2 upserts)") // Verify commit action types based on table type - val expectedAction = if (tableType == "COPY_ON_WRITE") "commit" else "deltacommit" + val expectedAction = if (tableType == HoodieTableType.COPY_ON_WRITE) "commit" else "deltacommit" val commits = metaClient.getCommitsTimeline.filterCompletedInstants().getInstants.asScala commits.foreach { instant => assertEquals(expectedAction, instant.getAction, @@ -598,8 +599,46 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { (4, "David", 28, 88.0) )).toDF("id", "name", "age", "score") - assertTrue(expectedDf.except(actual).isEmpty) //fails here + assertTrue(expectedDf.except(actual).isEmpty) assertTrue(actual.except(expectedDf).isEmpty) + + if (tableType == HoodieTableType.MERGE_ON_READ) { + // Write one more commit to trigger compaction + val records4 = Seq( + (1, "Alice", 50, 98.5), // Update Alice: age 45->50 + (4, "David", 28, 90.0) // Update David: score 88.0->90.0 + ) + val df4 = spark.createDataFrame(records4).toDF("id", "name", "age", "score") + df4.write + .format("hudi") + .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") + .option(TABLE_TYPE.key(), tableType.name()) + .option(RECORDKEY_FIELD.key(), "id") + .option(PRECOMBINE_FIELD.key(), "age") + .option(TABLE_NAME.key(), tableName) + .option(HoodieWriteConfig.TBL_NAME.key(), tableName) + .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName) + .option(OPERATION.key(), "upsert") + .option("hoodie.compact.inline", "true") + .option("hoodie.compact.inline.max.delta.commits", "1") + .mode(SaveMode.Append) + .save(tablePath) + val expectedDfAfterCompaction = spark.createDataFrame(Seq( + (1, "Alice", 50, 98.5), + (2, "Bob", 40, 95.0), + (3, "Charlie", 35, 92.1), + (4, "David", 28, 90.0) + )).toDF("id", "name", "age", "score") + // validate compaction commit is present + val compactionCommits = metaClient.reloadActiveTimeline().filterCompletedInstants().getInstants.asScala + .filter(instant => instant.getAction == "commit") + assertTrue(compactionCommits.nonEmpty, "Compaction commit should be present after upsert") + // Read and verify data after compaction + val readDfAfterCompaction = spark.read.format("hudi").load(tablePath) + val actualAfterCompaction = readDfAfterCompaction.select("id", "name", "age", "score") + assertTrue(expectedDfAfterCompaction.except(actualAfterCompaction).isEmpty) + assertTrue(actualAfterCompaction.except(expectedDfAfterCompaction).isEmpty) + } } @ParameterizedTest From 78829f145fb90a7c5915dd7229405c181973d46a Mon Sep 17 00:00:00 2001 From: Timothy Brown Date: Thu, 8 Jan 2026 11:04:03 -0500 Subject: [PATCH 08/10] refactor 
test to reduce code duplication, add clustering validation, fix writer bug --- .../io/storage/HoodieSparkLanceWriter.java | 15 +- .../hudi/functional/TestLanceDataSource.scala | 468 +++++++----------- 2 files changed, 174 insertions(+), 309 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java index 67e07d162ec1a..99bd3916fe922 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java @@ -18,8 +18,6 @@ package org.apache.hudi.io.storage; -import com.lancedb.lance.spark.arrow.LanceArrowWriter; -import org.apache.arrow.vector.types.pojo.Schema; import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; @@ -27,6 +25,9 @@ import org.apache.hudi.io.storage.row.HoodieInternalRowFileWriter; import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.StoragePath; + +import com.lancedb.lance.spark.arrow.LanceArrowWriter; +import org.apache.arrow.vector.types.pojo.Schema; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.util.LanceArrowUtils; @@ -113,26 +114,26 @@ public void writeRowWithMetadata(HoodieKey key, InternalRow row) throws IOExcept if (populateMetaFields) { UTF8String recordKey = UTF8String.fromString(key.getRecordKey()); updateRecordMetadata(row, recordKey, key.getPartitionPath(), getWrittenRecordCount()); - super.write(row); + super.write(row.copy()); } else { - super.write(row); + super.write(row.copy()); } } @Override public void writeRow(String recordKey, InternalRow row) throws IOException { - super.write(row); + super.write(row.copy()); } @Override public void writeRow(UTF8String key, InternalRow row) throws IOException { // Key reserved for future bloom filter support (https://github.com/apache/hudi/issues/17664) - super.write(row); + super.write(row.copy()); } @Override public void writeRow(InternalRow row) throws IOException { - super.write(row); + super.write(row.copy()); } @Override diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala index ec94e64ed6730..c9e1083bb5501 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala @@ -25,12 +25,12 @@ import org.apache.hudi.common.testutils.HoodieTestUtils import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.testutils.HoodieSparkClientTestBase -import org.apache.spark.sql.{SaveMode, SparkSession} -import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} +import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} +import org.junit.jupiter.api.{AfterEach, BeforeEach} import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} import org.junit.jupiter.api.condition.DisabledIfSystemProperty import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.{EnumSource, ValueSource} +import org.junit.jupiter.params.provider.EnumSource import 
scala.collection.JavaConverters._ @@ -55,9 +55,9 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { } @ParameterizedTest - @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ")) - def testBasicWriteAndRead(tableType: String): Unit = { - val tableName = s"test_lance_table_${tableType.toLowerCase}" + @EnumSource(value = classOf[HoodieTableType]) + def testBasicWriteAndRead(tableType: HoodieTableType): Unit = { + val tableName = s"test_lance_table_${tableType.name().toLowerCase}" val tablePath = s"$basePath/$tableName" // Create test data @@ -66,20 +66,10 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { (2, "Bob", 25, 87.3), (3, "Charlie", 35, 92.1) ) - val expectedDf = spark.createDataFrame(records).toDF("id", "name", "age", "score") + val expectedDf = createDataFrame(records) // Write to Hudi table with Lance base file format - expectedDf.write - .format("hudi") - .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") - .option(TABLE_TYPE.key(), tableType) - .option(RECORDKEY_FIELD.key(), "id") - .option(PRECOMBINE_FIELD.key(), "age") - .option(TABLE_NAME.key(), tableName) - .option(HoodieWriteConfig.TBL_NAME.key(), tableName) - .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName) - .mode(SaveMode.Overwrite) - .save(tablePath) + writeDataframe(tableType, tableName, tablePath, expectedDf, saveMode = SaveMode.Overwrite) // Read back and verify val readDf = spark.read @@ -93,9 +83,9 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { } @ParameterizedTest - @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ")) - def testSchemaProjection(tableType: String): Unit = { - val tableName = s"test_lance_projection_${tableType.toLowerCase}" + @EnumSource(value = classOf[HoodieTableType]) + def testSchemaProjection(tableType: HoodieTableType): Unit = { + val tableName = s"test_lance_projection_${tableType.name().toLowerCase}" val tablePath = s"$basePath/$tableName" // Create test data with multiple columns @@ -107,17 +97,7 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { val inputDf = spark.createDataFrame(records).toDF("id", "name", "age", "score", "department") // Write to Hudi table with Lance format - inputDf.write - .format("hudi") - .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") - .option(TABLE_TYPE.key(), tableType) - .option(RECORDKEY_FIELD.key(), "id") - .option(PRECOMBINE_FIELD.key(), "age") - .option(TABLE_NAME.key(), tableName) - .option(HoodieWriteConfig.TBL_NAME.key(), tableName) - .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName) - .mode(SaveMode.Overwrite) - .save(tablePath) + writeDataframe(tableType, tableName, tablePath, inputDf, saveMode = SaveMode.Overwrite) // Read with schema projection - only select subset of columns val readDf = spark.read @@ -141,9 +121,9 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { } @ParameterizedTest - @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ")) - def testWhereClauseFiltering(tableType: String): Unit = { - val tableName = s"test_lance_where_${tableType.toLowerCase}" + @EnumSource(value = classOf[HoodieTableType]) + def testWhereClauseFiltering(tableType: HoodieTableType): Unit = { + val tableName = s"test_lance_where_${tableType.name().toLowerCase}" val tablePath = s"$basePath/$tableName" // Create test data @@ -154,20 +134,10 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { (4, "David", 28, 88.9), (5, "Eve", 32, 
91.4) ) - val df = spark.createDataFrame(records).toDF("id", "name", "age", "score") + val df = createDataFrame(records) // Write to Hudi table with Lance format - df.write - .format("hudi") - .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") - .option(TABLE_TYPE.key(), tableType) - .option(RECORDKEY_FIELD.key(), "id") - .option(PRECOMBINE_FIELD.key(), "age") - .option(TABLE_NAME.key(), tableName) - .option(HoodieWriteConfig.TBL_NAME.key(), tableName) - .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName) - .mode(SaveMode.Overwrite) - .save(tablePath) + writeDataframe(tableType, tableName, tablePath, df, saveMode = SaveMode.Overwrite) // Test 1: Simple WHERE clause on numeric column val filteredByAge = spark.read @@ -176,10 +146,10 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { .where("age > 30") .select("id", "name", "age", "score") - val expectedFilteredByAge = spark.createDataFrame(Seq( + val expectedFilteredByAge = createDataFrame(Seq( (3, "Charlie", 35, 92.1), (5, "Eve", 32, 91.4) - )).toDF("id", "name", "age", "score") + )) assertTrue(expectedFilteredByAge.except(filteredByAge).isEmpty) assertTrue(filteredByAge.except(expectedFilteredByAge).isEmpty) @@ -191,9 +161,7 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { .where("name = 'Bob'") .select("id", "name", "age", "score") - val expectedFilteredByName = spark.createDataFrame(Seq( - (2, "Bob", 25, 87.3) - )).toDF("id", "name", "age", "score") + val expectedFilteredByName = createDataFrame(Seq((2, "Bob", 25, 87.3))) assertTrue(expectedFilteredByName.except(filteredByName).isEmpty) assertTrue(filteredByName.except(expectedFilteredByName).isEmpty) @@ -205,20 +173,20 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { .where("age >= 28 AND score > 90") .select("id", "name", "age", "score") - val expectedFilteredComplex = spark.createDataFrame(Seq( + val expectedFilteredComplex = createDataFrame(Seq( (1, "Alice", 30, 95.5), (3, "Charlie", 35, 92.1), (5, "Eve", 32, 91.4) - )).toDF("id", "name", "age", "score") + )) assertTrue(expectedFilteredComplex.except(filteredComplex).isEmpty) assertTrue(filteredComplex.except(expectedFilteredComplex).isEmpty) } @ParameterizedTest - @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ")) - def testMultipleBulkInsertsWithCommitValidation(tableType: String): Unit = { - val tableName = s"test_lance_multiple_inserts_${tableType.toLowerCase}" + @EnumSource(value = classOf[HoodieTableType]) + def testMultipleBulkInsertsWithCommitValidation(tableType: HoodieTableType): Unit = { + val tableName = s"test_lance_multiple_inserts_${tableType.name().toLowerCase}" val tablePath = s"$basePath/$tableName" // First insert - records 1-3 @@ -227,20 +195,9 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { (2, "Bob", 25, 87.3), (3, "Charlie", 35, 92.1) ) - val df1 = spark.createDataFrame(records1).toDF("id", "name", "age", "score") + val df1 = createDataFrame(records1) - df1.write - .format("hudi") - .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") - .option(TABLE_TYPE.key(), tableType) - .option(RECORDKEY_FIELD.key(), "id") - .option(PRECOMBINE_FIELD.key(), "age") - .option(TABLE_NAME.key(), tableName) - .option(HoodieWriteConfig.TBL_NAME.key(), tableName) - .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName) - .option(OPERATION.key(), "bulk_insert") - .mode(SaveMode.Overwrite) - .save(tablePath) + writeDataframe(tableType, tableName, 
tablePath, df1, operation = Some("bulk_insert")) // Second insert - records 4-6 val records2 = Seq( @@ -248,20 +205,9 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { (5, "Eve", 32, 91.4), (6, "Frank", 27, 85.7) ) - val df2 = spark.createDataFrame(records2).toDF("id", "name", "age", "score") + val df2 = createDataFrame(records2) - df2.write - .format("hudi") - .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") - .option(TABLE_TYPE.key(), tableType) - .option(RECORDKEY_FIELD.key(), "id") - .option(PRECOMBINE_FIELD.key(), "age") - .option(TABLE_NAME.key(), tableName) - .option(HoodieWriteConfig.TBL_NAME.key(), tableName) - .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName) - .option(OPERATION.key(), "bulk_insert") - .mode(SaveMode.Append) - .save(tablePath) + writeDataframe(tableType, tableName, tablePath, df2, operation = Some("bulk_insert")) // Third insert - records 7-9 val records3 = Seq( @@ -269,20 +215,9 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { (8, "Henry", 31, 89.6), (9, "Iris", 26, 94.8) ) - val df3 = spark.createDataFrame(records3).toDF("id", "name", "age", "score") + val df3 = createDataFrame(records3) - df3.write - .format("hudi") - .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") - .option(TABLE_TYPE.key(), tableType) - .option(RECORDKEY_FIELD.key(), "id") - .option(PRECOMBINE_FIELD.key(), "age") - .option(TABLE_NAME.key(), tableName) - .option(HoodieWriteConfig.TBL_NAME.key(), tableName) - .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName) - .option(OPERATION.key(), "bulk_insert") - .mode(SaveMode.Append) - .save(tablePath) + writeDataframe(tableType, tableName, tablePath, df3, operation = Some("bulk_insert")) // Validate number of commits matches number of inserts val metaClient = HoodieTableMetaClient.builder() @@ -304,7 +239,7 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { val actual = readDf.select("id", "name", "age", "score") - val expectedDf = spark.createDataFrame(Seq( + val expectedDf = createDataFrame(Seq( (1, "Alice", 30, 95.5), (2, "Bob", 25, 87.3), (3, "Charlie", 35, 92.1), @@ -314,16 +249,16 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { (7, "Grace", 29, 93.2), (8, "Henry", 31, 89.6), (9, "Iris", 26, 94.8) - )).toDF("id", "name", "age", "score") + )) assertTrue(expectedDf.except(actual).isEmpty) assertTrue(actual.except(expectedDf).isEmpty) } @ParameterizedTest - @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ")) - def testTimeTravel(tableType: String): Unit = { - val tableName = s"test_lance_time_travel_${tableType.toLowerCase}" + @EnumSource(value = classOf[HoodieTableType]) + def testTimeTravel(tableType: HoodieTableType): Unit = { + val tableName = s"test_lance_time_travel_${tableType.name().toLowerCase}" val tablePath = s"$basePath/$tableName" // First insert - records 1-3 @@ -332,19 +267,9 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { (2, "Bob", 25, 87.3), (3, "Charlie", 35, 92.1) ) - val df1 = spark.createDataFrame(records1).toDF("id", "name", "age", "score") + val df1 = createDataFrame(records1) - df1.write - .format("hudi") - .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") - .option(TABLE_TYPE.key(), tableType) - .option(RECORDKEY_FIELD.key(), "id") - .option(PRECOMBINE_FIELD.key(), "age") - .option(TABLE_NAME.key(), tableName) - .option(HoodieWriteConfig.TBL_NAME.key(), tableName) - 
.option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName) - .mode(SaveMode.Overwrite) - .save(tablePath) + writeDataframe(tableType, tableName, tablePath, df1, saveMode = SaveMode.Overwrite) // Second insert - records 4-6 val records2 = Seq( @@ -352,19 +277,9 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { (5, "Eve", 32, 91.4), (6, "Frank", 27, 85.7) ) - val df2 = spark.createDataFrame(records2).toDF("id", "name", "age", "score") + val df2 = createDataFrame(records2) - df2.write - .format("hudi") - .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") - .option(TABLE_TYPE.key(), tableType) - .option(RECORDKEY_FIELD.key(), "id") - .option(PRECOMBINE_FIELD.key(), "age") - .option(TABLE_NAME.key(), tableName) - .option(HoodieWriteConfig.TBL_NAME.key(), tableName) - .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName) - .mode(SaveMode.Append) - .save(tablePath) + writeDataframe(tableType, tableName, tablePath, df2) // Get the commit timestamp after second insert val metaClient = HoodieTableMetaClient.builder() @@ -382,19 +297,9 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { (8, "Henry", 31, 89.6), (9, "Iris", 26, 94.8) ) - val df3 = spark.createDataFrame(records3).toDF("id", "name", "age", "score") + val df3 = createDataFrame(records3) - df3.write - .format("hudi") - .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") - .option(TABLE_TYPE.key(), tableType) - .option(RECORDKEY_FIELD.key(), "id") - .option(PRECOMBINE_FIELD.key(), "age") - .option(TABLE_NAME.key(), tableName) - .option(HoodieWriteConfig.TBL_NAME.key(), tableName) - .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName) - .mode(SaveMode.Append) - .save(tablePath) + writeDataframe(tableType, tableName, tablePath, df3) // Time travel query to second commit (should see data from c1 + c2 only) val timeTravelDf = spark.read @@ -404,23 +309,23 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { val actual = timeTravelDf.select("id", "name", "age", "score") - val expectedDf = spark.createDataFrame(Seq( + val expectedDf = createDataFrame(Seq( (1, "Alice", 30, 95.5), (2, "Bob", 25, 87.3), (3, "Charlie", 35, 92.1), (4, "David", 28, 88.9), (5, "Eve", 32, 91.4), (6, "Frank", 27, 85.7) - )).toDF("id", "name", "age", "score") + )) assertTrue(expectedDf.except(actual).isEmpty) assertTrue(actual.except(expectedDf).isEmpty) } @ParameterizedTest - @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ")) - def testMultipleRegularInsertsWithCommitValidation(tableType: String): Unit = { - val tableName = s"test_lance_regular_inserts_${tableType.toLowerCase}" + @EnumSource(value = classOf[HoodieTableType]) + def testMultipleRegularInsertsWithCommitValidation(tableType: HoodieTableType): Unit = { + val tableName = s"test_lance_regular_inserts_${tableType.name().toLowerCase}" val tablePath = s"$basePath/$tableName" // First insert - records 1-3 using regular insert @@ -429,20 +334,9 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { (2, "Bob", 25, 87.3), (3, "Charlie", 35, 92.1) ) - val df1 = spark.createDataFrame(records1).toDF("id", "name", "age", "score") + val df1 = createDataFrame(records1) - df1.write - .format("hudi") - .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") - .option(TABLE_TYPE.key(), tableType) - .option(RECORDKEY_FIELD.key(), "id") - .option(PRECOMBINE_FIELD.key(), "age") - .option(TABLE_NAME.key(), 
tableName) - .option(HoodieWriteConfig.TBL_NAME.key(), tableName) - .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName) - .option(OPERATION.key(), "insert") - .mode(SaveMode.Overwrite) - .save(tablePath) + writeDataframe(tableType, tableName, tablePath, df1, saveMode = SaveMode.Overwrite, operation = Some("insert")) // Second insert - records 4-6 using regular insert val records2 = Seq( @@ -450,20 +344,9 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { (5, "Eve", 32, 91.4), (6, "Frank", 27, 85.7) ) - val df2 = spark.createDataFrame(records2).toDF("id", "name", "age", "score") + val df2 = createDataFrame(records2) - df2.write - .format("hudi") - .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") - .option(TABLE_TYPE.key(), tableType) - .option(RECORDKEY_FIELD.key(), "id") - .option(PRECOMBINE_FIELD.key(), "age") - .option(TABLE_NAME.key(), tableName) - .option(HoodieWriteConfig.TBL_NAME.key(), tableName) - .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName) - .option(OPERATION.key(), "insert") - .mode(SaveMode.Append) - .save(tablePath) + writeDataframe(tableType, tableName, tablePath, df2, operation = Some("insert")) // Validate number of commits matches number of inserts val metaClient = HoodieTableMetaClient.builder() @@ -479,7 +362,7 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { assertEquals(2, commits.size, "Should have exactly 2 commits") // Verify commit action types based on table type - val expectedAction = if (tableType == "COPY_ON_WRITE") "commit" else "deltacommit" + val expectedAction = if (tableType == HoodieTableType.COPY_ON_WRITE) "commit" else "deltacommit" commits.foreach { instant => assertEquals(expectedAction, instant.getAction, s"Instant ${instant.requestedTime()} should be a $expectedAction action for $tableType table") @@ -492,14 +375,14 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { val actual = readDf.select("id", "name", "age", "score") - val expectedDf = spark.createDataFrame(Seq( + val expectedDf = createDataFrame(Seq( (1, "Alice", 30, 95.5), (2, "Bob", 25, 87.3), (3, "Charlie", 35, 92.1), (4, "David", 28, 88.9), (5, "Eve", 32, 91.4), (6, "Frank", 27, 85.7) - )).toDF("id", "name", "age", "score") + )) assertTrue(expectedDf.except(actual).isEmpty) assertTrue(actual.except(expectedDf).isEmpty) @@ -517,59 +400,26 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { (2, "Bob", 25, 87.3), (3, "Charlie", 35, 92.1) ) - val df1 = spark.createDataFrame(records1).toDF("id", "name", "age", "score") + val df1 = createDataFrame(records1) - df1.write - .format("hudi") - .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") - .option(TABLE_TYPE.key(), tableType.name()) - .option(RECORDKEY_FIELD.key(), "id") - .option(PRECOMBINE_FIELD.key(), "age") - .option(TABLE_NAME.key(), tableName) - .option(HoodieWriteConfig.TBL_NAME.key(), tableName) - .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName) - .option(OPERATION.key(), "insert") - .mode(SaveMode.Overwrite) - .save(tablePath) + writeDataframe(tableType, tableName, tablePath, df1, saveMode = SaveMode.Overwrite, operation = Some("insert")) // Upsert - modify Bob's record (id=2) val records2 = Seq( (2, "Bob", 40, 95.0) // Update Bob: age 25->40, score 87.3->95.0 ) - val df2 = spark.createDataFrame(records2).toDF("id", "name", "age", "score") + val df2 = createDataFrame(records2) - df2.write - 
.format("hudi") - .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") - .option(TABLE_TYPE.key(), tableType.name()) - .option(RECORDKEY_FIELD.key(), "id") - .option(PRECOMBINE_FIELD.key(), "age") - .option(TABLE_NAME.key(), tableName) - .option(HoodieWriteConfig.TBL_NAME.key(), tableName) - .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName) - .option(OPERATION.key(), "upsert") - .mode(SaveMode.Append) - .save(tablePath) + writeDataframe(tableType, tableName, tablePath, df2, operation = Some("upsert")) // Second upsert - modify Alice (id=1) and insert David (id=4) val records3 = Seq( (1, "Alice", 45, 98.5), // Update Alice: age 30->45, score 95.5->98.5 (4, "David", 28, 88.0) // Insert new record ) - val df3 = spark.createDataFrame(records3).toDF("id", "name", "age", "score") + val df3 = createDataFrame(records3) - df3.write - .format("hudi") - .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") - .option(TABLE_TYPE.key(), tableType.name()) - .option(RECORDKEY_FIELD.key(), "id") - .option(PRECOMBINE_FIELD.key(), "age") - .option(TABLE_NAME.key(), tableName) - .option(HoodieWriteConfig.TBL_NAME.key(), tableName) - .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName) - .option(OPERATION.key(), "upsert") - .mode(SaveMode.Append) - .save(tablePath) + writeDataframe(tableType, tableName, tablePath, df3, operation = Some("upsert")) // Validate commits val metaClient = HoodieTableMetaClient.builder() @@ -592,12 +442,12 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { val readDf = spark.read.format("hudi").load(tablePath) val actual = readDf.select("id", "name", "age", "score") - val expectedDf = spark.createDataFrame(Seq( + val expectedDf = createDataFrame(Seq( (1, "Alice", 45, 98.5), (2, "Bob", 40, 95.0), (3, "Charlie", 35, 92.1), (4, "David", 28, 88.0) - )).toDF("id", "name", "age", "score") + )) assertTrue(expectedDf.except(actual).isEmpty) assertTrue(actual.except(expectedDf).isEmpty) @@ -608,27 +458,15 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { (1, "Alice", 50, 98.5), // Update Alice: age 45->50 (4, "David", 28, 90.0) // Update David: score 88.0->90.0 ) - val df4 = spark.createDataFrame(records4).toDF("id", "name", "age", "score") - df4.write - .format("hudi") - .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") - .option(TABLE_TYPE.key(), tableType.name()) - .option(RECORDKEY_FIELD.key(), "id") - .option(PRECOMBINE_FIELD.key(), "age") - .option(TABLE_NAME.key(), tableName) - .option(HoodieWriteConfig.TBL_NAME.key(), tableName) - .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName) - .option(OPERATION.key(), "upsert") - .option("hoodie.compact.inline", "true") - .option("hoodie.compact.inline.max.delta.commits", "1") - .mode(SaveMode.Append) - .save(tablePath) - val expectedDfAfterCompaction = spark.createDataFrame(Seq( + val df4 = createDataFrame(records4) + writeDataframe(tableType, tableName, tablePath, df4, operation = Some("upsert"), + extraOptions = Map("hoodie.compact.inline" -> "true", "hoodie.compact.inline.max.delta.commits" -> "1")) + val expectedDfAfterCompaction = createDataFrame(Seq( (1, "Alice", 50, 98.5), (2, "Bob", 40, 95.0), (3, "Charlie", 35, 92.1), (4, "David", 28, 90.0) - )).toDF("id", "name", "age", "score") + )) // validate compaction commit is present val compactionCommits = metaClient.reloadActiveTimeline().filterCompletedInstants().getInstants.asScala 
.filter(instant => instant.getAction == "commit") @@ -642,9 +480,9 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { } @ParameterizedTest - @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ")) - def testBasicDeleteOperation(tableType: String): Unit = { - val tableName = s"test_lance_delete_${tableType.toLowerCase}" + @EnumSource(value = classOf[HoodieTableType]) + def testBasicDeleteOperation(tableType: HoodieTableType): Unit = { + val tableName = s"test_lance_delete_${tableType.name().toLowerCase}" val tablePath = s"$basePath/$tableName" // Initial insert - 5 records @@ -655,20 +493,9 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { (4, "David", 28, 88.0), (5, "Eve", 32, 91.4) ) - val df1 = spark.createDataFrame(records1).toDF("id", "name", "age", "score") + val df1 = createDataFrame(records1) - df1.write - .format("hudi") - .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") - .option(TABLE_TYPE.key(), tableType) - .option(RECORDKEY_FIELD.key(), "id") - .option(PRECOMBINE_FIELD.key(), "age") - .option(TABLE_NAME.key(), tableName) - .option(HoodieWriteConfig.TBL_NAME.key(), tableName) - .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName) - .option(OPERATION.key(), "insert") - .mode(SaveMode.Overwrite) - .save(tablePath) + writeDataframe(tableType, tableName, tablePath, df1, saveMode = SaveMode.Overwrite, operation = Some("insert")) // Delete operation - delete Bob (id=2), David (id=4), and a non-existent key (id=99) val recordsToDelete = Seq( @@ -676,20 +503,9 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { (4, "David", 28, 88.0), // Delete David (exists) (99, "NonExistent", 50, 0.0) // Delete non-existent record (should be no-op) ) - val deleteDF = spark.createDataFrame(recordsToDelete).toDF("id", "name", "age", "score") + val deleteDF = createDataFrame(recordsToDelete) - deleteDF.write - .format("hudi") - .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") - .option(TABLE_TYPE.key(), tableType) - .option(RECORDKEY_FIELD.key(), "id") - .option(PRECOMBINE_FIELD.key(), "age") - .option(TABLE_NAME.key(), tableName) - .option(HoodieWriteConfig.TBL_NAME.key(), tableName) - .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName) - .option(OPERATION.key(), "delete") - .mode(SaveMode.Append) - .save(tablePath) + writeDataframe(tableType, tableName, tablePath, deleteDF, operation = Some("delete")) // Validate commits val metaClient = HoodieTableMetaClient.builder() @@ -701,7 +517,7 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { assertEquals(2, commitCount, "Should have 2 completed commits (insert + delete)") // Verify commit action types based on table type - val expectedAction = if (tableType == "COPY_ON_WRITE") "commit" else "deltacommit" + val expectedAction = if (tableType == HoodieTableType.COPY_ON_WRITE) "commit" else "deltacommit" val commits = metaClient.getCommitsTimeline.filterCompletedInstants().getInstants.asScala commits.foreach { instant => assertEquals(expectedAction, instant.getAction, @@ -712,20 +528,20 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { val readDf = spark.read.format("hudi").load(tablePath) val actual = readDf.select("id", "name", "age", "score") - val expectedDf = spark.createDataFrame(Seq( + val expectedDf = createDataFrame(Seq( (1, "Alice", 30, 95.5), (3, "Charlie", 35, 92.1), (5, "Eve", 32, 91.4) - )).toDF("id", "name", "age", "score") + )) 
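// Symmetric except() assertions: both directions empty means the DataFrames hold the same rows as a set, regardless of ordering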
 
     assertTrue(expectedDf.except(actual).isEmpty)
     assertTrue(actual.except(expectedDf).isEmpty)
   }
 
   @ParameterizedTest
-  @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ"))
-  def testIncrementalQuery(tableType: String): Unit = {
-    val tableName = s"test_lance_incremental_${tableType.toLowerCase}"
+  @EnumSource(value = classOf[HoodieTableType])
+  def testIncrementalQuery(tableType: HoodieTableType): Unit = {
+    val tableName = s"test_lance_incremental_${tableType.name().toLowerCase}"
     val tablePath = s"$basePath/$tableName"
 
     // First insert - records 1-3
     val records1 = Seq(
       (1, "Alice", 30, 95.5),
       (2, "Bob", 25, 87.3),
       (3, "Charlie", 35, 92.1)
     )
-    val df1 = spark.createDataFrame(records1).toDF("id", "name", "age", "score")
+    val df1 = createDataFrame(records1)
 
-    df1.write
-      .format("hudi")
-      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
-      .option(TABLE_TYPE.key(), tableType)
-      .option(RECORDKEY_FIELD.key(), "id")
-      .option(PRECOMBINE_FIELD.key(), "age")
-      .option(TABLE_NAME.key(), tableName)
-      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
-      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
-      .mode(SaveMode.Overwrite)
-      .save(tablePath)
+    writeDataframe(tableType, tableName, tablePath, df1, saveMode = SaveMode.Overwrite)
 
     // Second insert - records 4-6
     val records2 = Seq(
       (4, "David", 28, 88.0),
       (5, "Eve", 32, 91.4),
       (6, "Frank", 27, 85.7)
     )
-    val df2 = spark.createDataFrame(records2).toDF("id", "name", "age", "score")
+    val df2 = createDataFrame(records2)
 
-    df2.write
-      .format("hudi")
-      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
-      .option(TABLE_TYPE.key(), tableType)
-      .option(RECORDKEY_FIELD.key(), "id")
-      .option(PRECOMBINE_FIELD.key(), "age")
-      .option(TABLE_NAME.key(), tableName)
-      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
-      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
-      .mode(SaveMode.Append)
-      .save(tablePath)
+    writeDataframe(tableType, tableName, tablePath, df2)
 
     // Get commit timestamps
     val metaClient = HoodieTableMetaClient.builder()
@@ -784,19 +580,9 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
       (8, "Henry", 31, 89.6),
       (9, "Iris", 26, 94.8)
     )
-    val df3 = spark.createDataFrame(records3).toDF("id", "name", "age", "score")
+    val df3 = createDataFrame(records3)
 
-    df3.write
-      .format("hudi")
-      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
-      .option(TABLE_TYPE.key(), tableType)
-      .option(RECORDKEY_FIELD.key(), "id")
-      .option(PRECOMBINE_FIELD.key(), "age")
-      .option(TABLE_NAME.key(), tableName)
-      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
-      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
-      .mode(SaveMode.Append)
-      .save(tablePath)
+    writeDataframe(tableType, tableName, tablePath, df3)
 
     // Reload metaClient to get latest commits
     metaClient.reloadActiveTimeline()
@@ -814,13 +600,91 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
     val actual = incrementalDf.select("id", "name", "age", "score")
 
-    val expectedDf = spark.createDataFrame(Seq(
+    val expectedDf = createDataFrame(Seq(
       (7, "Grace", 29, 93.2),
       (8, "Henry", 31, 89.6),
       (9, "Iris", 26, 94.8)
-    )).toDF("id", "name", "age", "score")
+    ))
 
     assertTrue(expectedDf.except(actual).isEmpty)
     assertTrue(actual.except(expectedDf).isEmpty)
   }
+
+  @ParameterizedTest
+  @EnumSource(value = classOf[HoodieTableType])
+  def testClustering(tableType: HoodieTableType): Unit = {
+    val tableName = s"test_lance_clustering_${tableType.name().toLowerCase}"
+    val tablePath = s"$basePath/$tableName"
+
+    // Initial insert - 5 records
+    val records1 = Seq(
+      (1, "Alice", 30, 95.5),
+      (2, "Bob", 25, 87.3),
+      (3, "Charlie", 35, 92.1),
+      (4, "David", 28, 88.0),
+      (5, "Eve", 32, 91.4)
+    )
+    val df1 = createDataFrame(records1)
+
+    writeDataframe(tableType, tableName, tablePath, df1, saveMode = SaveMode.Overwrite, operation = Some("bulk_insert"))
+
+    // Second insert - 5 more records
+    val records2 = Seq(
+      (6, "Frank", 27, 85.7),
+      (7, "Grace", 29, 93.2),
+      (8, "Henry", 31, 89.6),
+      (9, "Iris", 26, 94.8),
+      (10, "Jack", 33, 90.5)
+    )
+    val df2 = createDataFrame(records2)
+    writeDataframe(tableType, tableName, tablePath, df2, operation = Some("bulk_insert"), extraOptions = Map(
+      "hoodie.clustering.inline" -> "true",
+      "hoodie.clustering.inline.max.commits" -> "1"
+    ))
+
+    // Validate that clustering commit is present
+    val metaClient = HoodieTableMetaClient.builder()
+      .setConf(HoodieTestUtils.getDefaultStorageConf)
+      .setBasePath(tablePath)
+      .build()
+    assertTrue(metaClient.getActiveTimeline.getLastClusteringInstant.isPresent, "Clustering commit should be present after inline clustering")
+
+    // Read and verify data
+    val readDf = spark.read.format("hudi").load(tablePath)
+    val actual = readDf.select("id", "name", "age", "score")
+
+    val expectedDf = createDataFrame(records1 ++ records2)
+
+    assertTrue(expectedDf.except(actual).isEmpty)
+    assertTrue(actual.except(expectedDf).isEmpty)
+  }
+
+  private def createDataFrame(records: Seq[(Int, String, Int, Double)]) = {
+    spark.createDataFrame(records).toDF("id", "name", "age", "score").coalesce(1)
+  }
+
+  private def writeDataframe(tableType: HoodieTableType, tableName: String, tablePath: String, df: DataFrame,
+                             saveMode: SaveMode = SaveMode.Append, operation: Option[String] = None,
+                             extraOptions: Map[String, String] = Map.empty): Unit = {
+    var writer = df.write
+      .format("hudi")
+      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
+      .option(TABLE_TYPE.key(), tableType.name())
+      .option(RECORDKEY_FIELD.key(), "id")
+      .option(PRECOMBINE_FIELD.key(), "age")
+      .option(TABLE_NAME.key(), tableName)
+      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
+      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
+
+    // Add operation if specified
+    writer = operation match {
+      case Some(op) => writer.option(OPERATION.key(), op)
+      case None => writer
+    }
+
+    // Add any extra options
+    extraOptions.foreach { case (key, value) => writer = writer.option(key, value) }
+
+    writer.mode(saveMode).save(tablePath)
+  }
 }

From e26ff82f6901eadaad42719988d5b89701f42379 Mon Sep 17 00:00:00 2001
From: Rahil Chertara
Date: Sun, 26 Oct 2025 07:40:41 -0700
Subject: [PATCH 09/10] Add schema evolution support for add column

---
 .../lance/SparkLanceReaderBase.scala          | 80 +++++++++++++++----
 .../hudi/functional/TestLanceDataSource.scala | 49 ++++++++++++
 2 files changed, 115 insertions(+), 14 deletions(-)
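
Notes: the read-side change below NULL-pads any column that exists in the evolved
table schema but is missing from an older Lance base file, so adding a column does
not require rewriting files written before the evolution. The snippet below is an
illustrative usage sketch only, not part of the diff: it assumes a live SparkSession
`spark` with this series applied, and the path, table name, and "email" column are
made up, mirroring testSchemaEvolutionAddColumn at the end of this patch.

    // Sketch only -- hypothetical names, not part of this patch.
    import org.apache.spark.sql.{DataFrame, SaveMode}

    val path = "/tmp/hudi_lance_add_column_demo" // hypothetical location
    def write(df: DataFrame, mode: SaveMode): Unit =
      df.write.format("hudi")
        .option("hoodie.table.base.file.format", "LANCE")
        .option("hoodie.datasource.write.recordkey.field", "id")
        .option("hoodie.datasource.write.precombine.field", "age")
        .option("hoodie.table.name", "lance_add_column_demo")
        .option("hoodie.datasource.write.operation", "upsert")
        .mode(mode)
        .save(path)

    // 1) Write with the base schema (id, name, age, score).
    write(spark.createDataFrame(Seq((1, "Alice", 30, 95.5), (3, "Charlie", 35, 92.1)))
      .toDF("id", "name", "age", "score"), SaveMode.Overwrite)

    // 2) Upsert with an added "email" column; Charlie's base file still lacks it.
    write(spark.createDataFrame(Seq((1, "Alice", 31, 96.0, "alice@example.com")))
      .toDF("id", "name", "age", "score", "email"), SaveMode.Append)

    // 3) Charlie's row is served from the pre-evolution file with email = NULL.
    spark.read.format("hudi").load(path).select("id", "name", "email").show()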
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala
index c5109e0637f77..4cc53d776c42e 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala
@@ -30,10 +30,10 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.parquet.schema.MessageType
 import org.apache.spark.TaskContext
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.JoinedRow
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, JoinedRow}
 import org.apache.spark.sql.execution.datasources.{PartitionedFile, SparkColumnarFileReader}
 import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.sql.util.LanceArrowUtils
 
 import java.io.IOException
@@ -84,9 +84,20 @@ class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumna
       // Open Lance file reader
       val lanceReader = LanceFileReader.open(filePath, allocator)
 
-      // Extract column names from required schema for projection
-      val columnNames: java.util.List[String] = if (requiredSchema.nonEmpty) {
-        requiredSchema.fields.map(_.name).toList.asJava
+      // Get schema from Lance file
+      val arrowSchema = lanceReader.schema()
+      val fileSchema = LanceArrowUtils.fromArrowSchema(arrowSchema)
+
+      // Only read columns that currently exist in the file; a requested column may be
+      // absent from base files written before the schema evolved
+      val fileFieldNames = fileSchema.fieldNames.toSet
+      val (existingFields, missingFields) = if (requiredSchema.nonEmpty) {
+        requiredSchema.fields.partition(f => fileFieldNames.contains(f.name))
+      } else {
+        (Array.empty[StructField], Array.empty[StructField])
+      }
+
+      val columnNames: java.util.List[String] = if (existingFields.nonEmpty) {
+        existingFields.map(_.name).toList.asJava
       } else {
         // If only partition columns requested, read minimal data
         null
@@ -95,12 +106,13 @@ class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumna
       // Read data with column projection (filters not supported yet)
       val arrowReader = lanceReader.readAll(columnNames, null, DEFAULT_BATCH_SIZE)
 
-      val schemaForIterator = if (requiredSchema.nonEmpty) {
-        requiredSchema
+      // Create schema for the data we're actually reading
+      val readSchema = StructType(existingFields)
+
+      val schemaForIterator = if (readSchema.nonEmpty) {
+        readSchema
       } else {
-        // Only compute schema from Lance file when requiredSchema is empty
-        val arrowSchema = lanceReader.schema()
-        LanceArrowUtils.fromArrowSchema(arrowSchema)
+        fileSchema
       }
 
       // Create iterator using shared LanceRecordIterator
@@ -117,17 +129,57 @@ class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumna
         _.addTaskCompletionListener[Unit](_ => lanceIterator.close())
       )
 
-      // Need to convert to scala iterator for proper reading
-      val iter = lanceIterator.asScala
+      // Need to convert to scala iterator for row merging
+      val baseIter = lanceIterator.asScala
 
+      // Pad missing columns with NULLs so rows match the evolved required schema
+      val iterWithNulls = if (missingFields.nonEmpty) {
+        // Reorder columns to match the requiredSchema order
+        val fieldIndexMap = requiredSchema.fields.zipWithIndex.map { case (field, idx) =>
+          field.name -> idx
+        }.toMap
+
+        val existingFieldIndices = existingFields.map(f => fieldIndexMap(f.name))
+        val missingFieldIndices = missingFields.map(f => fieldIndexMap(f.name))
+
+        baseIter.map { row =>
+          // Create result row with correct ordering
+          val resultRow = new GenericInternalRow(requiredSchema.length)
+
+          // Fill in existing columns
+          existingFieldIndices.zipWithIndex.foreach { case (targetIdx, sourceIdx) =>
+            if (row.isNullAt(sourceIdx)) {
+              resultRow.setNullAt(targetIdx)
+            } else {
+              resultRow.update(targetIdx, row.get(sourceIdx, existingFields(sourceIdx).dataType))
+            }
+          }
+
+          // Fill in missing columns with NULL
+          missingFieldIndices.foreach { targetIdx =>
+            resultRow.setNullAt(targetIdx)
+          }
+
+          resultRow.asInstanceOf[InternalRow]
+        }
+      } else {
+        baseIter.asInstanceOf[Iterator[InternalRow]]
+      }
 
       // Handle partition columns
       if (partitionSchema.length == 0) {
         // No partition columns - return rows directly
-        iter.asInstanceOf[Iterator[InternalRow]]
+        iterWithNulls
       } else {
         // Append partition values to each row using JoinedRow
         val joinedRow = new JoinedRow()
-        iter.map(row => joinedRow(row, file.partitionValues))
+        iterWithNulls.map(row => joinedRow(row, file.partitionValues))
       }
     } catch {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala
index c9e1083bb5501..4c45ba6458c44 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala
@@ -687,4 +687,53 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
 
     writer.mode(saveMode).save(tablePath)
   }
+
+  @ParameterizedTest
+  @EnumSource(value = classOf[HoodieTableType])
+  def testSchemaEvolutionAddColumn(tableType: HoodieTableType): Unit = {
+    val tableName = s"test_lance_schema_evolution_${tableType.name.toLowerCase}"
+    val tablePath = s"$basePath/$tableName"
+
+    // First insert with base schema - columns: (id, name, age, score)
+    val records1 = Seq(
+      (1, "Alice", 30, 95.5),
+      (2, "Bob", 25, 87.3),
+      (3, "Charlie", 35, 92.1)
+    )
+
+    val df1 = createDataFrame(records1)
+    writeDataframe(tableType, tableName, tablePath, df1, saveMode = SaveMode.Overwrite, operation = Some("insert"))
+
+    // Schema evolution - add new column "email" and upsert existing records
+    val records2 = Seq(
+      (1, "Alice", 31, 96.0, "alice@example.com"),
+      (2, "Bob", 26, 88.0, "bob@example.com")
+    )
+    val df2 = spark.createDataFrame(records2).toDF("id", "name", "age", "score", "email")
+    writeDataframe(tableType, tableName, tablePath, df2, saveMode = SaveMode.Append, operation = Some("upsert"))
+
+    // Insert a new record with the evolved schema
+    val records3 = Seq(
+      (4, "David", 28, 89.5, "david@example.com")
+    )
+    val df3 = spark.createDataFrame(records3).toDF("id", "name", "age", "score", "email")
+    writeDataframe(tableType, tableName, tablePath, df3, saveMode = SaveMode.Append, operation = Some("upsert"))
+
+    // Read and verify schema evolution
+    val actual = spark.read
+      .format("hudi")
+      .load(tablePath)
+      .select("id", "name", "age", "score", "email")
+
+    // Expected data after schema evolution
+    val expectedDf = spark.createDataFrame(Seq(
+      (1, "Alice", 31, 96.0, "alice@example.com"),
+      (2, "Bob", 26, 88.0, "bob@example.com"),
+      (3, "Charlie", 35, 92.1, null),
+      (4, "David", 28, 89.5, "david@example.com")
+    )).toDF("id", "name", "age", "score", "email")
+
+    assertTrue(expectedDf.except(actual).isEmpty)
+    assertTrue(actual.except(expectedDf).isEmpty)
+  }
 }

From 06f8f2300f14b5c150d8c97999fc6214063588f6 Mon Sep 17 00:00:00 2001
From: Rahil Chertara
Date: Thu, 15 Jan 2026 17:06:27 -0800
Subject: [PATCH 10/10] retrigger