34 changes: 20 additions & 14 deletions spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -478,29 +478,35 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
       false
     }
 
-    // Check for unsupported transform functions in residual expressions
-    // iceberg-rust can only handle identity transforms in residuals; all other transforms
-    // (truncate, bucket, year, month, day, hour) must fall back to Spark
+    // Check for transform functions in residual expressions
+    // Non-identity transforms (truncate, bucket, year, month, day, hour) in residuals
+    // are now supported - they skip row-group filtering and are handled
+    // post-scan by CometFilter.
+    // This is less optimal than row-group filtering but still allows native execution.
     val transformFunctionsSupported =
       try {
         IcebergReflection.findNonIdentityTransformInResiduals(metadata.tasks) match {
           case Some(transformType) =>
-            // Found unsupported transform
-            fallbackReasons +=
-              s"Iceberg transform function '$transformType' in residual expression " +
-                "is not yet supported by iceberg-rust. " +
-                "Only identity transforms are supported."
-            false
+            // Found non-identity transform - log info and continue with native scan
+            // Row-group filtering will skip these predicates, but post-scan
+            // filtering will apply
+            logInfo(
+              s"Iceberg residual contains transform '$transformType' - " +
+                "row-group filtering will skip this predicate, " +
+                "post-scan filtering by CometFilter will apply instead.")
+            true // Allow native execution
           case None =>
-            // No unsupported transforms found - safe to use native execution
+            // No non-identity transforms - optimal row-group filtering will apply
             true
         }
       } catch {
         case e: Exception =>
-          // Reflection failure - cannot verify safety, must fall back
-          fallbackReasons += "Iceberg reflection failure: Could not check for " +
-            s"transform functions in residuals: ${e.getMessage}"
-          false
+          // Reflection failure - log warning but allow native execution
+          // The predicate conversion will handle unsupported cases gracefully
+          logWarning(
+            s"Could not check for transform functions in residuals: ${e.getMessage}. " +
+              "Continuing with native scan.")
+          true
       }
 
     // Check for unsupported struct types in delete files
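For context on why such a residual survives partition pruning, here is a minimal sketch using the Iceberg Java API (org.apache.iceberg.expressions). The schema and field IDs are illustrative, and the ResidualEvaluator.of signature is from iceberg-api 1.x, so it is worth re-checking against the Iceberg version Comet pins:

import org.apache.iceberg.{PartitionSpec, Schema}
import org.apache.iceberg.expressions.{Expressions, ResidualEvaluator}
import org.apache.iceberg.types.Types

// Illustrative schema matching the truncate test below
val schema = new Schema(
  Types.NestedField.required(1, "id", Types.IntegerType.get()),
  Types.NestedField.required(2, "name", Types.StringType.get()))
val spec = PartitionSpec.builderFor(schema).truncate("name", 5).build()

// name = 'alpha_2' prunes to partitions where truncate(5, name) = 'alpha',
// but the transform cannot prove the exact match, so the predicate remains
// as a per-partition residual that must be re-checked row by row:
val evaluator = ResidualEvaluator.of(spec, Expressions.equal("name", "alpha_2"), true)
// evaluator.residualFor(partitionData) yields name = 'alpha_2' (an identity
// reference); with this change Comet applies it post-scan via CometFilter
// instead of falling back to Spark.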
154 changes: 154 additions & 0 deletions spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala
@@ -2283,6 +2283,160 @@ class CometIcebergNativeSuite extends CometTestBase with RESTCatalogHelper {
}
}

// Tests for non-identity transform residuals feature
// These tests verify that native Iceberg scans work when residual expressions
// contain non-identity transforms (truncate, bucket, year, month, day, hour).
// Previously these would fall back to Spark, but now they're supported with
// post-scan filtering via CometFilter.

test("non-identity transform residual - truncate transform allows native scan") {
assume(icebergAvailable, "Iceberg not available in classpath")

withTempIcebergDir { warehouseDir =>
withSQLConf(
"spark.sql.catalog.test_cat" -> "org.apache.iceberg.spark.SparkCatalog",
"spark.sql.catalog.test_cat.type" -> "hadoop",
"spark.sql.catalog.test_cat.warehouse" -> warehouseDir.getAbsolutePath,
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") {

// Create table partitioned by a truncate transform.
// When filtering by an exact value (e.g., name = 'alpha_2'), Iceberg creates
// a residual expression because truncate(5, name) cannot fully evaluate the predicate.
spark.sql("""
CREATE TABLE test_cat.db.truncate_residual_test (
id INT,
name STRING
) USING iceberg
PARTITIONED BY (truncate(5, name))
""")

spark.sql("""
INSERT INTO test_cat.db.truncate_residual_test VALUES
(1, 'alpha_1'), (2, 'alpha_2'), (3, 'alpha_3'),
(4, 'bravo_1'), (5, 'bravo_2'), (6, 'charlie_1')
""")

// This filter creates a residual with a truncate transform.
// Partition pruning can narrow the scan to the 'alpha' prefix, but the exact
// match requires post-scan filtering.
checkIcebergNativeScan(
"SELECT * FROM test_cat.db.truncate_residual_test WHERE name = 'alpha_2' ORDER BY id")

// Verify correct results
val result = spark
.sql("SELECT * FROM test_cat.db.truncate_residual_test WHERE name = 'alpha_2'")
.collect()
assert(result.length == 1, s"Expected 1 row, got ${result.length}")
assert(result(0).getInt(0) == 2, s"Expected id=2, got ${result(0).getInt(0)}")

spark.sql("DROP TABLE test_cat.db.truncate_residual_test")
}
}
}
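A possible hardening for these tests (not part of this PR): assert that the post-scan filter actually appears in the executed plan. The "CometFilter" node-name match below is an assumption about how Comet's filter operator reports itself, and the check would need to run before the DROP TABLE:

val df = spark.sql(
  "SELECT * FROM test_cat.db.truncate_residual_test WHERE name = 'alpha_2'")
val executedPlan = df.queryExecution.executedPlan
// Hypothetical check: look for a Comet filter node applying the residual
val hasPostScanFilter =
  executedPlan.collect { case p if p.nodeName.contains("CometFilter") => p }.nonEmpty
assert(hasPostScanFilter, "expected residual to be applied by a post-scan filter")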

test("non-identity transform residual - bucket transform allows native scan") {
assume(icebergAvailable, "Iceberg not available in classpath")

withTempIcebergDir { warehouseDir =>
withSQLConf(
"spark.sql.catalog.test_cat" -> "org.apache.iceberg.spark.SparkCatalog",
"spark.sql.catalog.test_cat.type" -> "hadoop",
"spark.sql.catalog.test_cat.warehouse" -> warehouseDir.getAbsolutePath,
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") {

// Create table partitioned by bucket transform
// When filtering by an exact id value, Iceberg creates a residual expression
// because bucket(4, id) maps multiple ids to the same bucket.
spark.sql("""
CREATE TABLE test_cat.db.bucket_residual_test (
id INT,
value DOUBLE
) USING iceberg
PARTITIONED BY (bucket(4, id))
""")

spark.sql("""
INSERT INTO test_cat.db.bucket_residual_test
SELECT id, CAST(id * 1.5 AS DOUBLE) as value
FROM range(100)
""")

// This filter creates a residual with a bucket transform.
// Partition pruning uses the bucket hash, but the exact id match
// requires post-scan filtering.
checkIcebergNativeScan(
"SELECT * FROM test_cat.db.bucket_residual_test WHERE id = 42 ORDER BY id")

// Verify correct results
val result =
spark.sql("SELECT * FROM test_cat.db.bucket_residual_test WHERE id = 42").collect()
assert(result.length == 1, s"Expected 1 row, got ${result.length}")
assert(result(0).getInt(0) == 42, s"Expected id=42, got ${result(0).getInt(0)}")

spark.sql("DROP TABLE test_cat.db.bucket_residual_test")
}
}
}
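Why id = 42 cannot be proven from the partition value alone: per the Iceberg spec, bucket(N, x) hashes the value with 32-bit Murmur3 (ints are widened to 8-byte little-endian longs) and takes the result modulo N, so every bucket holds many ids. A hedged sketch of that computation, assuming Guava 31+ is on the classpath:

import java.nio.{ByteBuffer, ByteOrder}
import com.google.common.hash.Hashing

// Sketch of the Iceberg spec's bucket transform for INT values (assumes Guava)
def icebergBucket(numBuckets: Int, value: Int): Int = {
  val bytes = ByteBuffer
    .allocate(8)
    .order(ByteOrder.LITTLE_ENDIAN)
    .putLong(value.toLong)
    .array()
  val hash = Hashing.murmur3_32_fixed().hashBytes(bytes).asInt()
  (hash & Integer.MAX_VALUE) % numBuckets
}
// icebergBucket(4, 42) gives the partition that pruning keeps; other ids
// hashing to the same bucket are removed by the post-scan filter.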

test("non-identity transform residual - year transform allows native scan") {
assume(icebergAvailable, "Iceberg not available in classpath")

withTempIcebergDir { warehouseDir =>
withSQLConf(
"spark.sql.catalog.test_cat" -> "org.apache.iceberg.spark.SparkCatalog",
"spark.sql.catalog.test_cat.type" -> "hadoop",
"spark.sql.catalog.test_cat.warehouse" -> warehouseDir.getAbsolutePath,
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") {

// Create table partitioned by year transform
// When filtering by an exact date, Iceberg creates a residual expression
// because year(event_date) groups all dates in a year together.
spark.sql("""
CREATE TABLE test_cat.db.year_residual_test (
id INT,
event_date DATE,
data STRING
) USING iceberg
PARTITIONED BY (year(event_date))
""")

spark.sql("""
INSERT INTO test_cat.db.year_residual_test VALUES
(1, DATE '2023-01-15', 'jan'),
(2, DATE '2023-06-20', 'jun'),
(3, DATE '2023-12-25', 'dec'),
(4, DATE '2024-01-10', 'new_year'),
(5, DATE '2024-07-04', 'july')
""")

// This filter creates a residual with a year transform.
// Partition pruning narrows the scan to 2023, but the exact date match
// requires post-scan filtering.
checkIcebergNativeScan(
"SELECT * FROM test_cat.db.year_residual_test WHERE event_date = DATE '2023-06-20'")

// Verify correct results
val result = spark
.sql(
"SELECT * FROM test_cat.db.year_residual_test WHERE event_date = DATE '2023-06-20'")
.collect()
assert(result.length == 1, s"Expected 1 row, got ${result.length}")
assert(result(0).getInt(0) == 2, s"Expected id=2, got ${result(0).getInt(0)}")
assert(
result(0).getString(2) == "jun",
s"Expected data='jun', got ${result(0).getString(2)}")

spark.sql("DROP TABLE test_cat.db.year_residual_test")
}
}
}
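For reference, Iceberg's year transform stores the number of years since 1970, so both 2023 rows land in the same partition value and the exact-date predicate survives as a residual; a quick check:

// year(DATE '2023-06-20') as stored by Iceberg: years from the Unix epoch
val partitionValue = java.time.LocalDate.parse("2023-06-20").getYear - 1970
assert(partitionValue == 53) // same value for every date in 2023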

// Helper to create temp directory
def withTempIcebergDir(f: File => Unit): Unit = {
val dir = Files.createTempDirectory("comet-iceberg-test").toFile