
Commit 655081b

test: enabling Spark tests with offHeap requirement (#1177)
## Which issue does this PR close?

## Rationale for this change

After #1062 we have not been running the Spark tests for native execution.

## What changes are included in this PR?

Removed the off-heap requirement for testing.

## How are these changes tested?

By bringing back the Spark tests for native execution.
1 parent 46a28db commit 655081b

5 files changed: +142 −11 lines changed
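For context on what is being relaxed: Comet still requires Spark's off-heap memory in production, and this commit exempts only test runs (detected via isTesting) so that the Spark SQL tests exercise native execution again. Below is a minimal sketch of a session that satisfies the production requirement, assuming Comet's documented plugin class; the keys are standard Spark/Comet settings, while the values and app name are illustrative:

import org.apache.spark.sql.SparkSession

// Illustrative configuration; spark.memory.offHeap.size must be sized per workload.
val spark = SparkSession
  .builder()
  .appName("comet-offheap-example")
  .config("spark.plugins", "org.apache.spark.CometPlugin")
  .config("spark.memory.offHeap.enabled", "true") // still required outside of testing after this change
  .config("spark.memory.offHeap.size", "4g")
  .config("spark.comet.enabled", "true")
  .getOrCreate()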

dev/diffs/4.0.0-preview1.diff

Lines changed: 102 additions & 1 deletion
@@ -146,6 +146,77 @@ index 698ca009b4f..57d774a3617 100644
 
 -- Test tables
 CREATE table explain_temp1 (key int, val int) USING PARQUET;
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int4.sql b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int4.sql
+index 3a409eea348..26e9aaf215c 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int4.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int4.sql
+@@ -6,6 +6,9 @@
+ -- https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/sql/int4.sql
+ --
+ 
++-- TODO: https://github.com/apache/datafusion-comet/issues/551
++--SET spark.comet.enabled = false
++
+ CREATE TABLE INT4_TBL(f1 int) USING parquet;
+ 
+ -- [SPARK-28023] Trim the string when cast string type to other types
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql
+index fac23b4a26f..98b12ae5ccc 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql
+@@ -6,6 +6,10 @@
+ -- Test int8 64-bit integers.
+ -- https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/sql/int8.sql
+ --
++
++-- TODO: https://github.com/apache/datafusion-comet/issues/551
++--SET spark.comet.enabled = false
++
+ CREATE TABLE INT8_TBL(q1 bigint, q2 bigint) USING parquet;
+ 
+ -- PostgreSQL implicitly casts string literals to data with integral types, but
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/select_having.sql b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/select_having.sql
+index 0efe0877e9b..f9df0400c99 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/select_having.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/select_having.sql
+@@ -6,6 +6,9 @@
+ -- https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/sql/select_having.sql
+ --
+ 
++-- TODO: https://github.com/apache/datafusion-comet/issues/551
++--SET spark.comet.enabled = false
++
+ -- load test data
+ CREATE TABLE test_having (a int, b int, c string, d string) USING parquet;
+ INSERT INTO test_having VALUES (0, 1, 'XXXX', 'A');
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/view-schema-binding-config.sql b/sql/core/src/test/resources/sql-tests/inputs/view-schema-binding-config.sql
+index e803254ea64..74db78aee38 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/view-schema-binding-config.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/view-schema-binding-config.sql
+@@ -1,6 +1,9 @@
+ -- This test suits check the spark.sql.viewSchemaBindingMode configuration.
+ -- It can be DISABLED and COMPENSATION
+ 
++-- TODO: https://github.com/apache/datafusion-comet/issues/551
++--SET spark.comet.enabled = false
++
+ -- Verify the default binding is true
+ SET spark.sql.legacy.viewSchemaBindingMode;
+ 
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/view-schema-compensation.sql b/sql/core/src/test/resources/sql-tests/inputs/view-schema-compensation.sql
+index 21a3ce1e122..f4762ab98f0 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/view-schema-compensation.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/view-schema-compensation.sql
+@@ -1,5 +1,9 @@
+ -- This test suite checks the WITH SCHEMA COMPENSATION clause
+ -- Disable ANSI mode to ensure we are forcing it explicitly in the CASTS
++
++-- TODO: https://github.com/apache/datafusion-comet/issues/551
++--SET spark.comet.enabled = false
++
+ SET spark.sql.ansi.enabled = false;
+ 
+ -- In COMPENSATION views get invalidated if the type can't cast
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
 index d023fb82185..0f4f03bda6c 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -917,7 +988,7 @@ index 34c6c49bc49..f5dea07a213 100644
   protected val baseResourcePath = {
     // use the same way as `SQLQueryTestSuite` to get the resource path
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
-index 56c364e2084..a00a50e020a 100644
+index 56c364e2084..fc3abd7cdc4 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
 @@ -1510,7 +1510,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
@@ -930,6 +1001,36 @@ index 56c364e2084..a00a50e020a 100644
        AccumulatorSuite.verifyPeakExecutionMemorySet(sparkContext, "external sort") {
          sql("SELECT * FROM testData2 ORDER BY a ASC, b ASC").collect()
        }
+@@ -4454,7 +4455,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
+   }
+ 
+   test("SPARK-39166: Query context of binary arithmetic should be serialized to executors" +
+-    " when WSCG is off") {
++    " when WSCG is off",
++    IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/551")) {
+     withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
+       SQLConf.ANSI_ENABLED.key -> "true") {
+       withTable("t") {
+@@ -4475,7 +4477,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
+   }
+ 
+   test("SPARK-39175: Query context of Cast should be serialized to executors" +
+-    " when WSCG is off") {
++    " when WSCG is off",
++    IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/551")) {
+     withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
+       SQLConf.ANSI_ENABLED.key -> "true") {
+       withTable("t") {
+@@ -4502,7 +4505,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
+   }
+ 
+   test("SPARK-39190,SPARK-39208,SPARK-39210: Query context of decimal overflow error should " +
+-    "be serialized to executors when WSCG is off") {
++    "be serialized to executors when WSCG is off",
++    IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/551")) {
+     withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
+       SQLConf.ANSI_ENABLED.key -> "true") {
+       withTable("t") {
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
 index 68f14f13bbd..174636cefb5 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
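The IgnoreComet marker used in the SQLQuerySuite hunks above is a ScalaTest tag that Comet's Spark diffs introduce so known-incompatible tests can be skipped rather than deleted. A plausible sketch of such a tag (the real definition lives elsewhere in dev/diffs and may differ in detail):

import org.scalatest.Tag

// Tagged tests can be excluded from a run with ScalaTest's -l flag,
// e.g. build/sbt "sql/testOnly *SQLQuerySuite -- -l DisableComet".
case class IgnoreComet(reason: String) extends Tag("DisableComet")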

native/core/src/execution/jni_api.rs

Lines changed: 22 additions & 4 deletions
@@ -106,6 +106,9 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
     metrics_node: JObject,
     comet_task_memory_manager_obj: JObject,
     batch_size: jint,
+    use_unified_memory_manager: jboolean,
+    memory_limit: jlong,
+    memory_fraction: jdouble,
     debug_native: jboolean,
     explain_native: jboolean,
     worker_threads: jint,
@@ -147,7 +150,13 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
         // We need to keep the session context alive. Some session state like temporary
         // dictionaries are stored in session context. If it is dropped, the temporary
         // dictionaries will be dropped as well.
-        let session = prepare_datafusion_session_context(batch_size as usize, task_memory_manager)?;
+        let session = prepare_datafusion_session_context(
+            batch_size as usize,
+            use_unified_memory_manager == 1,
+            memory_limit as usize,
+            memory_fraction,
+            task_memory_manager,
+        )?;
 
         let plan_creation_time = start.elapsed();
 
@@ -174,13 +183,22 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
 /// Configure DataFusion session context.
 fn prepare_datafusion_session_context(
     batch_size: usize,
+    use_unified_memory_manager: bool,
+    memory_limit: usize,
+    memory_fraction: f64,
     comet_task_memory_manager: Arc<GlobalRef>,
 ) -> CometResult<SessionContext> {
     let mut rt_config = RuntimeConfig::new().with_disk_manager(DiskManagerConfig::NewOs);
 
-    // Set Comet memory pool for native
-    let memory_pool = CometMemoryPool::new(comet_task_memory_manager);
-    rt_config = rt_config.with_memory_pool(Arc::new(memory_pool));
+    // Check if we are using unified memory manager integrated with Spark.
+    if use_unified_memory_manager {
+        // Set Comet memory pool for native
+        let memory_pool = CometMemoryPool::new(comet_task_memory_manager);
+        rt_config = rt_config.with_memory_pool(Arc::new(memory_pool));
+    } else {
+        // Use the memory pool from DF
+        rt_config = rt_config.with_memory_limit(memory_limit, memory_fraction)
+    }
 
     // Get Datafusion configuration from Spark Execution context
     // can be configured in Comet Spark JVM using Spark --conf parameters
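In words: the native side now builds its DataFusion runtime one of two ways. With use_unified_memory_manager it keeps the CometMemoryPool backed by Spark's task memory manager; otherwise it caps DataFusion's own pool at memory_limit scaled by memory_fraction. A JVM-side model of that branch, with invented names, purely for illustration:

// Hypothetical model of the selection above; not part of Comet's API.
sealed trait NativeMemoryPool
case object SparkUnifiedPool extends NativeMemoryPool // CometMemoryPool over Spark's unified memory
final case class DataFusionPool(limitBytes: Long, fraction: Double) extends NativeMemoryPool

def selectPool(useUnified: Boolean, limitBytes: Long, fraction: Double): NativeMemoryPool =
  if (useUnified) SparkUnifiedPool // off-heap enabled: Spark accounts for native allocations
  else DataFusionPool(limitBytes, fraction) // off-heap disabled: DataFusion's pool, capped at limit * fraction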

spark/src/main/scala/org/apache/comet/CometExecIterator.scala

Lines changed: 8 additions & 1 deletion
@@ -23,7 +23,7 @@ import org.apache.spark._
 import org.apache.spark.sql.comet.CometMetricNode
 import org.apache.spark.sql.vectorized._
 
-import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_BLOCKING_THREADS, COMET_DEBUG_ENABLED, COMET_EXPLAIN_NATIVE_ENABLED, COMET_WORKER_THREADS}
+import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_BLOCKING_THREADS, COMET_DEBUG_ENABLED, COMET_EXEC_MEMORY_FRACTION, COMET_EXPLAIN_NATIVE_ENABLED, COMET_WORKER_THREADS}
 import org.apache.comet.vector.NativeUtil
 
 /**
@@ -60,13 +60,20 @@ class CometExecIterator(
       new CometBatchIterator(iterator, nativeUtil)
     }.toArray
   private val plan = {
+    val conf = SparkEnv.get.conf
+    // Only enable unified memory manager when off-heap mode is enabled. Otherwise,
+    // we'll use the built-in memory pool from DF, and initializes with `memory_limit`
+    // and `memory_fraction` below.
     nativeLib.createPlan(
       id,
       cometBatchIterators,
       protobufQueryPlan,
       nativeMetrics,
       new CometTaskMemoryManager(id),
       batchSize = COMET_BATCH_SIZE.get(),
+      use_unified_memory_manager = conf.getBoolean("spark.memory.offHeap.enabled", false),
+      memory_limit = CometSparkSessionExtensions.getCometMemoryOverhead(conf),
+      memory_fraction = COMET_EXEC_MEMORY_FRACTION.get(),
       debug = COMET_DEBUG_ENABLED.get(),
       explain = COMET_EXPLAIN_NATIVE_ENABLED.get(),
       workerThreads = COMET_WORKER_THREADS.get(),
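The JVM side derives all three new arguments from the Spark configuration at plan-creation time. A quick way to check which mode a running session would request, using only standard SparkConf accessors (getCometMemoryOverhead's sizing rules are Comet-internal and not reproduced here):

import org.apache.spark.SparkEnv

// True when the native plan will be created with the Spark-unified memory pool.
def usesUnifiedMemory(): Boolean =
  SparkEnv.get.conf.getBoolean("spark.memory.offHeap.enabled", defaultValue = false)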

spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala

Lines changed: 5 additions & 5 deletions
@@ -53,7 +53,7 @@ import org.apache.spark.sql.types.{DoubleType, FloatType}
 
 import org.apache.comet.CometConf._
 import org.apache.comet.CometExplainInfo.getActualPlan
-import org.apache.comet.CometSparkSessionExtensions.{createMessage, getCometBroadcastNotEnabledReason, getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled, isCometEnabled, isCometExecEnabled, isCometJVMShuffleMode, isCometNativeShuffleMode, isCometScan, isCometScanEnabled, isCometShuffleEnabled, isSpark34Plus, isSpark40Plus, shouldApplySparkToColumnar, withInfo, withInfos}
+import org.apache.comet.CometSparkSessionExtensions.{createMessage, getCometBroadcastNotEnabledReason, getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled, isCometEnabled, isCometExecEnabled, isCometJVMShuffleMode, isCometNativeShuffleMode, isCometScan, isCometScanEnabled, isCometShuffleEnabled, isOffHeapEnabled, isSpark34Plus, isSpark40Plus, isTesting, shouldApplySparkToColumnar, withInfo, withInfos}
 import org.apache.comet.parquet.{CometParquetScan, SupportsComet}
 import org.apache.comet.rules.RewriteJoin
 import org.apache.comet.serde.OperatorOuterClass.Operator
@@ -921,8 +921,9 @@ class CometSparkSessionExtensions
     override def apply(plan: SparkPlan): SparkPlan = {
 
       // Comet required off-heap memory to be enabled
-      if ("true" != conf.getConfString("spark.memory.offHeap.enabled", "false")) {
-        logInfo("Comet extension disabled because spark.memory.offHeap.enabled=false")
+      if (!isOffHeapEnabled(conf) && !isTesting) {
+        logWarning("Comet native exec disabled because spark.memory.offHeap.enabled=false")
+        withInfo(plan, "Comet native exec disabled because spark.memory.offHeap.enabled=false")
         return plan
       }
 
@@ -1174,8 +1175,7 @@ object CometSparkSessionExtensions extends Logging {
   }
 
   private[comet] def isOffHeapEnabled(conf: SQLConf): Boolean =
-    conf.contains("spark.memory.offHeap.enabled") &&
-      conf.getConfString("spark.memory.offHeap.enabled").toBoolean
+    conf.getConfString("spark.memory.offHeap.enabled", "false").toBoolean
 
   // Copied from org.apache.spark.util.Utils which is private to Spark.
   private[comet] def isTesting: Boolean = {
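The net effect of the apply() change is a two-clause gate: native exec stays on when off-heap is enabled, and additionally when running under Spark's test mode (Spark's Utils.isTesting checks the SPARK_TESTING environment variable and the spark.testing system property). A standalone model of that gate, for illustration only:

// Standalone sketch; SQLConf and Utils.isTesting are stood in by plain booleans.
def cometExecAllowed(offHeapEnabled: Boolean, isTesting: Boolean): Boolean =
  offHeapEnabled || isTesting

assert(cometExecAllowed(offHeapEnabled = false, isTesting = true)) // the CI path enabled by this PR
assert(!cometExecAllowed(offHeapEnabled = false, isTesting = false)) // production still requires off-heap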

spark/src/main/scala/org/apache/comet/Native.scala

Lines changed: 5 additions & 0 deletions
@@ -43,17 +43,22 @@ class Native extends NativeBase {
    * @return
    *   the address to native query plan.
    */
+  // scalastyle:off
   @native def createPlan(
       id: Long,
       iterators: Array[CometBatchIterator],
       plan: Array[Byte],
       metrics: CometMetricNode,
       taskMemoryManager: CometTaskMemoryManager,
       batchSize: Int,
+      use_unified_memory_manager: Boolean,
+      memory_limit: Long,
+      memory_fraction: Double,
       debug: Boolean,
       explain: Boolean,
       workerThreads: Int,
       blockingThreads: Int): Long
+  // scalastyle:on
 
   /**
    * Execute a native query plan based on given input Arrow arrays.
