diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index 69bce75559..41fd30e43c 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -478,29 +478,35 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
       false
     }
 
-    // Check for unsupported transform functions in residual expressions
-    // iceberg-rust can only handle identity transforms in residuals; all other transforms
-    // (truncate, bucket, year, month, day, hour) must fall back to Spark
+    // Check for transform functions in residual expressions
+    // Non-identity transforms (truncate, bucket, year, month, day, hour) in residuals
+    // are now supported - they skip row-group filtering and are handled
+    // post-scan by CometFilter.
+    // This is less optimal than row-group filtering but still allows native execution.
     val transformFunctionsSupported =
       try {
        IcebergReflection.findNonIdentityTransformInResiduals(metadata.tasks) match {
          case Some(transformType) =>
-            // Found unsupported transform
-            fallbackReasons +=
-              s"Iceberg transform function '$transformType' in residual expression " +
-                "is not yet supported by iceberg-rust. " +
-                "Only identity transforms are supported."
-            false
+            // Found non-identity transform - log info and continue with native scan
+            // Row-group filtering will skip these predicates, but post-scan
+            // filtering will apply
+            logInfo(
+              s"Iceberg residual contains transform '$transformType' - " +
+                "row-group filtering will skip this predicate, " +
+                "post-scan filtering by CometFilter will apply instead.")
+            true // Allow native execution
          case None =>
-            // No unsupported transforms found - safe to use native execution
+            // No non-identity transforms - optimal row-group filtering will apply
            true
        }
      } catch {
        case e: Exception =>
-          // Reflection failure - cannot verify safety, must fall back
-          fallbackReasons += "Iceberg reflection failure: Could not check for " +
-            s"transform functions in residuals: ${e.getMessage}"
-          false
+          // Reflection failure - log warning but allow native execution
+          // The predicate conversion will handle unsupported cases gracefully
+          logWarning(
+            s"Could not check for transform functions in residuals: ${e.getMessage}. " +
+            "Continuing with native scan.")
+          true
      }
 
     // Check for unsupported struct types in delete files
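
For context, a minimal sketch of how the relaxed check is expected to surface at query time. It reuses the bucket_residual_test table and query from the tests added below; the spark session and the plan-node names shown in the comment are illustrative assumptions, not something this patch asserts.

// Illustrative sketch only, not part of the patch. With a table
// partitioned by bucket(4, id), an exact-match predicate leaves a
// residual expression containing the bucket transform. iceberg-rust's
// row-group filtering skips that predicate, and it is re-applied
// after the scan instead.
val df = spark.sql("SELECT * FROM test_cat.db.bucket_residual_test WHERE id = 42")

// Before this change, CometScanRule recorded a fallback reason and the
// scan ran on Spark. After it, the plan should keep the native scan with
// a Comet filter on top, along the lines of (node names illustrative):
//
//   CometFilter [id = 42]
//   +- CometScan (native Iceberg) ...
df.explain()
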
diff --git a/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala b/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala
index 174b091050..243baa7210 100644
--- a/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala
@@ -2283,6 +2283,160 @@ class CometIcebergNativeSuite extends CometTestBase with RESTCatalogHelper {
     }
   }
 
+  // Tests for non-identity transform residuals feature
+  // These tests verify that native Iceberg scans work when residual expressions
+  // contain non-identity transforms (truncate, bucket, year, month, day, hour).
+  // Previously these would fall back to Spark, but now they're supported with
+  // post-scan filtering via CometFilter.
+
+  test("non-identity transform residual - truncate transform allows native scan") {
+    assume(icebergAvailable, "Iceberg not available in classpath")
+
+    withTempIcebergDir { warehouseDir =>
+      withSQLConf(
+        "spark.sql.catalog.test_cat" -> "org.apache.iceberg.spark.SparkCatalog",
+        "spark.sql.catalog.test_cat.type" -> "hadoop",
+        "spark.sql.catalog.test_cat.warehouse" -> warehouseDir.getAbsolutePath,
+        CometConf.COMET_ENABLED.key -> "true",
+        CometConf.COMET_EXEC_ENABLED.key -> "true",
+        CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") {
+
+        // Create table partitioned by truncate transform
+        // When filtering by exact value (e.g., name = 'alpha_2'), Iceberg creates
+        // a residual expression because truncate(5, name) can't fully evaluate it
+        spark.sql("""
+          CREATE TABLE test_cat.db.truncate_residual_test (
+            id INT,
+            name STRING
+          ) USING iceberg
+          PARTITIONED BY (truncate(5, name))
+        """)
+
+        spark.sql("""
+          INSERT INTO test_cat.db.truncate_residual_test VALUES
+          (1, 'alpha_1'), (2, 'alpha_2'), (3, 'alpha_3'),
+          (4, 'bravo_1'), (5, 'bravo_2'), (6, 'charlie_1')
+        """)
+
+        // This filter creates a residual with truncate transform
+        // The partition can narrow down to the 'alpha' prefix, but the exact match
+        // requires post-scan filtering
+        checkIcebergNativeScan(
+          "SELECT * FROM test_cat.db.truncate_residual_test WHERE name = 'alpha_2' ORDER BY id")
+
+        // Verify correct results
+        val result = spark
+          .sql("SELECT * FROM test_cat.db.truncate_residual_test WHERE name = 'alpha_2'")
+          .collect()
+        assert(result.length == 1, s"Expected 1 row, got ${result.length}")
+        assert(result(0).getInt(0) == 2, s"Expected id=2, got ${result(0).getInt(0)}")
+
+        spark.sql("DROP TABLE test_cat.db.truncate_residual_test")
+      }
+    }
+  }
+
+  test("non-identity transform residual - bucket transform allows native scan") {
+    assume(icebergAvailable, "Iceberg not available in classpath")
+
+    withTempIcebergDir { warehouseDir =>
+      withSQLConf(
+        "spark.sql.catalog.test_cat" -> "org.apache.iceberg.spark.SparkCatalog",
+        "spark.sql.catalog.test_cat.type" -> "hadoop",
+        "spark.sql.catalog.test_cat.warehouse" -> warehouseDir.getAbsolutePath,
+        CometConf.COMET_ENABLED.key -> "true",
+        CometConf.COMET_EXEC_ENABLED.key -> "true",
+        CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") {
+
+        // Create table partitioned by bucket transform
+        // When filtering by exact id value, Iceberg creates a residual expression
+        // because bucket(4, id) maps multiple ids to the same bucket
+        spark.sql("""
+          CREATE TABLE test_cat.db.bucket_residual_test (
+            id INT,
+            value DOUBLE
+          ) USING iceberg
+          PARTITIONED BY (bucket(4, id))
+        """)
+
+        spark.sql("""
+          INSERT INTO test_cat.db.bucket_residual_test
+          SELECT id, CAST(id * 1.5 AS DOUBLE) as value
+          FROM range(100)
+        """)
+
+        // This filter creates a residual with bucket transform
+        // Partition pruning uses the bucket hash, but the exact id match
+        // requires post-scan filtering
+        checkIcebergNativeScan(
+          "SELECT * FROM test_cat.db.bucket_residual_test WHERE id = 42 ORDER BY id")
+
+        // Verify correct results
+        val result =
+          spark.sql("SELECT * FROM test_cat.db.bucket_residual_test WHERE id = 42").collect()
+        assert(result.length == 1, s"Expected 1 row, got ${result.length}")
+        assert(result(0).getInt(0) == 42, s"Expected id=42, got ${result(0).getInt(0)}")
+
+        spark.sql("DROP TABLE test_cat.db.bucket_residual_test")
+      }
+    }
+  }
+
+  test("non-identity transform residual - year transform allows native scan") {
+    assume(icebergAvailable, "Iceberg not available in classpath")
+
+    withTempIcebergDir { warehouseDir =>
+      withSQLConf(
+        "spark.sql.catalog.test_cat" -> "org.apache.iceberg.spark.SparkCatalog",
+        "spark.sql.catalog.test_cat.type" -> "hadoop",
+        "spark.sql.catalog.test_cat.warehouse" -> warehouseDir.getAbsolutePath,
+        CometConf.COMET_ENABLED.key -> "true",
+        CometConf.COMET_EXEC_ENABLED.key -> "true",
+        CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") {
+
+        // Create table partitioned by year transform
+        // When filtering by exact date, Iceberg creates a residual expression
+        // because year(event_date) groups all dates in a year together
+        spark.sql("""
+          CREATE TABLE test_cat.db.year_residual_test (
+            id INT,
+            event_date DATE,
+            data STRING
+          ) USING iceberg
+          PARTITIONED BY (year(event_date))
+        """)
+
+        spark.sql("""
+          INSERT INTO test_cat.db.year_residual_test VALUES
+          (1, DATE '2023-01-15', 'jan'),
+          (2, DATE '2023-06-20', 'jun'),
+          (3, DATE '2023-12-25', 'dec'),
+          (4, DATE '2024-01-10', 'new_year'),
+          (5, DATE '2024-07-04', 'july')
+        """)
+
+        // This filter creates a residual with year transform
+        // Partition pruning narrows to 2023, but the exact date match
+        // requires post-scan filtering
+        checkIcebergNativeScan(
+          "SELECT * FROM test_cat.db.year_residual_test WHERE event_date = DATE '2023-06-20'")
+
+        // Verify correct results
+        val result = spark
+          .sql(
+            "SELECT * FROM test_cat.db.year_residual_test WHERE event_date = DATE '2023-06-20'")
+          .collect()
+        assert(result.length == 1, s"Expected 1 row, got ${result.length}")
+        assert(result(0).getInt(0) == 2, s"Expected id=2, got ${result(0).getInt(0)}")
+        assert(
+          result(0).getString(2) == "jun",
+          s"Expected data='jun', got ${result(0).getString(2)}")
+
+        spark.sql("DROP TABLE test_cat.db.year_residual_test")
+      }
+    }
+  }
+
   // Helper to create temp directory
   def withTempIcebergDir(f: File => Unit): Unit = {
     val dir = Files.createTempDirectory("comet-iceberg-test").toFile
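
The added tests exercise truncate, bucket, and year, while the comment in CometScanRule also names month, day, and hour. A sketch of how a day-transform variant could follow the same pattern (hypothetical: the table name, schema, and timestamps below are invented for illustration and are not part of this patch):

// Hypothetical sketch, not part of this patch: a day-transform variant
// following the same structure as the truncate/bucket/year tests above.
test("non-identity transform residual - day transform allows native scan") {
  assume(icebergAvailable, "Iceberg not available in classpath")

  withTempIcebergDir { warehouseDir =>
    withSQLConf(
      "spark.sql.catalog.test_cat" -> "org.apache.iceberg.spark.SparkCatalog",
      "spark.sql.catalog.test_cat.type" -> "hadoop",
      "spark.sql.catalog.test_cat.warehouse" -> warehouseDir.getAbsolutePath,
      CometConf.COMET_ENABLED.key -> "true",
      CometConf.COMET_EXEC_ENABLED.key -> "true",
      CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") {

      // day(event_ts) groups all events in a day, so an exact-timestamp
      // filter leaves a residual containing the day transform
      spark.sql("""
        CREATE TABLE test_cat.db.day_residual_test (
          id INT,
          event_ts TIMESTAMP
        ) USING iceberg
        PARTITIONED BY (day(event_ts))
      """)

      spark.sql("""
        INSERT INTO test_cat.db.day_residual_test VALUES
        (1, TIMESTAMP '2024-03-01 08:00:00'),
        (2, TIMESTAMP '2024-03-01 17:30:00'),
        (3, TIMESTAMP '2024-03-02 09:15:00')
      """)

      // Partition pruning narrows to 2024-03-01, but the exact timestamp
      // match requires post-scan filtering
      checkIcebergNativeScan(
        "SELECT * FROM test_cat.db.day_residual_test " +
          "WHERE event_ts = TIMESTAMP '2024-03-01 17:30:00'")

      spark.sql("DROP TABLE test_cat.db.day_residual_test")
    }
  }
}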