Skip to content

Commit 236b339

Browse files
committed
Support migrated tables via apache/iceberg-rust#1777.
1 parent 19797f3 commit 236b339

File tree

3 files changed

+99
-7
lines changed

3 files changed

+99
-7
lines changed

docs/source/user-guide/latest/configs.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,7 @@ These settings can be used to determine which parts of the plan are accelerated
269269
| `spark.comet.expression.Reverse.enabled` | Enable Comet acceleration for `Reverse` | true |
270270
| `spark.comet.expression.Round.enabled` | Enable Comet acceleration for `Round` | true |
271271
| `spark.comet.expression.Second.enabled` | Enable Comet acceleration for `Second` | true |
272+
| `spark.comet.expression.Sha1.enabled` | Enable Comet acceleration for `Sha1` | true |
272273
| `spark.comet.expression.Sha2.enabled` | Enable Comet acceleration for `Sha2` | true |
273274
| `spark.comet.expression.ShiftLeft.enabled` | Enable Comet acceleration for `ShiftLeft` | true |
274275
| `spark.comet.expression.ShiftRight.enabled` | Enable Comet acceleration for `ShiftRight` | true |

native/Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala

Lines changed: 97 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -786,12 +786,7 @@ class CometIcebergNativeSuite extends CometTestBase {
786786
}
787787
}
788788

789-
// TODO: Re-enable when iceberg-rust supports schema evolution in projections
790-
// Currently iceberg-rust errors when projecting columns that don't exist in old files.
791-
// See: https://github.com/apache/iceberg-rust/blob/main/crates/iceberg/src/arrow/reader.rs#L586-L601
792-
// The strict validation at line 586: `if column_map.len() != leaf_field_ids.len()`
793-
// prevents reading new columns from evolved schemas as NULL values.
794-
ignore("schema evolution - add column") {
789+
test("schema evolution - add column") {
795790
assume(icebergAvailable, "Iceberg not available in classpath")
796791

797792
withTempIcebergDir { warehouseDir =>
@@ -830,6 +825,102 @@ class CometIcebergNativeSuite extends CometTestBase {
830825
}
831826
}
832827

828+
test("schema evolution - drop column") {
829+
assume(icebergAvailable, "Iceberg not available in classpath")
830+
831+
withTempIcebergDir { warehouseDir =>
832+
withSQLConf(
833+
"spark.sql.catalog.test_cat" -> "org.apache.iceberg.spark.SparkCatalog",
834+
"spark.sql.catalog.test_cat.type" -> "hadoop",
835+
"spark.sql.catalog.test_cat.warehouse" -> warehouseDir.getAbsolutePath,
836+
CometConf.COMET_ENABLED.key -> "true",
837+
CometConf.COMET_EXEC_ENABLED.key -> "true",
838+
CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") {
839+
840+
spark.sql("""
841+
CREATE TABLE test_cat.db.drop_column_test (
842+
id INT,
843+
name STRING,
844+
age INT
845+
) USING iceberg
846+
""")
847+
848+
spark.sql("""
849+
INSERT INTO test_cat.db.drop_column_test VALUES (1, 'Alice', 30), (2, 'Bob', 25)
850+
""")
851+
852+
// Drop the age column
853+
spark.sql("ALTER TABLE test_cat.db.drop_column_test DROP COLUMN age")
854+
855+
// Insert new data without the age column
856+
spark.sql("""
857+
INSERT INTO test_cat.db.drop_column_test VALUES (3, 'Charlie'), (4, 'Diana')
858+
""")
859+
860+
// Read all data - must handle old files (with age) and new files (without age)
861+
checkIcebergNativeScan("SELECT * FROM test_cat.db.drop_column_test ORDER BY id")
862+
checkIcebergNativeScan("SELECT id, name FROM test_cat.db.drop_column_test ORDER BY id")
863+
864+
spark.sql("DROP TABLE test_cat.db.drop_column_test")
865+
}
866+
}
867+
}
868+
869+
test("migration - basic read after migration (fallback for no field ID)") {
870+
assume(icebergAvailable, "Iceberg not available in classpath")
871+
872+
withTempIcebergDir { warehouseDir =>
873+
withSQLConf(
874+
"spark.sql.catalog.test_cat" -> "org.apache.iceberg.spark.SparkCatalog",
875+
"spark.sql.catalog.test_cat.type" -> "hadoop",
876+
"spark.sql.catalog.test_cat.warehouse" -> warehouseDir.getAbsolutePath,
877+
CometConf.COMET_ENABLED.key -> "true",
878+
CometConf.COMET_EXEC_ENABLED.key -> "true",
879+
CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") {
880+
881+
val sourceName = "parquet_source"
882+
val destName = "test_cat.db.iceberg_dest"
883+
val dataPath = s"${warehouseDir.getAbsolutePath}/source_data"
884+
885+
// Step 1: Create regular Parquet table (without field IDs)
886+
spark
887+
.range(10)
888+
.selectExpr(
889+
"CAST(id AS INT) as id",
890+
"CONCAT('name_', CAST(id AS STRING)) as name",
891+
"CAST(id * 2 AS DOUBLE) as value")
892+
.write
893+
.mode("overwrite")
894+
.option("path", dataPath)
895+
.saveAsTable(sourceName)
896+
897+
// Step 2: Snapshot the Parquet table into Iceberg using SparkActions API
898+
try {
899+
val actionsClass = Class.forName("org.apache.iceberg.spark.actions.SparkActions")
900+
val getMethod = actionsClass.getMethod("get")
901+
val actions = getMethod.invoke(null)
902+
val snapshotMethod = actions.getClass.getMethod("snapshotTable", classOf[String])
903+
val snapshotAction = snapshotMethod.invoke(actions, sourceName)
904+
val asMethod = snapshotAction.getClass.getMethod("as", classOf[String])
905+
val snapshotWithDest = asMethod.invoke(snapshotAction, destName)
906+
val executeMethod = snapshotWithDest.getClass.getMethod("execute")
907+
executeMethod.invoke(snapshotWithDest)
908+
909+
// Step 3: Read the Iceberg table - Parquet files have no field IDs, so position-based mapping is used
910+
checkIcebergNativeScan(s"SELECT * FROM $destName ORDER BY id")
911+
checkIcebergNativeScan(s"SELECT id, name FROM $destName ORDER BY id")
912+
checkIcebergNativeScan(s"SELECT value FROM $destName WHERE id < 5 ORDER BY id")
913+
914+
spark.sql(s"DROP TABLE $destName")
915+
spark.sql(s"DROP TABLE $sourceName")
916+
} catch {
917+
case _: ClassNotFoundException =>
918+
cancel("Iceberg Actions API not available - requires iceberg-spark-runtime")
919+
}
920+
}
921+
}
922+
}
923+
833924
test("projection - column subset, reordering, and duplication") {
834925
assume(icebergAvailable, "Iceberg not available in classpath")
835926

0 commit comments

Comments
 (0)