From 725260576669b65ae1ba945b5b7ecdf875d7ecfa Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Fri, 22 Aug 2025 10:10:23 -0600
Subject: [PATCH 1/3] Access Spark configs from native code

---
 native/core/src/execution/jni_api.rs | 34 ++++++++++---------
 native/core/src/execution/mod.rs | 1 +
 native/core/src/execution/spark_config.rs | 34 +++++++++++++++++++
 .../org/apache/comet/CometExecIterator.scala | 7 ++--
 .../main/scala/org/apache/comet/Native.scala | 5 +--
 5 files changed, 56 insertions(+), 25 deletions(-)
 create mode 100644 native/core/src/execution/spark_config.rs

diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs
index ef5435cbc9..a060e3cea5 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -75,6 +75,9 @@ use crate::execution::spark_plan::SparkPlan;
 use crate::execution::tracing::{log_memory_usage, trace_begin, trace_end, with_trace};
+use crate::execution::spark_config::{
+    SparkConfig, COMET_DEBUG_ENABLED, COMET_EXPLAIN_NATIVE_ENABLED, COMET_TRACING_ENABLED,
+};
 use datafusion_comet_proto::spark_operator::operator::OpStruct;
 use log::info;
 use once_cell::sync::Lazy;
@@ -164,12 +167,20 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
     memory_limit: jlong,
     memory_limit_per_task: jlong,
     task_attempt_id: jlong,
-    debug_native: jboolean,
-    explain_native: jboolean,
-    tracing_enabled: jboolean,
 ) -> jlong {
     try_unwrap_or_throw(&e, |mut env| {
-        with_trace("createPlan", tracing_enabled != JNI_FALSE, || {
+        // Deserialize Spark configs
+        let array = unsafe { JPrimitiveArray::from_raw(serialized_spark_configs) };
+        let bytes = env.convert_byte_array(array)?;
+        let spark_configs = serde::deserialize_config(bytes.as_slice())?;
+
+        // Convert Spark configs to HashMap
+        let spark_config: HashMap<String, String> = spark_configs.entries.into_iter().collect();
+        let debug_native = spark_config.get_bool(COMET_DEBUG_ENABLED);
+        let explain_native = spark_config.get_bool(COMET_EXPLAIN_NATIVE_ENABLED);
+        let tracing_enabled = spark_config.get_bool(COMET_TRACING_ENABLED);
+
+        with_trace("createPlan", tracing_enabled, || {
             // Init JVM classes
             JVMClasses::init(&mut env);

@@ -180,15 +191,6 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
             let bytes = env.convert_byte_array(array)?;
             let spark_plan = serde::deserialize_op(bytes.as_slice())?;

-            // Deserialize Spark configs
-            let array = unsafe { JPrimitiveArray::from_raw(serialized_spark_configs) };
-            let bytes = env.convert_byte_array(array)?;
-            let spark_configs = serde::deserialize_config(bytes.as_slice())?;
-
-            // Convert Spark configs to HashMap
-            let _spark_config_map: HashMap<String, String> =
-                spark_configs.entries.into_iter().collect();
-
             let metrics = Arc::new(jni_new_global_ref!(env, metrics_node)?);

             // Get the global references of input sources
@@ -253,10 +255,10 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
                 metrics_last_update_time: Instant::now(),
                 plan_creation_time,
                 session_ctx: Arc::new(session),
-                debug_native: debug_native == 1,
-                explain_native: explain_native == 1,
+                debug_native,
+                explain_native,
                 memory_pool_config,
-                tracing_enabled: tracing_enabled != JNI_FALSE,
+                tracing_enabled,
             });

             Ok(Box::into_raw(exec_context) as i64)
diff --git a/native/core/src/execution/mod.rs b/native/core/src/execution/mod.rs
index c55b96f2a9..b8a3d546b3 100644
--- a/native/core/src/execution/mod.rs
+++ b/native/core/src/execution/mod.rs
@@ -27,6 +27,7 @@ pub(crate) mod sort;
 pub(crate) mod spark_plan;
 pub use datafusion_comet_spark_expr::timezone;
 mod memory_pools;
+pub(crate) mod spark_config;
 pub(crate) mod tracing;
 pub(crate) mod utils;
diff --git a/native/core/src/execution/spark_config.rs b/native/core/src/execution/spark_config.rs
new file mode 100644
index 0000000000..7465a1ea9f
--- /dev/null
+++ b/native/core/src/execution/spark_config.rs
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+
+pub(crate) const COMET_TRACING_ENABLED: &str = "spark.comet.tracing.enabled";
+pub(crate) const COMET_DEBUG_ENABLED: &str = "spark.comet.debug.enabled";
+pub(crate) const COMET_EXPLAIN_NATIVE_ENABLED: &str = "spark.comet.explain.native.enabled";
+
+pub(crate) trait SparkConfig {
+    fn get_bool(&self, name: &str) -> bool;
+}
+
+impl SparkConfig for HashMap<String, String> {
+    fn get_bool(&self, name: &str) -> bool {
+        self.get(name)
+            .and_then(|str_val| str_val.parse::<bool>().ok())
+            .unwrap_or(false)
+    }
+}
diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
index 67d044f8c5..700e786e35 100644
--- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
+++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
@@ -30,7 +30,7 @@ import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.sql.comet.CometMetricNode
 import org.apache.spark.sql.vectorized._

-import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_DEBUG_ENABLED, COMET_EXEC_MEMORY_POOL_TYPE, COMET_EXPLAIN_NATIVE_ENABLED, COMET_METRICS_UPDATE_INTERVAL}
+import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_EXEC_MEMORY_POOL_TYPE, COMET_METRICS_UPDATE_INTERVAL}
 import org.apache.comet.Tracing.withTrace
 import org.apache.comet.serde.Config.ConfigMap
 import org.apache.comet.vector.NativeUtil
@@ -108,10 +108,7 @@ class CometExecIterator(
       memoryPoolType = COMET_EXEC_MEMORY_POOL_TYPE.get(),
       memoryLimit,
       memoryLimitPerTask = getMemoryLimitPerTask(conf),
-      taskAttemptId = TaskContext.get().taskAttemptId,
-      debug = COMET_DEBUG_ENABLED.get(),
-      explain = COMET_EXPLAIN_NATIVE_ENABLED.get(),
-      tracingEnabled)
+      taskAttemptId = TaskContext.get().taskAttemptId)
   }

   private var nextBatch: Option[ColumnarBatch] = None
diff --git a/spark/src/main/scala/org/apache/comet/Native.scala b/spark/src/main/scala/org/apache/comet/Native.scala
index 7430a4322c..a269993bb1 100644
--- a/spark/src/main/scala/org/apache/comet/Native.scala
+++ b/spark/src/main/scala/org/apache/comet/Native.scala
@@ -65,10 +65,7 @@ class Native extends NativeBase {
       memoryPoolType: String,
       memoryLimit: Long,
       memoryLimitPerTask: Long,
-      taskAttemptId: Long,
-      debug: Boolean,
-      explain: Boolean,
-      tracingEnabled: Boolean): Long
+      taskAttemptId: Long): Long
   // scalastyle:on

   /**
From
d084cfa0e192d7655587ce1d3bb7e6038fc2bd7f Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 22 Aug 2025 10:16:35 -0600 Subject: [PATCH 2/3] code cleanup --- dev/diffs/4.0.0.diff | 153 +++++++++++++++++++++++---- native/core/src/execution/jni_api.rs | 4 +- 2 files changed, 134 insertions(+), 23 deletions(-) diff --git a/dev/diffs/4.0.0.diff b/dev/diffs/4.0.0.diff index 7d2dc790d3..6799999b8a 100644 --- a/dev/diffs/4.0.0.diff +++ b/dev/diffs/4.0.0.diff @@ -700,10 +700,10 @@ index 9c529d14221..069b7c5adeb 100644 } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala b/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala new file mode 100644 -index 00000000000..5eb3fa17ca8 +index 00000000000..5691536c114 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala -@@ -0,0 +1,43 @@ +@@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with @@ -732,6 +732,8 @@ index 00000000000..5eb3fa17ca8 + * Tests with this tag will be ignored when Comet is enabled (e.g., via `ENABLE_COMET`). + */ +case class IgnoreComet(reason: String) extends Tag("DisableComet") ++case class IgnoreCometNativeIcebergCompat(reason: String) extends Tag("DisableComet") ++case class IgnoreCometNativeDataFusion(reason: String) extends Tag("DisableComet") +case class IgnoreCometNativeScan(reason: String) extends Tag("DisableComet") + +/** @@ -1055,7 +1057,7 @@ index ad424b3a7cc..4ece0117a34 100644 protected val baseResourcePath = { // use the same way as `SQLQueryTestSuite` to get the resource path diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala -index b3fce19979e..345acb4811a 100644 +index b3fce19979e..67edf5eb91c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -1524,7 +1524,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark @@ -1084,11 +1086,24 @@ index b3fce19979e..345acb4811a 100644 test("SPARK-39175: Query context of Cast should be serialized to executors" + - " when WSCG is off") { + " when WSCG is off", -+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/551")) { ++ IgnoreComet("https://github.com/apache/datafusion-comet/issues/2218")) { withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false", SQLConf.ANSI_ENABLED.key -> "true") { withTable("t") { -@@ -4497,7 +4500,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark +@@ -4490,14 +4493,20 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark + assert(ex.isInstanceOf[SparkNumberFormatException] || + ex.isInstanceOf[SparkDateTimeException] || + ex.isInstanceOf[SparkRuntimeException]) +- assert(ex.getMessage.contains(query)) ++ ++ if (!isCometEnabled) { ++ // Comet's error message does not include the original SQL query ++ // https://github.com/apache/datafusion-comet/issues/2215 ++ assert(ex.getMessage.contains(query)) ++ } + } + } + } } test("SPARK-39190,SPARK-39208,SPARK-39210: Query context of decimal overflow error should " + @@ -1277,11 +1292,26 @@ index 2e33f6505ab..e1e93ab3bad 100644 } withTable("t1", "t2") { +diff --git a/sql/core/src/test/scala/org/apache/spark/sql/VariantShreddingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/VariantShreddingSuite.scala +index 
fee375db10a..8c2c24e2c5f 100644 +--- a/sql/core/src/test/scala/org/apache/spark/sql/VariantShreddingSuite.scala ++++ b/sql/core/src/test/scala/org/apache/spark/sql/VariantShreddingSuite.scala +@@ -33,7 +33,9 @@ import org.apache.spark.sql.types._ + import org.apache.spark.types.variant._ + import org.apache.spark.unsafe.types.{UTF8String, VariantVal} + +-class VariantShreddingSuite extends QueryTest with SharedSparkSession with ParquetTest { ++class VariantShreddingSuite extends QueryTest with SharedSparkSession with ParquetTest ++ // TODO enable tests once https://github.com/apache/datafusion-comet/issues/2209 is fixed ++ with IgnoreCometSuite { + def parseJson(s: String): VariantVal = { + val v = VariantBuilder.parseJson(s, false) + new VariantVal(v.getValue, v.getMetadata) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala -index 11e9547dfc5..df5678c8d82 100644 +index 11e9547dfc5..be9ae40ab3d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala -@@ -20,7 +20,7 @@ package org.apache.spark.sql.collation +@@ -20,10 +20,11 @@ package org.apache.spark.sql.collation import scala.jdk.CollectionConverters.MapHasAsJava import org.apache.spark.SparkException @@ -1290,7 +1320,21 @@ index 11e9547dfc5..df5678c8d82 100644 import org.apache.spark.sql.catalyst.ExtendedAnalysisException import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.util.CollationFactory -@@ -1505,7 +1505,9 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper { ++import org.apache.spark.sql.comet.{CometBroadcastHashJoinExec, CometHashJoinExec, CometSortMergeJoinExec} + import org.apache.spark.sql.connector.{DatasourceV2SQLBase, FakeV2ProviderWithCustomSchema} + import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTable} + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper +@@ -55,7 +56,9 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper { + assert( + collectFirst(queryPlan) { + case _: SortMergeJoinExec => assert(isSortMergeForced) ++ case _: CometSortMergeJoinExec => assert(isSortMergeForced) + case _: HashJoin => assert(!isSortMergeForced) ++ case _: CometHashJoinExec | _: CometBroadcastHashJoinExec => assert(!isSortMergeForced) + }.nonEmpty + ) + } +@@ -1505,7 +1508,9 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper { } } @@ -1301,7 +1345,23 @@ index 11e9547dfc5..df5678c8d82 100644 val t1 = "T_1" val t2 = "T_2" -@@ -1815,7 +1817,9 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper { +@@ -1611,6 +1616,7 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper { + } else { + assert(!collectFirst(queryPlan) { + case b: BroadcastHashJoinExec => b.leftKeys.head ++ case b: CometBroadcastHashJoinExec => b.leftKeys.head + }.head.isInstanceOf[ArrayTransform]) + } + } +@@ -1676,6 +1682,7 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper { + } else { + assert(!collectFirst(queryPlan) { + case b: BroadcastHashJoinExec => b.leftKeys.head ++ case b: CometBroadcastHashJoinExec => b.leftKeys.head + }.head.isInstanceOf[ArrayTransform]) + } + } +@@ -1815,7 +1822,9 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper { } } @@ -2636,10 
+2696,23 @@ index 22839d3f0d2..7e66d100e90 100644 checkAnswer( // "fruit" column in this file is encoded using DELTA_LENGTH_BYTE_ARRAY. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala -index bba71f1c48d..4f33ce4b3f2 100644 +index bba71f1c48d..38c60ee2584 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala -@@ -1060,7 +1060,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -996,7 +996,11 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS + Seq(Some("A"), Some("A"), None).toDF().repartition(1) + .write.parquet(path.getAbsolutePath) + val df = spark.read.parquet(path.getAbsolutePath) +- checkAnswer(stripSparkFilter(df.where("NOT (value <=> 'A')")), df) ++ // Similar to Spark's vectorized reader, Comet doesn't do row-level filtering but relies ++ // on Spark to apply the data filters after columnar batches are returned ++ if (!isCometEnabled) { ++ checkAnswer(stripSparkFilter(df.where("NOT (value <=> 'A')")), df) ++ } + } + } + } +@@ -1060,7 +1064,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS checkAnswer(readParquet(schema2, path), df) } @@ -2649,7 +2722,7 @@ index bba71f1c48d..4f33ce4b3f2 100644 val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)" checkAnswer(readParquet(schema1, path), df) val schema2 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)" -@@ -1084,7 +1085,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1084,7 +1089,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS val df = sql(s"SELECT 1 a, 123456 b, ${Int.MaxValue.toLong * 10} c, CAST('1.2' AS BINARY) d") df.write.parquet(path.toString) @@ -3330,27 +3403,34 @@ index 86c4e49f6f6..2e639e5f38d 100644 val tblTargetName = "tbl_target" val tblSourceQualified = s"default.$tblSourceName" diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala -index f0f3f94b811..486a436afb2 100644 +index f0f3f94b811..d64e4e54e22 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala -@@ -33,7 +33,7 @@ import org.scalatest.{BeforeAndAfterAll, Suite, Tag} +@@ -27,13 +27,14 @@ import scala.jdk.CollectionConverters._ + import scala.language.implicitConversions + import scala.util.control.NonFatal + ++import org.apache.comet.CometConf + import org.apache.hadoop.fs.Path + import org.scalactic.source.Position + import org.scalatest.{BeforeAndAfterAll, Suite, Tag} import org.scalatest.concurrent.Eventually import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.{AnalysisException, Row} -+import org.apache.spark.sql.{AnalysisException, IgnoreComet, Row} ++import org.apache.spark.sql.{AnalysisException, IgnoreComet, IgnoreCometNativeDataFusion, IgnoreCometNativeIcebergCompat, IgnoreCometNativeScan, Row} import org.apache.spark.sql.catalyst.FunctionIdentifier import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.catalyst.catalog.SessionCatalog.DEFAULT_DATABASE -@@ -42,6 +42,7 @@ 
import org.apache.spark.sql.catalyst.plans.PlanTestBase +@@ -42,6 +43,7 @@ import org.apache.spark.sql.catalyst.plans.PlanTestBase import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.classic.{ClassicConversions, ColumnConversions, ColumnNodeToExpressionConverter, DataFrame, Dataset, SparkSession, SQLImplicits} -+import org.apache.spark.sql.comet._ ++import org.apache.spark.sql.comet.{CometFilterExec, CometProjectExec} import org.apache.spark.sql.execution.FilterExec import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution import org.apache.spark.sql.execution.datasources.DataSourceUtils -@@ -128,7 +129,11 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with +@@ -128,7 +130,28 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with } } } else { @@ -3358,12 +3438,29 @@ index f0f3f94b811..486a436afb2 100644 + if (isCometEnabled && testTags.exists(_.isInstanceOf[IgnoreComet])) { + ignore(testName + " (disabled when Comet is on)", testTags: _*)(testFun) + } else { -+ super.test(testName, testTags: _*)(testFun) ++ val cometScanImpl = CometConf.COMET_NATIVE_SCAN_IMPL.get(conf) ++ val isNativeIcebergCompat = cometScanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT || ++ cometScanImpl == CometConf.SCAN_AUTO ++ val isNativeDataFusion = cometScanImpl == CometConf.SCAN_NATIVE_DATAFUSION || ++ cometScanImpl == CometConf.SCAN_AUTO ++ if (isCometEnabled && isNativeIcebergCompat && ++ testTags.exists(_.isInstanceOf[IgnoreCometNativeIcebergCompat])) { ++ ignore(testName + " (disabled for NATIVE_ICEBERG_COMPAT)", testTags: _*)(testFun) ++ } else if (isCometEnabled && isNativeDataFusion && ++ testTags.exists(_.isInstanceOf[IgnoreCometNativeDataFusion])) { ++ ignore(testName + " (disabled for NATIVE_DATAFUSION)", testTags: _*)(testFun) ++ } else if (isCometEnabled && (isNativeDataFusion || isNativeIcebergCompat) && ++ testTags.exists(_.isInstanceOf[IgnoreCometNativeScan])) { ++ ignore(testName + " (disabled for NATIVE_DATAFUSION and NATIVE_ICEBERG_COMPAT)", ++ testTags: _*)(testFun) ++ } else { ++ super.test(testName, testTags: _*)(testFun) ++ } + } } } -@@ -248,8 +253,33 @@ private[sql] trait SQLTestUtilsBase +@@ -248,8 +271,33 @@ private[sql] trait SQLTestUtilsBase override protected def converter: ColumnNodeToExpressionConverter = self.spark.converter } @@ -3397,7 +3494,7 @@ index f0f3f94b811..486a436afb2 100644 super.withSQLConf(pairs: _*)(f) } -@@ -451,6 +481,8 @@ private[sql] trait SQLTestUtilsBase +@@ -451,6 +499,8 @@ private[sql] trait SQLTestUtilsBase val schema = df.schema val withoutFilters = df.queryExecution.executedPlan.transform { case FilterExec(_, child) => child @@ -3509,6 +3606,20 @@ index 4b27082e188..09f591dfed3 100644 Utils.withContextClassLoader(Utils.getSparkClassLoader) { withUserDefinedFunction(udfInfo.funcName -> false) { val sparkClassLoader = Thread.currentThread().getContextClassLoader +diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala +index cc7bb193731..06555d48da7 100644 +--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ++++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala +@@ -818,7 +818,8 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter + } + } + +- test("SPARK-30201 HiveOutputWriter standardOI should use ObjectInspectorCopyOption.DEFAULT") { ++ 
test("SPARK-30201 HiveOutputWriter standardOI should use ObjectInspectorCopyOption.DEFAULT", ++ IgnoreComet("Comet does not support reading non UTF-8 strings")) { + withTable("t1", "t2") { + withTempDir { dir => + val file = new File(dir, "test.hex") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala index b67370f6eb9..746b3974b29 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index a060e3cea5..1694d81ea7 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -173,9 +173,9 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan( let array = unsafe { JPrimitiveArray::from_raw(serialized_spark_configs) }; let bytes = env.convert_byte_array(array)?; let spark_configs = serde::deserialize_config(bytes.as_slice())?; - - // Convert Spark configs to HashMap let spark_config: HashMap = spark_configs.entries.into_iter().collect(); + + // Access Spark configs let debug_native = spark_config.get_bool(COMET_DEBUG_ENABLED); let explain_native = spark_config.get_bool(COMET_EXPLAIN_NATIVE_ENABLED); let tracing_enabled = spark_config.get_bool(COMET_TRACING_ENABLED); From 48379352f6f684e74e7052659ae31f0dcfe0cc3b Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 22 Aug 2025 10:18:18 -0600 Subject: [PATCH 3/3] revert --- dev/diffs/4.0.0.diff | 153 ++++++------------------------------------- 1 file changed, 21 insertions(+), 132 deletions(-) diff --git a/dev/diffs/4.0.0.diff b/dev/diffs/4.0.0.diff index 6799999b8a..7d2dc790d3 100644 --- a/dev/diffs/4.0.0.diff +++ b/dev/diffs/4.0.0.diff @@ -700,10 +700,10 @@ index 9c529d14221..069b7c5adeb 100644 } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala b/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala new file mode 100644 -index 00000000000..5691536c114 +index 00000000000..5eb3fa17ca8 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala -@@ -0,0 +1,45 @@ +@@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with @@ -732,8 +732,6 @@ index 00000000000..5691536c114 + * Tests with this tag will be ignored when Comet is enabled (e.g., via `ENABLE_COMET`). 
+ */ +case class IgnoreComet(reason: String) extends Tag("DisableComet") -+case class IgnoreCometNativeIcebergCompat(reason: String) extends Tag("DisableComet") -+case class IgnoreCometNativeDataFusion(reason: String) extends Tag("DisableComet") +case class IgnoreCometNativeScan(reason: String) extends Tag("DisableComet") + +/** @@ -1057,7 +1055,7 @@ index ad424b3a7cc..4ece0117a34 100644 protected val baseResourcePath = { // use the same way as `SQLQueryTestSuite` to get the resource path diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala -index b3fce19979e..67edf5eb91c 100644 +index b3fce19979e..345acb4811a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -1524,7 +1524,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark @@ -1086,24 +1084,11 @@ index b3fce19979e..67edf5eb91c 100644 test("SPARK-39175: Query context of Cast should be serialized to executors" + - " when WSCG is off") { + " when WSCG is off", -+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/2218")) { ++ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/551")) { withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false", SQLConf.ANSI_ENABLED.key -> "true") { withTable("t") { -@@ -4490,14 +4493,20 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark - assert(ex.isInstanceOf[SparkNumberFormatException] || - ex.isInstanceOf[SparkDateTimeException] || - ex.isInstanceOf[SparkRuntimeException]) -- assert(ex.getMessage.contains(query)) -+ -+ if (!isCometEnabled) { -+ // Comet's error message does not include the original SQL query -+ // https://github.com/apache/datafusion-comet/issues/2215 -+ assert(ex.getMessage.contains(query)) -+ } - } - } - } +@@ -4497,7 +4500,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark } test("SPARK-39190,SPARK-39208,SPARK-39210: Query context of decimal overflow error should " + @@ -1292,26 +1277,11 @@ index 2e33f6505ab..e1e93ab3bad 100644 } withTable("t1", "t2") { -diff --git a/sql/core/src/test/scala/org/apache/spark/sql/VariantShreddingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/VariantShreddingSuite.scala -index fee375db10a..8c2c24e2c5f 100644 ---- a/sql/core/src/test/scala/org/apache/spark/sql/VariantShreddingSuite.scala -+++ b/sql/core/src/test/scala/org/apache/spark/sql/VariantShreddingSuite.scala -@@ -33,7 +33,9 @@ import org.apache.spark.sql.types._ - import org.apache.spark.types.variant._ - import org.apache.spark.unsafe.types.{UTF8String, VariantVal} - --class VariantShreddingSuite extends QueryTest with SharedSparkSession with ParquetTest { -+class VariantShreddingSuite extends QueryTest with SharedSparkSession with ParquetTest -+ // TODO enable tests once https://github.com/apache/datafusion-comet/issues/2209 is fixed -+ with IgnoreCometSuite { - def parseJson(s: String): VariantVal = { - val v = VariantBuilder.parseJson(s, false) - new VariantVal(v.getValue, v.getMetadata) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala -index 11e9547dfc5..be9ae40ab3d 100644 +index 11e9547dfc5..df5678c8d82 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala -@@ -20,10 +20,11 @@ package org.apache.spark.sql.collation +@@ -20,7 +20,7 @@ package org.apache.spark.sql.collation import scala.jdk.CollectionConverters.MapHasAsJava import org.apache.spark.SparkException @@ -1320,21 +1290,7 @@ index 11e9547dfc5..be9ae40ab3d 100644 import org.apache.spark.sql.catalyst.ExtendedAnalysisException import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.util.CollationFactory -+import org.apache.spark.sql.comet.{CometBroadcastHashJoinExec, CometHashJoinExec, CometSortMergeJoinExec} - import org.apache.spark.sql.connector.{DatasourceV2SQLBase, FakeV2ProviderWithCustomSchema} - import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTable} - import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper -@@ -55,7 +56,9 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper { - assert( - collectFirst(queryPlan) { - case _: SortMergeJoinExec => assert(isSortMergeForced) -+ case _: CometSortMergeJoinExec => assert(isSortMergeForced) - case _: HashJoin => assert(!isSortMergeForced) -+ case _: CometHashJoinExec | _: CometBroadcastHashJoinExec => assert(!isSortMergeForced) - }.nonEmpty - ) - } -@@ -1505,7 +1508,9 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper { +@@ -1505,7 +1505,9 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper { } } @@ -1345,23 +1301,7 @@ index 11e9547dfc5..be9ae40ab3d 100644 val t1 = "T_1" val t2 = "T_2" -@@ -1611,6 +1616,7 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper { - } else { - assert(!collectFirst(queryPlan) { - case b: BroadcastHashJoinExec => b.leftKeys.head -+ case b: CometBroadcastHashJoinExec => b.leftKeys.head - }.head.isInstanceOf[ArrayTransform]) - } - } -@@ -1676,6 +1682,7 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper { - } else { - assert(!collectFirst(queryPlan) { - case b: BroadcastHashJoinExec => b.leftKeys.head -+ case b: CometBroadcastHashJoinExec => b.leftKeys.head - }.head.isInstanceOf[ArrayTransform]) - } - } -@@ -1815,7 +1822,9 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper { +@@ -1815,7 +1817,9 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper { } } @@ -2696,23 +2636,10 @@ index 22839d3f0d2..7e66d100e90 100644 checkAnswer( // "fruit" column in this file is encoded using DELTA_LENGTH_BYTE_ARRAY. 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala -index bba71f1c48d..38c60ee2584 100644 +index bba71f1c48d..4f33ce4b3f2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala -@@ -996,7 +996,11 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS - Seq(Some("A"), Some("A"), None).toDF().repartition(1) - .write.parquet(path.getAbsolutePath) - val df = spark.read.parquet(path.getAbsolutePath) -- checkAnswer(stripSparkFilter(df.where("NOT (value <=> 'A')")), df) -+ // Similar to Spark's vectorized reader, Comet doesn't do row-level filtering but relies -+ // on Spark to apply the data filters after columnar batches are returned -+ if (!isCometEnabled) { -+ checkAnswer(stripSparkFilter(df.where("NOT (value <=> 'A')")), df) -+ } - } - } - } -@@ -1060,7 +1064,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1060,7 +1060,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS checkAnswer(readParquet(schema2, path), df) } @@ -2722,7 +2649,7 @@ index bba71f1c48d..38c60ee2584 100644 val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)" checkAnswer(readParquet(schema1, path), df) val schema2 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)" -@@ -1084,7 +1089,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1084,7 +1085,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS val df = sql(s"SELECT 1 a, 123456 b, ${Int.MaxValue.toLong * 10} c, CAST('1.2' AS BINARY) d") df.write.parquet(path.toString) @@ -3403,34 +3330,27 @@ index 86c4e49f6f6..2e639e5f38d 100644 val tblTargetName = "tbl_target" val tblSourceQualified = s"default.$tblSourceName" diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala -index f0f3f94b811..d64e4e54e22 100644 +index f0f3f94b811..486a436afb2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala -@@ -27,13 +27,14 @@ import scala.jdk.CollectionConverters._ - import scala.language.implicitConversions - import scala.util.control.NonFatal - -+import org.apache.comet.CometConf - import org.apache.hadoop.fs.Path - import org.scalactic.source.Position - import org.scalatest.{BeforeAndAfterAll, Suite, Tag} +@@ -33,7 +33,7 @@ import org.scalatest.{BeforeAndAfterAll, Suite, Tag} import org.scalatest.concurrent.Eventually import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.{AnalysisException, Row} -+import org.apache.spark.sql.{AnalysisException, IgnoreComet, IgnoreCometNativeDataFusion, IgnoreCometNativeIcebergCompat, IgnoreCometNativeScan, Row} ++import org.apache.spark.sql.{AnalysisException, IgnoreComet, Row} import org.apache.spark.sql.catalyst.FunctionIdentifier import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.catalyst.catalog.SessionCatalog.DEFAULT_DATABASE -@@ -42,6 +43,7 @@ import org.apache.spark.sql.catalyst.plans.PlanTestBase +@@ -42,6 +42,7 @@ import org.apache.spark.sql.catalyst.plans.PlanTestBase import 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.classic.{ClassicConversions, ColumnConversions, ColumnNodeToExpressionConverter, DataFrame, Dataset, SparkSession, SQLImplicits} -+import org.apache.spark.sql.comet.{CometFilterExec, CometProjectExec} ++import org.apache.spark.sql.comet._ import org.apache.spark.sql.execution.FilterExec import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution import org.apache.spark.sql.execution.datasources.DataSourceUtils -@@ -128,7 +130,28 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with +@@ -128,7 +129,11 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with } } } else { @@ -3438,29 +3358,12 @@ index f0f3f94b811..d64e4e54e22 100644 + if (isCometEnabled && testTags.exists(_.isInstanceOf[IgnoreComet])) { + ignore(testName + " (disabled when Comet is on)", testTags: _*)(testFun) + } else { -+ val cometScanImpl = CometConf.COMET_NATIVE_SCAN_IMPL.get(conf) -+ val isNativeIcebergCompat = cometScanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT || -+ cometScanImpl == CometConf.SCAN_AUTO -+ val isNativeDataFusion = cometScanImpl == CometConf.SCAN_NATIVE_DATAFUSION || -+ cometScanImpl == CometConf.SCAN_AUTO -+ if (isCometEnabled && isNativeIcebergCompat && -+ testTags.exists(_.isInstanceOf[IgnoreCometNativeIcebergCompat])) { -+ ignore(testName + " (disabled for NATIVE_ICEBERG_COMPAT)", testTags: _*)(testFun) -+ } else if (isCometEnabled && isNativeDataFusion && -+ testTags.exists(_.isInstanceOf[IgnoreCometNativeDataFusion])) { -+ ignore(testName + " (disabled for NATIVE_DATAFUSION)", testTags: _*)(testFun) -+ } else if (isCometEnabled && (isNativeDataFusion || isNativeIcebergCompat) && -+ testTags.exists(_.isInstanceOf[IgnoreCometNativeScan])) { -+ ignore(testName + " (disabled for NATIVE_DATAFUSION and NATIVE_ICEBERG_COMPAT)", -+ testTags: _*)(testFun) -+ } else { -+ super.test(testName, testTags: _*)(testFun) -+ } ++ super.test(testName, testTags: _*)(testFun) + } } } -@@ -248,8 +271,33 @@ private[sql] trait SQLTestUtilsBase +@@ -248,8 +253,33 @@ private[sql] trait SQLTestUtilsBase override protected def converter: ColumnNodeToExpressionConverter = self.spark.converter } @@ -3494,7 +3397,7 @@ index f0f3f94b811..d64e4e54e22 100644 super.withSQLConf(pairs: _*)(f) } -@@ -451,6 +499,8 @@ private[sql] trait SQLTestUtilsBase +@@ -451,6 +481,8 @@ private[sql] trait SQLTestUtilsBase val schema = df.schema val withoutFilters = df.queryExecution.executedPlan.transform { case FilterExec(_, child) => child @@ -3606,20 +3509,6 @@ index 4b27082e188..09f591dfed3 100644 Utils.withContextClassLoader(Utils.getSparkClassLoader) { withUserDefinedFunction(udfInfo.funcName -> false) { val sparkClassLoader = Thread.currentThread().getContextClassLoader -diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala -index cc7bb193731..06555d48da7 100644 ---- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala -+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala -@@ -818,7 +818,8 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter - } - } - -- test("SPARK-30201 HiveOutputWriter standardOI should use ObjectInspectorCopyOption.DEFAULT") { -+ test("SPARK-30201 HiveOutputWriter standardOI should use ObjectInspectorCopyOption.DEFAULT", -+ IgnoreComet("Comet does not support reading 
non UTF-8 strings")) { - withTable("t1", "t2") { - withTempDir { dir => - val file = new File(dir, "test.hex") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala index b67370f6eb9..746b3974b29 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
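
Editor's note, for illustration only (not part of the patch series): a minimal, self-contained Rust sketch of the config lookup introduced in PATCH 1/3. The trait, its HashMap implementation, and two of the three config keys mirror native/core/src/execution/spark_config.rs above; the main function and the sample values are assumptions added here purely for demonstration.

use std::collections::HashMap;

// Config keys mirrored from spark_config.rs in PATCH 1/3.
const COMET_TRACING_ENABLED: &str = "spark.comet.tracing.enabled";
const COMET_DEBUG_ENABLED: &str = "spark.comet.debug.enabled";

trait SparkConfig {
    fn get_bool(&self, name: &str) -> bool;
}

impl SparkConfig for HashMap<String, String> {
    fn get_bool(&self, name: &str) -> bool {
        // Missing keys and values that do not parse as a bool fall back to `false`.
        self.get(name)
            .and_then(|str_val| str_val.parse::<bool>().ok())
            .unwrap_or(false)
    }
}

fn main() {
    // Hypothetical stand-in for the map built from the deserialized Spark configs.
    let mut spark_config: HashMap<String, String> = HashMap::new();
    spark_config.insert(COMET_DEBUG_ENABLED.to_string(), "true".to_string());

    assert!(spark_config.get_bool(COMET_DEBUG_ENABLED));
    assert!(!spark_config.get_bool(COMET_TRACING_ENABLED)); // absent key -> false
}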