Skip to content

Commit 75ddcfc

Browse files
authored
chore: Make Comet native operators write spill files to the Spark local dirs (#1581)
* Use Spark local dirs in Comet * Add unit test
1 parent efca991 commit 75ddcfc

File tree

4 files changed

+60
-5
lines changed

4 files changed

+60
-5
lines changed

native/core/src/execution/jni_api.rs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ use jni::{
3838
sys::{jbyteArray, jint, jlong, jlongArray},
3939
JNIEnv,
4040
};
41+
use std::path::PathBuf;
4142
use std::time::{Duration, Instant};
4243
use std::{collections::HashMap, sync::Arc, task::Poll};
4344

@@ -167,6 +168,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
167168
metrics_node: JObject,
168169
metrics_update_interval: jlong,
169170
comet_task_memory_manager_obj: JObject,
171+
local_dirs: jobjectArray,
170172
batch_size: jint,
171173
off_heap_mode: jboolean,
172174
memory_pool_type: jstring,
@@ -208,6 +210,8 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
208210
let input_source = Arc::new(jni_new_global_ref!(env, input_source)?);
209211
input_sources.push(input_source);
210212
}
213+
214+
// Create DataFusion memory pool
211215
let task_memory_manager =
212216
Arc::new(jni_new_global_ref!(env, comet_task_memory_manager_obj)?);
213217

@@ -221,10 +225,21 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
221225
let memory_pool =
222226
create_memory_pool(&memory_pool_config, task_memory_manager, task_attempt_id);
223227

228+
// Get local directories for storing spill files
229+
let local_dirs_array = JObjectArray::from_raw(local_dirs);
230+
let num_local_dirs = env.get_array_length(&local_dirs_array)?;
231+
let mut local_dirs = vec![];
232+
for i in 0..num_local_dirs {
233+
let local_dir: JString = env.get_object_array_element(&local_dirs_array, i)?.into();
234+
let local_dir = env.get_string(&local_dir)?;
235+
local_dirs.push(local_dir.into());
236+
}
237+
224238
// We need to keep the session context alive. Some session state like temporary
225239
// dictionaries are stored in session context. If it is dropped, the temporary
226240
// dictionaries will be dropped as well.
227-
let session = prepare_datafusion_session_context(batch_size as usize, memory_pool)?;
241+
let session =
242+
prepare_datafusion_session_context(batch_size as usize, memory_pool, local_dirs)?;
228243

229244
let plan_creation_time = start.elapsed();
230245

@@ -262,8 +277,11 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
262277
fn prepare_datafusion_session_context(
263278
batch_size: usize,
264279
memory_pool: Arc<dyn MemoryPool>,
280+
local_dirs: Vec<String>,
265281
) -> CometResult<SessionContext> {
266-
let mut rt_config = RuntimeEnvBuilder::new().with_disk_manager(DiskManagerConfig::NewOs);
282+
let disk_manager_config =
283+
DiskManagerConfig::NewSpecified(local_dirs.into_iter().map(PathBuf::from).collect());
284+
let mut rt_config = RuntimeEnvBuilder::new().with_disk_manager(disk_manager_config);
267285
rt_config = rt_config.with_memory_pool(memory_pool);
268286

269287
// Get Datafusion configuration from Spark Execution context

spark/src/main/scala/org/apache/comet/CometExecIterator.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ class CometExecIterator(
6464
}.toArray
6565
private val plan = {
6666
val conf = SparkEnv.get.conf
67+
val localDiskDirs = SparkEnv.get.blockManager.getLocalDiskDirs
6768

6869
val offHeapMode = CometSparkSessionExtensions.isOffHeapEnabled(conf)
6970
val memoryLimit = if (offHeapMode) {
@@ -83,6 +84,7 @@ class CometExecIterator(
8384
nativeMetrics,
8485
metricsUpdateInterval = COMET_METRICS_UPDATE_INTERVAL.get(),
8586
new CometTaskMemoryManager(id),
87+
localDiskDirs,
8688
batchSize = COMET_BATCH_SIZE.get(),
8789
offHeapMode,
8890
memoryPoolType = COMET_EXEC_MEMORY_POOL_TYPE.get(),

spark/src/main/scala/org/apache/comet/Native.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ class Native extends NativeBase {
5858
metrics: CometMetricNode,
5959
metricsUpdateInterval: Long,
6060
taskMemoryManager: CometTaskMemoryManager,
61+
localDirs: Array[String],
6162
batchSize: Int,
6263
offHeapMode: Boolean,
6364
memoryPoolType: String,

spark/src/test/scala/org/apache/comet/CometNativeSuite.scala

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
package org.apache.comet
2121

22-
import org.apache.spark.SparkException
22+
import org.apache.spark.{SparkEnv, SparkException}
2323
import org.apache.spark.sql.CometTestBase
2424
import org.apache.spark.sql.catalyst.expressions.PrettyAttribute
2525
import org.apache.spark.sql.comet.{CometExec, CometExecUtils}
@@ -40,8 +40,11 @@ class CometNativeSuite extends CometTestBase {
4040
limitOp,
4141
1,
4242
0)
43-
cometIter.next()
44-
cometIter.close()
43+
try {
44+
cometIter.next()
45+
} finally {
46+
cometIter.close()
47+
}
4548
value
4649
}
4750

@@ -63,4 +66,35 @@ class CometNativeSuite extends CometTestBase {
6366
}
6467
assert(exception2.getMessage contains "null context handle")
6568
}
69+
70+
test("Comet native should use spark local dir as temp dir") {
71+
withParquetTable((0 until 100000).map(i => (i, i + 1)), "table") {
72+
val dirs = SparkEnv.get.blockManager.getLocalDiskDirs
73+
dirs.foreach { dir =>
74+
val files = new java.io.File(dir).listFiles()
75+
assert(!files.exists(f => f.isDirectory && f.getName.startsWith("datafusion-")))
76+
}
77+
78+
// Check if the DataFusion temporary dir exists in the Spark local dirs when a Spark job involving a
79+
// Comet native operator is running.
80+
val observedDataFusionDir = spark
81+
.table("table")
82+
.selectExpr("_1 + _2 as value")
83+
.rdd
84+
.mapPartitions { _ =>
85+
dirs.map { dir =>
86+
val files = new java.io.File(dir).listFiles()
87+
files.count(f => f.isDirectory && f.getName.startsWith("datafusion-"))
88+
}.iterator
89+
}
90+
.sum()
91+
assert(observedDataFusionDir > 0)
92+
93+
// DataFusion temporary dir should be cleaned up after the job is done.
94+
dirs.foreach { dir =>
95+
val files = new java.io.File(dir).listFiles()
96+
assert(!files.exists(f => f.isDirectory && f.getName.startsWith("datafusion-")))
97+
}
98+
}
99+
}
66100
}

0 commit comments

Comments
 (0)