diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/CommonClientUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/CommonClientUtils.java index 418d603c86e40..cd95b7ba5e133 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/CommonClientUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/CommonClientUtils.java @@ -108,6 +108,7 @@ public static HoodieLogBlock.HoodieLogBlockType getLogBlockType(HoodieWriteConfi } HoodieFileFormat baseFileFormat = getBaseFileFormat(writeConfig, tableConfig); switch (getBaseFileFormat(writeConfig, tableConfig)) { + case LANCE: case PARQUET: case ORC: return HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK; diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java index 4b4ac3464374f..a83eb4b9e5658 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java @@ -22,6 +22,7 @@ import org.apache.hudi.HoodieSchemaConversionUtils; import org.apache.hudi.SparkAdapterSupport$; import org.apache.hudi.SparkFileFormatInternalRecordContext; +import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.client.model.HoodieInternalRow; import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.schema.HoodieSchemaType; @@ -327,7 +328,15 @@ public Option toIndexedRecord(HoodieSchema recordSchema @Override public ByteArrayOutputStream getAvroBytes(HoodieSchema recordSchema, Properties props) throws IOException { - throw new UnsupportedOperationException(); + // Convert Spark InternalRow to Avro GenericRecord + if (data == null) { + throw new IOException("Cannot convert null data to Avro bytes"); + } + StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema); + GenericRecord avroRecord = AvroConversionUtils + .createInternalRowToAvroConverter(structType, recordSchema.toAvroSchema(), false) + .apply(data); + return HoodieAvroUtils.avroToBytesStream(avroRecord); } @Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java index 67e07d162ec1a..99bd3916fe922 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java @@ -18,8 +18,6 @@ package org.apache.hudi.io.storage; -import com.lancedb.lance.spark.arrow.LanceArrowWriter; -import org.apache.arrow.vector.types.pojo.Schema; import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; @@ -27,6 +25,9 @@ import org.apache.hudi.io.storage.row.HoodieInternalRowFileWriter; import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.StoragePath; + +import com.lancedb.lance.spark.arrow.LanceArrowWriter; +import org.apache.arrow.vector.types.pojo.Schema; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.util.LanceArrowUtils; @@ -113,26 +114,26 @@ public void writeRowWithMetadata(HoodieKey key, InternalRow row) throws IOExcept if (populateMetaFields) { UTF8String recordKey = UTF8String.fromString(key.getRecordKey()); updateRecordMetadata(row, recordKey, key.getPartitionPath(), getWrittenRecordCount()); - super.write(row); + super.write(row.copy()); } else { - super.write(row); + super.write(row.copy()); } } @Override public void writeRow(String recordKey, InternalRow row) throws IOException { - super.write(row); + super.write(row.copy()); } @Override public void writeRow(UTF8String key, InternalRow row) throws IOException { // Key reserved for future bloom filter support (https://github.com/apache/hudi/issues/17664) - super.write(row); + super.write(row.copy()); } @Override public void writeRow(InternalRow row) throws IOException { - super.write(row); + super.write(row.copy()); } @Override diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index f17d641edb11e..85d58f8d7a46c 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -516,9 +516,6 @@ class HoodieSparkSqlWriterInternal { // scalastyle:on val writeConfig = client.getConfig - if (writeConfig.getRecordMerger.getRecordType == HoodieRecordType.SPARK && tableType == MERGE_ON_READ && writeConfig.getLogDataBlockFormat.orElse(HoodieLogBlockType.AVRO_DATA_BLOCK) != HoodieLogBlockType.PARQUET_DATA_BLOCK) { - throw new UnsupportedOperationException(s"${writeConfig.getRecordMerger.getClass.getName} only support parquet log.") - } instantTime = client.startCommit(commitActionType) // if table has undergone upgrade, we need to reload table config tableMetaClient.reloadTableConfig() diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala index c5109e0637f77..4cc53d776c42e 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala @@ -30,10 +30,10 @@ import org.apache.hadoop.conf.Configuration import org.apache.parquet.schema.MessageType import org.apache.spark.TaskContext import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.JoinedRow +import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, JoinedRow} import org.apache.spark.sql.execution.datasources.{PartitionedFile, SparkColumnarFileReader} import org.apache.spark.sql.sources.Filter -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{StructField, StructType} import org.apache.spark.sql.util.LanceArrowUtils import java.io.IOException @@ -84,9 +84,20 @@ class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumna // Open Lance file reader val lanceReader = LanceFileReader.open(filePath, allocator) - // Extract column names from required schema for projection - val columnNames: java.util.List[String] = if (requiredSchema.nonEmpty) { - requiredSchema.fields.map(_.name).toList.asJava + // Get schema from Lance file + val arrowSchema = lanceReader.schema() + val fileSchema = LanceArrowUtils.fromArrowSchema(arrowSchema) + + // Read columns that currently exist in the file, as requested col may not be present + val fileFieldNames = fileSchema.fieldNames.toSet + val (existingFields, missingFields) = if (requiredSchema.nonEmpty) { + requiredSchema.fields.partition(f => fileFieldNames.contains(f.name)) + } else { + (Array.empty[StructField], Array.empty[StructField]) + } + + val columnNames: java.util.List[String] = if (existingFields.nonEmpty) { + existingFields.map(_.name).toList.asJava } else { // If only partition columns requested, read minimal data null @@ -95,12 +106,13 @@ class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumna // Read data with column projection (filters not supported yet) val arrowReader = lanceReader.readAll(columnNames, null, DEFAULT_BATCH_SIZE) - val schemaForIterator = if (requiredSchema.nonEmpty) { - requiredSchema + // Create schema for the data we're actually reading + val readSchema = StructType(existingFields) + + val schemaForIterator = if (readSchema.nonEmpty) { + readSchema } else { - // Only compute schema from Lance file when requiredSchema is empty - val arrowSchema = lanceReader.schema() - LanceArrowUtils.fromArrowSchema(arrowSchema) + fileSchema } // Create iterator using shared LanceRecordIterator @@ -117,17 +129,57 @@ class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumna _.addTaskCompletionListener[Unit](_ => lanceIterator.close()) ) - // Need to convert to scala iterator for proper reading - val iter = lanceIterator.asScala + + // Need to convert to scala iterator for row merging + val baseIter = lanceIterator.asScala + // Add NULL padding for missing columns for schema evolution + val iterWithNulls = if (missingFields.nonEmpty) { + // Create a row with NULLs for missing columns + val nullRow = new GenericInternalRow(missingFields.length) + for (i <- missingFields.indices) { + nullRow.setNullAt(i) + } + + // Reorder columns to match the requiredSchema order + val fieldIndexMap = requiredSchema.fields.zipWithIndex.map { case (field, idx) => + field.name -> idx + }.toMap + + val existingFieldIndices = existingFields.map(f => fieldIndexMap(f.name)) + val missingFieldIndices = missingFields.map(f => fieldIndexMap(f.name)) + + baseIter.map { row => + // Create result row with correct ordering + val resultRow = new GenericInternalRow(requiredSchema.length) + + // Fill in existing columns + existingFieldIndices.zipWithIndex.foreach { case (targetIdx, sourceIdx) => + if (row.isNullAt(sourceIdx)) { + resultRow.setNullAt(targetIdx) + } else { + resultRow.update(targetIdx, row.get(sourceIdx, existingFields(sourceIdx).dataType)) + } + } + + // Fill in missing columns with NULL + missingFieldIndices.foreach { targetIdx => + resultRow.setNullAt(targetIdx) + } + + resultRow.asInstanceOf[InternalRow] + } + } else { + baseIter.asInstanceOf[Iterator[InternalRow]] + } // Handle partition columns if (partitionSchema.length == 0) { // No partition columns - return rows directly - iter.asInstanceOf[Iterator[InternalRow]] + iterWithNulls } else { // Append partition values to each row using JoinedRow val joinedRow = new JoinedRow() - iter.map(row => joinedRow(row, file.partitionValues)) + iterWithNulls.map(row => joinedRow(row, file.partitionValues)) } } catch { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala index 9a634b8eec6df..4c45ba6458c44 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala @@ -19,15 +19,18 @@ package org.apache.hudi.functional import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.DefaultSparkRecordMerger +import org.apache.hudi.common.model.HoodieTableType import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} import org.apache.hudi.common.testutils.HoodieTestUtils import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.testutils.HoodieSparkClientTestBase -import org.apache.spark.sql.{SaveMode, SparkSession} -import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} +import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} +import org.junit.jupiter.api.{AfterEach, BeforeEach} import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} import org.junit.jupiter.api.condition.DisabledIfSystemProperty +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.EnumSource import scala.collection.JavaConverters._ @@ -51,9 +54,10 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { spark = null } - @Test - def testBasicWriteAndRead(): Unit = { - val tableName = "test_lance_table" + @ParameterizedTest + @EnumSource(value = classOf[HoodieTableType]) + def testBasicWriteAndRead(tableType: HoodieTableType): Unit = { + val tableName = s"test_lance_table_${tableType.name().toLowerCase}" val tablePath = s"$basePath/$tableName" // Create test data @@ -62,19 +66,10 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { (2, "Bob", 25, 87.3), (3, "Charlie", 35, 92.1) ) - val expectedDf = spark.createDataFrame(records).toDF("id", "name", "age", "score") + val expectedDf = createDataFrame(records) // Write to Hudi table with Lance base file format - expectedDf.write - .format("hudi") - .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") - .option(RECORDKEY_FIELD.key(), "id") - .option(PRECOMBINE_FIELD.key(), "age") - .option(TABLE_NAME.key(), tableName) - .option(HoodieWriteConfig.TBL_NAME.key(), tableName) - .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName) - .mode(SaveMode.Overwrite) - .save(tablePath) + writeDataframe(tableType, tableName, tablePath, expectedDf, saveMode = SaveMode.Overwrite) // Read back and verify val readDf = spark.read @@ -87,9 +82,10 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { assertTrue(actual.except(expectedDf).isEmpty) } - @Test - def testSchemaProjection(): Unit = { - val tableName = "test_lance_projection" + @ParameterizedTest + @EnumSource(value = classOf[HoodieTableType]) + def testSchemaProjection(tableType: HoodieTableType): Unit = { + val tableName = s"test_lance_projection_${tableType.name().toLowerCase}" val tablePath = s"$basePath/$tableName" // Create test data with multiple columns @@ -101,16 +97,7 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { val inputDf = spark.createDataFrame(records).toDF("id", "name", "age", "score", "department") // Write to Hudi table with Lance format - inputDf.write - .format("hudi") - .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") - .option(RECORDKEY_FIELD.key(), "id") - .option(PRECOMBINE_FIELD.key(), "age") - .option(TABLE_NAME.key(), tableName) - .option(HoodieWriteConfig.TBL_NAME.key(), tableName) - .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName) - .mode(SaveMode.Overwrite) - .save(tablePath) + writeDataframe(tableType, tableName, tablePath, inputDf, saveMode = SaveMode.Overwrite) // Read with schema projection - only select subset of columns val readDf = spark.read @@ -133,9 +120,10 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { assertTrue(actual.except(expectedDf).isEmpty) } - @Test - def testWhereClauseFiltering(): Unit = { - val tableName = "test_lance_where" + @ParameterizedTest + @EnumSource(value = classOf[HoodieTableType]) + def testWhereClauseFiltering(tableType: HoodieTableType): Unit = { + val tableName = s"test_lance_where_${tableType.name().toLowerCase}" val tablePath = s"$basePath/$tableName" // Create test data @@ -146,19 +134,10 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { (4, "David", 28, 88.9), (5, "Eve", 32, 91.4) ) - val df = spark.createDataFrame(records).toDF("id", "name", "age", "score") + val df = createDataFrame(records) // Write to Hudi table with Lance format - df.write - .format("hudi") - .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") - .option(RECORDKEY_FIELD.key(), "id") - .option(PRECOMBINE_FIELD.key(), "age") - .option(TABLE_NAME.key(), tableName) - .option(HoodieWriteConfig.TBL_NAME.key(), tableName) - .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName) - .mode(SaveMode.Overwrite) - .save(tablePath) + writeDataframe(tableType, tableName, tablePath, df, saveMode = SaveMode.Overwrite) // Test 1: Simple WHERE clause on numeric column val filteredByAge = spark.read @@ -167,10 +146,10 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { .where("age > 30") .select("id", "name", "age", "score") - val expectedFilteredByAge = spark.createDataFrame(Seq( + val expectedFilteredByAge = createDataFrame(Seq( (3, "Charlie", 35, 92.1), (5, "Eve", 32, 91.4) - )).toDF("id", "name", "age", "score") + )) assertTrue(expectedFilteredByAge.except(filteredByAge).isEmpty) assertTrue(filteredByAge.except(expectedFilteredByAge).isEmpty) @@ -182,9 +161,7 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { .where("name = 'Bob'") .select("id", "name", "age", "score") - val expectedFilteredByName = spark.createDataFrame(Seq( - (2, "Bob", 25, 87.3) - )).toDF("id", "name", "age", "score") + val expectedFilteredByName = createDataFrame(Seq((2, "Bob", 25, 87.3))) assertTrue(expectedFilteredByName.except(filteredByName).isEmpty) assertTrue(filteredByName.except(expectedFilteredByName).isEmpty) @@ -196,19 +173,20 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { .where("age >= 28 AND score > 90") .select("id", "name", "age", "score") - val expectedFilteredComplex = spark.createDataFrame(Seq( + val expectedFilteredComplex = createDataFrame(Seq( (1, "Alice", 30, 95.5), (3, "Charlie", 35, 92.1), (5, "Eve", 32, 91.4) - )).toDF("id", "name", "age", "score") + )) assertTrue(expectedFilteredComplex.except(filteredComplex).isEmpty) assertTrue(filteredComplex.except(expectedFilteredComplex).isEmpty) } - @Test - def testMultipleBulkInsertsWithCommitValidation(): Unit = { - val tableName = "test_lance_multiple_inserts" + @ParameterizedTest + @EnumSource(value = classOf[HoodieTableType]) + def testMultipleBulkInsertsWithCommitValidation(tableType: HoodieTableType): Unit = { + val tableName = s"test_lance_multiple_inserts_${tableType.name().toLowerCase}" val tablePath = s"$basePath/$tableName" // First insert - records 1-3 @@ -217,19 +195,9 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { (2, "Bob", 25, 87.3), (3, "Charlie", 35, 92.1) ) - val df1 = spark.createDataFrame(records1).toDF("id", "name", "age", "score") + val df1 = createDataFrame(records1) - df1.write - .format("hudi") - .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") - .option(RECORDKEY_FIELD.key(), "id") - .option(PRECOMBINE_FIELD.key(), "age") - .option(TABLE_NAME.key(), tableName) - .option(HoodieWriteConfig.TBL_NAME.key(), tableName) - .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName) - .option(OPERATION.key(), "bulk_insert") - .mode(SaveMode.Overwrite) - .save(tablePath) + writeDataframe(tableType, tableName, tablePath, df1, operation = Some("bulk_insert")) // Second insert - records 4-6 val records2 = Seq( @@ -237,19 +205,9 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { (5, "Eve", 32, 91.4), (6, "Frank", 27, 85.7) ) - val df2 = spark.createDataFrame(records2).toDF("id", "name", "age", "score") + val df2 = createDataFrame(records2) - df2.write - .format("hudi") - .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") - .option(RECORDKEY_FIELD.key(), "id") - .option(PRECOMBINE_FIELD.key(), "age") - .option(TABLE_NAME.key(), tableName) - .option(HoodieWriteConfig.TBL_NAME.key(), tableName) - .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName) - .option(OPERATION.key(), "bulk_insert") - .mode(SaveMode.Append) - .save(tablePath) + writeDataframe(tableType, tableName, tablePath, df2, operation = Some("bulk_insert")) // Third insert - records 7-9 val records3 = Seq( @@ -257,19 +215,9 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { (8, "Henry", 31, 89.6), (9, "Iris", 26, 94.8) ) - val df3 = spark.createDataFrame(records3).toDF("id", "name", "age", "score") + val df3 = createDataFrame(records3) - df3.write - .format("hudi") - .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") - .option(RECORDKEY_FIELD.key(), "id") - .option(PRECOMBINE_FIELD.key(), "age") - .option(TABLE_NAME.key(), tableName) - .option(HoodieWriteConfig.TBL_NAME.key(), tableName) - .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName) - .option(OPERATION.key(), "bulk_insert") - .mode(SaveMode.Append) - .save(tablePath) + writeDataframe(tableType, tableName, tablePath, df3, operation = Some("bulk_insert")) // Validate number of commits matches number of inserts val metaClient = HoodieTableMetaClient.builder() @@ -284,11 +232,6 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { val commits = metaClient.getCommitsTimeline.filterCompletedInstants().getInstants.asScala.toList assertEquals(3, commits.size, "Should have exactly 3 commits") - // Check that each commit is a COMMIT action (bulk_insert creates COMMIT actions) - commits.foreach { instant => - assertEquals("commit", instant.getAction, s"Instant ${instant.requestedTime()} should be a commit action") - } - // Read back all data and verify total record count val readDf = spark.read .format("hudi") @@ -296,7 +239,7 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { val actual = readDf.select("id", "name", "age", "score") - val expectedDf = spark.createDataFrame(Seq( + val expectedDf = createDataFrame(Seq( (1, "Alice", 30, 95.5), (2, "Bob", 25, 87.3), (3, "Charlie", 35, 92.1), @@ -306,15 +249,16 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { (7, "Grace", 29, 93.2), (8, "Henry", 31, 89.6), (9, "Iris", 26, 94.8) - )).toDF("id", "name", "age", "score") + )) assertTrue(expectedDf.except(actual).isEmpty) assertTrue(actual.except(expectedDf).isEmpty) } - @Test - def testTimeTravel(): Unit = { - val tableName = "test_lance_time_travel" + @ParameterizedTest + @EnumSource(value = classOf[HoodieTableType]) + def testTimeTravel(tableType: HoodieTableType): Unit = { + val tableName = s"test_lance_time_travel_${tableType.name().toLowerCase}" val tablePath = s"$basePath/$tableName" // First insert - records 1-3 @@ -323,19 +267,9 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { (2, "Bob", 25, 87.3), (3, "Charlie", 35, 92.1) ) - val df1 = spark.createDataFrame(records1).toDF("id", "name", "age", "score") + val df1 = createDataFrame(records1) - df1.write - .format("hudi") - .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") - .option(RECORDKEY_FIELD.key(), "id") - .option(PRECOMBINE_FIELD.key(), "age") - .option(TABLE_NAME.key(), tableName) - .option(HoodieWriteConfig.TBL_NAME.key(), tableName) - .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName) - .option(OPERATION.key(), "bulk_insert") - .mode(SaveMode.Overwrite) - .save(tablePath) + writeDataframe(tableType, tableName, tablePath, df1, saveMode = SaveMode.Overwrite) // Second insert - records 4-6 val records2 = Seq( @@ -343,19 +277,9 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { (5, "Eve", 32, 91.4), (6, "Frank", 27, 85.7) ) - val df2 = spark.createDataFrame(records2).toDF("id", "name", "age", "score") + val df2 = createDataFrame(records2) - df2.write - .format("hudi") - .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") - .option(RECORDKEY_FIELD.key(), "id") - .option(PRECOMBINE_FIELD.key(), "age") - .option(TABLE_NAME.key(), tableName) - .option(HoodieWriteConfig.TBL_NAME.key(), tableName) - .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName) - .option(OPERATION.key(), "bulk_insert") - .mode(SaveMode.Append) - .save(tablePath) + writeDataframe(tableType, tableName, tablePath, df2) // Get the commit timestamp after second insert val metaClient = HoodieTableMetaClient.builder() @@ -373,19 +297,9 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { (8, "Henry", 31, 89.6), (9, "Iris", 26, 94.8) ) - val df3 = spark.createDataFrame(records3).toDF("id", "name", "age", "score") + val df3 = createDataFrame(records3) - df3.write - .format("hudi") - .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") - .option(RECORDKEY_FIELD.key(), "id") - .option(PRECOMBINE_FIELD.key(), "age") - .option(TABLE_NAME.key(), tableName) - .option(HoodieWriteConfig.TBL_NAME.key(), tableName) - .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName) - .option(OPERATION.key(), "bulk_insert") - .mode(SaveMode.Append) - .save(tablePath) + writeDataframe(tableType, tableName, tablePath, df3) // Time travel query to second commit (should see data from c1 + c2 only) val timeTravelDf = spark.read @@ -395,22 +309,23 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { val actual = timeTravelDf.select("id", "name", "age", "score") - val expectedDf = spark.createDataFrame(Seq( + val expectedDf = createDataFrame(Seq( (1, "Alice", 30, 95.5), (2, "Bob", 25, 87.3), (3, "Charlie", 35, 92.1), (4, "David", 28, 88.9), (5, "Eve", 32, 91.4), (6, "Frank", 27, 85.7) - )).toDF("id", "name", "age", "score") + )) assertTrue(expectedDf.except(actual).isEmpty) assertTrue(actual.except(expectedDf).isEmpty) } - @Test - def testMultipleRegularInsertsWithCommitValidation(): Unit = { - val tableName = "test_lance_regular_inserts" + @ParameterizedTest + @EnumSource(value = classOf[HoodieTableType]) + def testMultipleRegularInsertsWithCommitValidation(tableType: HoodieTableType): Unit = { + val tableName = s"test_lance_regular_inserts_${tableType.name().toLowerCase}" val tablePath = s"$basePath/$tableName" // First insert - records 1-3 using regular insert @@ -419,19 +334,9 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { (2, "Bob", 25, 87.3), (3, "Charlie", 35, 92.1) ) - val df1 = spark.createDataFrame(records1).toDF("id", "name", "age", "score") + val df1 = createDataFrame(records1) - df1.write - .format("hudi") - .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") - .option(RECORDKEY_FIELD.key(), "id") - .option(PRECOMBINE_FIELD.key(), "age") - .option(TABLE_NAME.key(), tableName) - .option(HoodieWriteConfig.TBL_NAME.key(), tableName) - .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName) - .option(OPERATION.key(), "insert") - .mode(SaveMode.Overwrite) - .save(tablePath) + writeDataframe(tableType, tableName, tablePath, df1, saveMode = SaveMode.Overwrite, operation = Some("insert")) // Second insert - records 4-6 using regular insert val records2 = Seq( @@ -439,19 +344,9 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { (5, "Eve", 32, 91.4), (6, "Frank", 27, 85.7) ) - val df2 = spark.createDataFrame(records2).toDF("id", "name", "age", "score") + val df2 = createDataFrame(records2) - df2.write - .format("hudi") - .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") - .option(RECORDKEY_FIELD.key(), "id") - .option(PRECOMBINE_FIELD.key(), "age") - .option(TABLE_NAME.key(), tableName) - .option(HoodieWriteConfig.TBL_NAME.key(), tableName) - .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName) - .option(OPERATION.key(), "insert") - .mode(SaveMode.Append) - .save(tablePath) + writeDataframe(tableType, tableName, tablePath, df2, operation = Some("insert")) // Validate number of commits matches number of inserts val metaClient = HoodieTableMetaClient.builder() @@ -466,9 +361,11 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { val commits = metaClient.getCommitsTimeline.filterCompletedInstants().getInstants.asScala.toList assertEquals(2, commits.size, "Should have exactly 2 commits") - // Check that each commit is a COMMIT action (insert creates COMMIT actions) + // Verify commit action types based on table type + val expectedAction = if (tableType == HoodieTableType.COPY_ON_WRITE) "commit" else "deltacommit" commits.foreach { instant => - assertEquals("commit", instant.getAction, s"Instant ${instant.requestedTime()} should be a commit action") + assertEquals(expectedAction, instant.getAction, + s"Instant ${instant.requestedTime()} should be a $expectedAction action for $tableType table") } // Read back all data and verify total record count @@ -478,22 +375,23 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { val actual = readDf.select("id", "name", "age", "score") - val expectedDf = spark.createDataFrame(Seq( + val expectedDf = createDataFrame(Seq( (1, "Alice", 30, 95.5), (2, "Bob", 25, 87.3), (3, "Charlie", 35, 92.1), (4, "David", 28, 88.9), (5, "Eve", 32, 91.4), (6, "Frank", 27, 85.7) - )).toDF("id", "name", "age", "score") + )) assertTrue(expectedDf.except(actual).isEmpty) assertTrue(actual.except(expectedDf).isEmpty) } - @Test - def testBasicUpsertModifyExistingRow(): Unit = { - val tableName = "test_lance_upsert" + @ParameterizedTest + @EnumSource(value = classOf[HoodieTableType]) + def testBasicUpsertModifyExistingRow(tableType: HoodieTableType): Unit = { + val tableName = s"test_lance_upsert_${tableType.name().toLowerCase}" val tablePath = s"$basePath/$tableName" // Initial insert - 3 records @@ -502,56 +400,26 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { (2, "Bob", 25, 87.3), (3, "Charlie", 35, 92.1) ) - val df1 = spark.createDataFrame(records1).toDF("id", "name", "age", "score") + val df1 = createDataFrame(records1) - df1.write - .format("hudi") - .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") - .option(RECORDKEY_FIELD.key(), "id") - .option(PRECOMBINE_FIELD.key(), "age") - .option(TABLE_NAME.key(), tableName) - .option(HoodieWriteConfig.TBL_NAME.key(), tableName) - .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName) - .option(OPERATION.key(), "insert") - .mode(SaveMode.Overwrite) - .save(tablePath) + writeDataframe(tableType, tableName, tablePath, df1, saveMode = SaveMode.Overwrite, operation = Some("insert")) // Upsert - modify Bob's record (id=2) val records2 = Seq( (2, "Bob", 40, 95.0) // Update Bob: age 25->40, score 87.3->95.0 ) - val df2 = spark.createDataFrame(records2).toDF("id", "name", "age", "score") + val df2 = createDataFrame(records2) - df2.write - .format("hudi") - .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") - .option(RECORDKEY_FIELD.key(), "id") - .option(PRECOMBINE_FIELD.key(), "age") - .option(TABLE_NAME.key(), tableName) - .option(HoodieWriteConfig.TBL_NAME.key(), tableName) - .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName) - .option(OPERATION.key(), "upsert") - .mode(SaveMode.Append) - .save(tablePath) + writeDataframe(tableType, tableName, tablePath, df2, operation = Some("upsert")) // Second upsert - modify Alice (id=1) and insert David (id=4) val records3 = Seq( (1, "Alice", 45, 98.5), // Update Alice: age 30->45, score 95.5->98.5 (4, "David", 28, 88.0) // Insert new record ) - val df3 = spark.createDataFrame(records3).toDF("id", "name", "age", "score") + val df3 = createDataFrame(records3) - df3.write - .format("hudi") - .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") - .option(RECORDKEY_FIELD.key(), "id") - .option(PRECOMBINE_FIELD.key(), "age") - .option(TABLE_NAME.key(), tableName) - .option(HoodieWriteConfig.TBL_NAME.key(), tableName) - .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName) - .option(OPERATION.key(), "upsert") - .mode(SaveMode.Append) - .save(tablePath) + writeDataframe(tableType, tableName, tablePath, df3, operation = Some("upsert")) // Validate commits val metaClient = HoodieTableMetaClient.builder() @@ -562,24 +430,59 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { val commitCount = metaClient.getCommitsTimeline.filterCompletedInstants().countInstants() assertEquals(3, commitCount, "Should have 3 completed commits (insert + 2 upserts)") + // Verify commit action types based on table type + val expectedAction = if (tableType == HoodieTableType.COPY_ON_WRITE) "commit" else "deltacommit" + val commits = metaClient.getCommitsTimeline.filterCompletedInstants().getInstants.asScala + commits.foreach { instant => + assertEquals(expectedAction, instant.getAction, + s"Instant ${instant.requestedTime()} should be a $expectedAction action for $tableType table") + } + // Read and verify data val readDf = spark.read.format("hudi").load(tablePath) val actual = readDf.select("id", "name", "age", "score") - val expectedDf = spark.createDataFrame(Seq( + val expectedDf = createDataFrame(Seq( (1, "Alice", 45, 98.5), (2, "Bob", 40, 95.0), (3, "Charlie", 35, 92.1), (4, "David", 28, 88.0) - )).toDF("id", "name", "age", "score") + )) assertTrue(expectedDf.except(actual).isEmpty) assertTrue(actual.except(expectedDf).isEmpty) + + if (tableType == HoodieTableType.MERGE_ON_READ) { + // Write one more commit to trigger compaction + val records4 = Seq( + (1, "Alice", 50, 98.5), // Update Alice: age 45->50 + (4, "David", 28, 90.0) // Update David: score 88.0->90.0 + ) + val df4 = createDataFrame(records4) + writeDataframe(tableType, tableName, tablePath, df4, operation = Some("upsert"), + extraOptions = Map("hoodie.compact.inline" -> "true", "hoodie.compact.inline.max.delta.commits" -> "1")) + val expectedDfAfterCompaction = createDataFrame(Seq( + (1, "Alice", 50, 98.5), + (2, "Bob", 40, 95.0), + (3, "Charlie", 35, 92.1), + (4, "David", 28, 90.0) + )) + // validate compaction commit is present + val compactionCommits = metaClient.reloadActiveTimeline().filterCompletedInstants().getInstants.asScala + .filter(instant => instant.getAction == "commit") + assertTrue(compactionCommits.nonEmpty, "Compaction commit should be present after upsert") + // Read and verify data after compaction + val readDfAfterCompaction = spark.read.format("hudi").load(tablePath) + val actualAfterCompaction = readDfAfterCompaction.select("id", "name", "age", "score") + assertTrue(expectedDfAfterCompaction.except(actualAfterCompaction).isEmpty) + assertTrue(actualAfterCompaction.except(expectedDfAfterCompaction).isEmpty) + } } - @Test - def testBasicDeleteOperation(): Unit = { - val tableName = "test_lance_delete" + @ParameterizedTest + @EnumSource(value = classOf[HoodieTableType]) + def testBasicDeleteOperation(tableType: HoodieTableType): Unit = { + val tableName = s"test_lance_delete_${tableType.name().toLowerCase}" val tablePath = s"$basePath/$tableName" // Initial insert - 5 records @@ -590,19 +493,9 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { (4, "David", 28, 88.0), (5, "Eve", 32, 91.4) ) - val df1 = spark.createDataFrame(records1).toDF("id", "name", "age", "score") + val df1 = createDataFrame(records1) - df1.write - .format("hudi") - .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") - .option(RECORDKEY_FIELD.key(), "id") - .option(PRECOMBINE_FIELD.key(), "age") - .option(TABLE_NAME.key(), tableName) - .option(HoodieWriteConfig.TBL_NAME.key(), tableName) - .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName) - .option(OPERATION.key(), "insert") - .mode(SaveMode.Overwrite) - .save(tablePath) + writeDataframe(tableType, tableName, tablePath, df1, saveMode = SaveMode.Overwrite, operation = Some("insert")) // Delete operation - delete Bob (id=2), David (id=4), and a non-existent key (id=99) val recordsToDelete = Seq( @@ -610,19 +503,9 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { (4, "David", 28, 88.0), // Delete David (exists) (99, "NonExistent", 50, 0.0) // Delete non-existent record (should be no-op) ) - val deleteDF = spark.createDataFrame(recordsToDelete).toDF("id", "name", "age", "score") + val deleteDF = createDataFrame(recordsToDelete) - deleteDF.write - .format("hudi") - .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") - .option(RECORDKEY_FIELD.key(), "id") - .option(PRECOMBINE_FIELD.key(), "age") - .option(TABLE_NAME.key(), tableName) - .option(HoodieWriteConfig.TBL_NAME.key(), tableName) - .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName) - .option(OPERATION.key(), "delete") - .mode(SaveMode.Append) - .save(tablePath) + writeDataframe(tableType, tableName, tablePath, deleteDF, operation = Some("delete")) // Validate commits val metaClient = HoodieTableMetaClient.builder() @@ -633,23 +516,32 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { val commitCount = metaClient.getCommitsTimeline.filterCompletedInstants().countInstants() assertEquals(2, commitCount, "Should have 2 completed commits (insert + delete)") + // Verify commit action types based on table type + val expectedAction = if (tableType == HoodieTableType.COPY_ON_WRITE) "commit" else "deltacommit" + val commits = metaClient.getCommitsTimeline.filterCompletedInstants().getInstants.asScala + commits.foreach { instant => + assertEquals(expectedAction, instant.getAction, + s"Instant ${instant.requestedTime()} should be a $expectedAction action for $tableType table") + } + // Read and verify data val readDf = spark.read.format("hudi").load(tablePath) val actual = readDf.select("id", "name", "age", "score") - val expectedDf = spark.createDataFrame(Seq( + val expectedDf = createDataFrame(Seq( (1, "Alice", 30, 95.5), (3, "Charlie", 35, 92.1), (5, "Eve", 32, 91.4) - )).toDF("id", "name", "age", "score") + )) assertTrue(expectedDf.except(actual).isEmpty) assertTrue(actual.except(expectedDf).isEmpty) } - @Test - def testIncrementalQuery(): Unit = { - val tableName = "test_lance_incremental" + @ParameterizedTest + @EnumSource(value = classOf[HoodieTableType]) + def testIncrementalQuery(tableType: HoodieTableType): Unit = { + val tableName = s"test_lance_incremental_${tableType.name().toLowerCase}" val tablePath = s"$basePath/$tableName" // First insert - records 1-3 @@ -658,19 +550,9 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { (2, "Bob", 25, 87.3), (3, "Charlie", 35, 92.1) ) - val df1 = spark.createDataFrame(records1).toDF("id", "name", "age", "score") + val df1 = createDataFrame(records1) - df1.write - .format("hudi") - .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") - .option(RECORDKEY_FIELD.key(), "id") - .option(PRECOMBINE_FIELD.key(), "age") - .option(TABLE_NAME.key(), tableName) - .option(HoodieWriteConfig.TBL_NAME.key(), tableName) - .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName) - .option(OPERATION.key(), "bulk_insert") - .mode(SaveMode.Overwrite) - .save(tablePath) + writeDataframe(tableType, tableName, tablePath, df1, saveMode = SaveMode.Overwrite) // Second insert - records 4-6 val records2 = Seq( @@ -678,19 +560,9 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { (5, "Eve", 32, 91.4), (6, "Frank", 27, 85.7) ) - val df2 = spark.createDataFrame(records2).toDF("id", "name", "age", "score") + val df2 = createDataFrame(records2) - df2.write - .format("hudi") - .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") - .option(RECORDKEY_FIELD.key(), "id") - .option(PRECOMBINE_FIELD.key(), "age") - .option(TABLE_NAME.key(), tableName) - .option(HoodieWriteConfig.TBL_NAME.key(), tableName) - .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName) - .option(OPERATION.key(), "bulk_insert") - .mode(SaveMode.Append) - .save(tablePath) + writeDataframe(tableType, tableName, tablePath, df2) // Get commit timestamps val metaClient = HoodieTableMetaClient.builder() @@ -708,19 +580,9 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { (8, "Henry", 31, 89.6), (9, "Iris", 26, 94.8) ) - val df3 = spark.createDataFrame(records3).toDF("id", "name", "age", "score") + val df3 = createDataFrame(records3) - df3.write - .format("hudi") - .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") - .option(RECORDKEY_FIELD.key(), "id") - .option(PRECOMBINE_FIELD.key(), "age") - .option(TABLE_NAME.key(), tableName) - .option(HoodieWriteConfig.TBL_NAME.key(), tableName) - .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName) - .option(OPERATION.key(), "bulk_insert") - .mode(SaveMode.Append) - .save(tablePath) + writeDataframe(tableType, tableName, tablePath, df3) // Reload metaClient to get latest commits metaClient.reloadActiveTimeline() @@ -738,11 +600,138 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { val actual = incrementalDf.select("id", "name", "age", "score") - val expectedDf = spark.createDataFrame(Seq( + val expectedDf = createDataFrame(Seq( (7, "Grace", 29, 93.2), (8, "Henry", 31, 89.6), (9, "Iris", 26, 94.8) - )).toDF("id", "name", "age", "score") + )) + + assertTrue(expectedDf.except(actual).isEmpty) + assertTrue(actual.except(expectedDf).isEmpty) + } + + @ParameterizedTest + @EnumSource(value = classOf[HoodieTableType]) + def testClustering(tableType: HoodieTableType): Unit = { + val tableName = s"test_lance_clustering_${tableType.name().toLowerCase}" + val tablePath = s"$basePath/$tableName" + + // Initial insert - 5 records + val records1 = Seq( + (1, "Alice", 30, 95.5), + (2, "Bob", 25, 87.3), + (3, "Charlie", 35, 92.1), + (4, "David", 28, 88.0), + (5, "Eve", 32, 91.4) + ) + val df1 = createDataFrame(records1) + + writeDataframe(tableType, tableName, tablePath, df1, saveMode = SaveMode.Overwrite, operation = Some("bulk_insert")) + + // Second insert - 5 more records + val records2 = Seq( + (6, "Frank", 27, 85.7), + (7, "Grace", 29, 93.2), + (8, "Henry", 31, 89.6), + (9, "Iris", 26, 94.8), + (10, "Jack", 33, 90.5) + ) + val df2 = createDataFrame(records2) + writeDataframe(tableType, tableName, tablePath, df2, operation = Some("bulk_insert"), extraOptions = Map( + "hoodie.clustering.inline" -> "true", + "hoodie.clustering.inline.max.commits" -> "1" + )) + + // Validate that clustering commit is present + val metaClient = HoodieTableMetaClient.builder() + .setConf(HoodieTestUtils.getDefaultStorageConf) + .setBasePath(tablePath) + .build() + assertTrue(metaClient.getActiveTimeline.getLastClusteringInstant.isPresent, "Clustering commit should be present after inline clustering") + + // Read and verify data + val readDf = spark.read.format("hudi").load(tablePath) + val actual = readDf.select("id", "name", "age", "score") + + val expectedDf = createDataFrame(records1 ++ records2) + + assertTrue(expectedDf.except(actual).isEmpty) + assertTrue(actual.except(expectedDf).isEmpty) + } + + private def createDataFrame(records: Seq[(Int, String, Int, Double)]) = { + spark.createDataFrame(records).toDF("id", "name", "age", "score").coalesce(1) + } + + private def writeDataframe(tableType: HoodieTableType, tableName: String, tablePath: String, df: DataFrame, + saveMode: SaveMode = SaveMode.Append, operation: Option[String] = None, + extraOptions: Map[String, String] = Map.empty): Unit = { + var writer = df.write + .format("hudi") + .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE") + .option(TABLE_TYPE.key(), tableType.name()) + .option(RECORDKEY_FIELD.key(), "id") + .option(PRECOMBINE_FIELD.key(), "age") + .option(TABLE_NAME.key(), tableName) + .option(HoodieWriteConfig.TBL_NAME.key(), tableName) + .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName) + + // Add operation if specified + writer = operation match { + case Some(op) => writer.option(OPERATION.key(), op) + case None => writer + } + + // Add any extra options + extraOptions.foreach { case (key, value) => writer = writer.option(key, value) } + + writer.mode(saveMode).save(tablePath) + } + + @ParameterizedTest + @EnumSource(value = classOf[HoodieTableType]) + def testSchemaEvolutionAddColumn(tableType: HoodieTableType): Unit = { + val tableName = s"test_lance_schema_evolution_${tableType.name.toLowerCase}" + val tablePath = s"$basePath/$tableName" + + // First insert with base schema - columns: (id, name, age, score) + val records1 = Seq( + (1, "Alice", 30, 95.5), + (2, "Bob", 25, 87.3), + (3, "Charlie", 35, 92.1) + ) + + val df1 = createDataFrame(records1) + writeDataframe(tableType, tableName, tablePath, df1, saveMode = SaveMode.Overwrite, operation = Some("insert")) + + // Schema evolution - add new column "email" and upsert existing records + val records2 = Seq( + (1, "Alice", 31, 96.0, "alice@example.com"), + (2, "Bob", 26, 88.0, "bob@example.com") + ) + val df2 = spark.createDataFrame(records2).toDF("id", "name", "age", "score", "email") + writeDataframe(tableType, tableName, tablePath, df2, saveMode = SaveMode.Append, operation = Some("upsert")) + + // Insert a new record with the evolved schema + val records3 = Seq( + (4, "David", 28, 89.5, "david@example.com") + ) + val df3 = spark.createDataFrame(records3).toDF("id", "name", "age", "score", "email") + writeDataframe(tableType, tableName, tablePath, df3, saveMode = SaveMode.Append, operation = Some("upsert")) + + // Read and verify schema evolution + val actual = spark.read + .format("hudi") + .load(tablePath) + .select("id", "name", "age", "score", "email") + + // Expected data after schema evolution + val expectedDf = spark.createDataFrame(Seq( + (1, "Alice", 31, 96.0, "alice@example.com"), + (2, "Bob", 26, 88.0, "bob@example.com"), + (3, "Charlie", 35, 92.1, null), + (4, "David", 28, 89.5, "david@example.com") + )).toDF("id", "name", "age", "score", "email") assertTrue(expectedDf.except(actual).isEmpty) assertTrue(actual.except(expectedDf).isEmpty)