
Commit f41059f

Add tests for non-identity transform residuals and fix scalastyle errors
1 parent db2fa02 commit f41059f

File tree

2 files changed: +158 -2 lines changed


spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala

Lines changed: 4 additions & 2 deletions

@@ -480,14 +480,16 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
 
     // Check for transform functions in residual expressions
     // Non-identity transforms (truncate, bucket, year, month, day, hour) in residuals
-    // are now supported - they skip row-group filtering and are handled post-scan by CometFilter.
+    // are now supported - they skip row-group filtering and are handled
+    // post-scan by CometFilter.
     // This is less optimal than row-group filtering but still allows native execution.
     val transformFunctionsSupported =
       try {
         IcebergReflection.findNonIdentityTransformInResiduals(metadata.tasks) match {
           case Some(transformType) =>
             // Found non-identity transform - log info and continue with native scan
-            // Row-group filtering will skip these predicates, but post-scan filtering will apply
+            // Row-group filtering will skip these predicates, but post-scan
+            // filtering will apply
             logInfo(
               s"Iceberg residual contains transform '$transformType' - " +
                 "row-group filtering will skip this predicate, " +

spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala

Lines changed: 154 additions & 0 deletions

@@ -2283,6 +2283,160 @@ class CometIcebergNativeSuite extends CometTestBase with RESTCatalogHelper {
     }
   }
 
+  // Tests for non-identity transform residuals feature
+  // These tests verify that native Iceberg scans work when residual expressions
+  // contain non-identity transforms (truncate, bucket, year, month, day, hour).
+  // Previously these would fall back to Spark, but now they're supported with
+  // post-scan filtering via CometFilter.
+
+  test("non-identity transform residual - truncate transform allows native scan") {
+    assume(icebergAvailable, "Iceberg not available in classpath")
+
+    withTempIcebergDir { warehouseDir =>
+      withSQLConf(
+        "spark.sql.catalog.test_cat" -> "org.apache.iceberg.spark.SparkCatalog",
+        "spark.sql.catalog.test_cat.type" -> "hadoop",
+        "spark.sql.catalog.test_cat.warehouse" -> warehouseDir.getAbsolutePath,
+        CometConf.COMET_ENABLED.key -> "true",
+        CometConf.COMET_EXEC_ENABLED.key -> "true",
+        CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") {
+
+        // Create table partitioned by truncate transform
+        // When filtering by exact value (e.g., name = 'alpha_1'), Iceberg creates
+        // a residual expression because truncate(5, name) can't fully evaluate this
+        spark.sql("""
+          CREATE TABLE test_cat.db.truncate_residual_test (
+            id INT,
+            name STRING
+          ) USING iceberg
+          PARTITIONED BY (truncate(5, name))
+        """)
+
+        spark.sql("""
+          INSERT INTO test_cat.db.truncate_residual_test VALUES
+          (1, 'alpha_1'), (2, 'alpha_2'), (3, 'alpha_3'),
+          (4, 'bravo_1'), (5, 'bravo_2'), (6, 'charlie_1')
+        """)
+
+        // This filter creates a residual with truncate transform
+        // The partition can narrow down to 'alpha' prefix, but exact match
+        // requires post-scan filtering
+        checkIcebergNativeScan(
+          "SELECT * FROM test_cat.db.truncate_residual_test WHERE name = 'alpha_2' ORDER BY id")
+
+        // Verify correct results
+        val result = spark
+          .sql("SELECT * FROM test_cat.db.truncate_residual_test WHERE name = 'alpha_2'")
+          .collect()
+        assert(result.length == 1, s"Expected 1 row, got ${result.length}")
+        assert(result(0).getInt(0) == 2, s"Expected id=2, got ${result(0).getInt(0)}")
+
+        spark.sql("DROP TABLE test_cat.db.truncate_residual_test")
+      }
+    }
+  }
+
+  test("non-identity transform residual - bucket transform allows native scan") {
+    assume(icebergAvailable, "Iceberg not available in classpath")
+
+    withTempIcebergDir { warehouseDir =>
+      withSQLConf(
+        "spark.sql.catalog.test_cat" -> "org.apache.iceberg.spark.SparkCatalog",
+        "spark.sql.catalog.test_cat.type" -> "hadoop",
+        "spark.sql.catalog.test_cat.warehouse" -> warehouseDir.getAbsolutePath,
+        CometConf.COMET_ENABLED.key -> "true",
+        CometConf.COMET_EXEC_ENABLED.key -> "true",
+        CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") {
+
+        // Create table partitioned by bucket transform
+        // When filtering by exact id value, Iceberg creates a residual expression
+        // because bucket(4, id) maps multiple ids to the same bucket
+        spark.sql("""
+          CREATE TABLE test_cat.db.bucket_residual_test (
+            id INT,
+            value DOUBLE
+          ) USING iceberg
+          PARTITIONED BY (bucket(4, id))
+        """)
+
+        spark.sql("""
+          INSERT INTO test_cat.db.bucket_residual_test
+          SELECT id, CAST(id * 1.5 AS DOUBLE) as value
+          FROM range(100)
+        """)
+
+        // This filter creates a residual with bucket transform
+        // The partition pruning uses bucket hash, but exact id match
+        // requires post-scan filtering
+        checkIcebergNativeScan(
+          "SELECT * FROM test_cat.db.bucket_residual_test WHERE id = 42 ORDER BY id")
+
+        // Verify correct results
+        val result =
+          spark.sql("SELECT * FROM test_cat.db.bucket_residual_test WHERE id = 42").collect()
+        assert(result.length == 1, s"Expected 1 row, got ${result.length}")
+        assert(result(0).getInt(0) == 42, s"Expected id=42, got ${result(0).getInt(0)}")
+
+        spark.sql("DROP TABLE test_cat.db.bucket_residual_test")
+      }
+    }
+  }
+
+  test("non-identity transform residual - year transform allows native scan") {
+    assume(icebergAvailable, "Iceberg not available in classpath")
+
+    withTempIcebergDir { warehouseDir =>
+      withSQLConf(
+        "spark.sql.catalog.test_cat" -> "org.apache.iceberg.spark.SparkCatalog",
+        "spark.sql.catalog.test_cat.type" -> "hadoop",
+        "spark.sql.catalog.test_cat.warehouse" -> warehouseDir.getAbsolutePath,
+        CometConf.COMET_ENABLED.key -> "true",
+        CometConf.COMET_EXEC_ENABLED.key -> "true",
+        CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") {
+
+        // Create table partitioned by year transform
+        // When filtering by exact date, Iceberg creates a residual expression
+        // because year(event_date) groups all dates in a year together
+        spark.sql("""
+          CREATE TABLE test_cat.db.year_residual_test (
+            id INT,
+            event_date DATE,
+            data STRING
+          ) USING iceberg
+          PARTITIONED BY (year(event_date))
+        """)
+
+        spark.sql("""
+          INSERT INTO test_cat.db.year_residual_test VALUES
+          (1, DATE '2023-01-15', 'jan'),
+          (2, DATE '2023-06-20', 'jun'),
+          (3, DATE '2023-12-25', 'dec'),
+          (4, DATE '2024-01-10', 'new_year'),
+          (5, DATE '2024-07-04', 'july')
+        """)
+
+        // This filter creates a residual with year transform
+        // Partition pruning narrows to 2023, but exact date match
+        // requires post-scan filtering
+        checkIcebergNativeScan(
+          "SELECT * FROM test_cat.db.year_residual_test WHERE event_date = DATE '2023-06-20'")
+
+        // Verify correct results
+        val result = spark
+          .sql(
+            "SELECT * FROM test_cat.db.year_residual_test WHERE event_date = DATE '2023-06-20'")
+          .collect()
+        assert(result.length == 1, s"Expected 1 row, got ${result.length}")
+        assert(result(0).getInt(0) == 2, s"Expected id=2, got ${result(0).getInt(0)}")
+        assert(
+          result(0).getString(2) == "jun",
+          s"Expected data='jun', got ${result(0).getString(2)}")
+
+        spark.sql("DROP TABLE test_cat.db.year_residual_test")
+      }
+    }
+  }
+
   // Helper to create temp directory
   def withTempIcebergDir(f: File => Unit): Unit = {
     val dir = Files.createTempDirectory("comet-iceberg-test").toFile
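All three tests exercise the same two-step behavior: partition pruning narrows the scan using the transform, and the exact-match residual is re-applied row by row after the scan. The self-contained sketch below illustrates that for the year() case using plain Scala collections; the data mirrors the year test's rows, and nothing here is Comet or Iceberg API, it is purely illustrative.

import java.time.LocalDate

object YearResidualSketch {
  def main(args: Array[String]): Unit = {
    // Rows from the year-transform test, grouped the way a year() partition would group them.
    val rows = Seq(
      (1, LocalDate.parse("2023-01-15"), "jan"),
      (2, LocalDate.parse("2023-06-20"), "jun"),
      (3, LocalDate.parse("2023-12-25"), "dec"),
      (4, LocalDate.parse("2024-01-10"), "new_year"),
      (5, LocalDate.parse("2024-07-04"), "july"))
    val partitions = rows.groupBy(_._2.getYear)

    val target = LocalDate.parse("2023-06-20")
    // Step 1: partition pruning keeps only the year = 2023 partition (3 rows survive).
    val pruned = partitions(target.getYear)
    assert(pruned.length == 3)
    // Step 2: the residual (event_date = target) filters post-scan (1 row survives).
    val matched = pruned.filter(_._2 == target)
    assert(matched.map(_._1) == Seq(2))
  }
}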
