
Commit 8381108

feat: Make DiskManager max_temp_directory_size configurable (#2479)
1 parent 6d5bdbe commit 8381108

File tree

5 files changed: +22 / −5 lines


common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 6 additions & 0 deletions
@@ -651,6 +651,12 @@ object CometConf extends ShimCometConf {
       .stringConf
       .createOptional
 
+  val COMET_MAX_TEMP_DIRECTORY_SIZE: ConfigEntry[Long] =
+    conf("spark.comet.maxTempDirectorySize")
+      .doc("The maximum amount of data (in bytes) stored inside the temporary directories.")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(100L * 1024 * 1024 * 1024) // 100 GB
+
   /** Create a config to enable a specific operator */
   private def createExecEnabledConfig(
       exec: String,
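
The new entry follows the existing CometConf pattern: a byte-valued config with a 100 GB default. For illustration only, a minimal sketch of how a user could override the limit when building a session; the config key comes from this commit, while the application name and chosen value are made up, and the value is given in bytes per the config's doc string:

    import org.apache.spark.sql.SparkSession

    // Sketch: cap Comet's native temp directories at 50 GiB instead of the 100 GiB default.
    val spark = SparkSession
      .builder()
      .appName("comet-max-temp-dir-example") // illustrative name
      .config("spark.comet.maxTempDirectorySize", 50L * 1024 * 1024 * 1024) // value in bytes
      .getOrCreate()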

docs/source/user-guide/latest/configs.md

Lines changed: 1 addition & 0 deletions
@@ -67,6 +67,7 @@ Comet provides the following configuration settings.
 | spark.comet.explainFallback.enabled | When this setting is enabled, Comet will provide logging explaining the reason(s) why a query stage cannot be executed natively. Set this to false to reduce the amount of logging. | false |
 | spark.comet.expression.allowIncompatible | Comet is not currently fully compatible with Spark for all expressions. Set this config to true to allow them anyway. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
 | spark.comet.logFallbackReasons.enabled | When this setting is enabled, Comet will log warnings for all fallback reasons. | false |
+| spark.comet.maxTempDirectorySize | The maximum amount of data (in bytes) stored inside the temporary directories. | 107374182400b |
 | spark.comet.memory.overhead.factor | Fraction of executor memory to be allocated as additional memory for Comet when running Spark in on-heap mode. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | 0.2 |
 | spark.comet.memory.overhead.min | Minimum amount of additional memory to be allocated per executor process for Comet, in MiB, when running Spark in on-heap mode. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | 402653184b |
 | spark.comet.memoryOverhead | The amount of additional memory to be allocated per executor process for Comet, in MiB, when running Spark in on-heap mode. This config is optional. If this is not specified, it will be set to `spark.comet.memory.overhead.factor` * `spark.executor.memory`. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | |
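
The default shown in the table, 107374182400b, is simply the config's 100 GB default expressed in bytes:

    // 100 GiB in bytes, matching the documented default of 107374182400b
    val defaultMaxTempDirSize: Long = 100L * 1024 * 1024 * 1024 // = 107374182400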

native/core/src/execution/jni_api.rs

Lines changed: 11 additions & 3 deletions
@@ -170,6 +170,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
     debug_native: jboolean,
     explain_native: jboolean,
     tracing_enabled: jboolean,
+    max_temp_directory_size: jlong,
 ) -> jlong {
     try_unwrap_or_throw(&e, |mut env| {
         with_trace("createPlan", tracing_enabled != JNI_FALSE, || {
@@ -231,8 +232,12 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
             // We need to keep the session context alive. Some session state like temporary
             // dictionaries are stored in session context. If it is dropped, the temporary
             // dictionaries will be dropped as well.
-            let session =
-                prepare_datafusion_session_context(batch_size as usize, memory_pool, local_dirs)?;
+            let session = prepare_datafusion_session_context(
+                batch_size as usize,
+                memory_pool,
+                local_dirs,
+                max_temp_directory_size as u64,
+            )?;
 
             let plan_creation_time = start.elapsed();
 
@@ -272,9 +277,12 @@ fn prepare_datafusion_session_context(
     batch_size: usize,
     memory_pool: Arc<dyn MemoryPool>,
     local_dirs: Vec<String>,
+    max_temp_directory_size: u64,
 ) -> CometResult<SessionContext> {
     let paths = local_dirs.into_iter().map(PathBuf::from).collect();
-    let disk_manager = DiskManagerBuilder::default().with_mode(DiskManagerMode::Directories(paths));
+    let disk_manager = DiskManagerBuilder::default()
+        .with_mode(DiskManagerMode::Directories(paths))
+        .with_max_temp_directory_size(max_temp_directory_size);
     let mut rt_config = RuntimeEnvBuilder::new().with_disk_manager_builder(disk_manager);
     rt_config = rt_config.with_memory_pool(memory_pool);

spark/src/main/scala/org/apache/comet/CometExecIterator.scala

Lines changed: 2 additions & 1 deletion
@@ -120,7 +120,8 @@ class CometExecIterator(
       taskAttemptId,
       debug = COMET_DEBUG_ENABLED.get(),
       explain = COMET_EXPLAIN_NATIVE_ENABLED.get(),
-      tracingEnabled)
+      tracingEnabled,
+      maxTempDirectorySize = CometConf.COMET_MAX_TEMP_DIRECTORY_SIZE.get())
   }
 
   private var nextBatch: Option[ColumnarBatch] = None

spark/src/main/scala/org/apache/comet/Native.scala

Lines changed: 2 additions & 1 deletion
@@ -68,7 +68,8 @@ class Native extends NativeBase {
       taskAttemptId: Long,
       debug: Boolean,
       explain: Boolean,
-      tracingEnabled: Boolean): Long
+      tracingEnabled: Boolean,
+      maxTempDirectorySize: Long): Long
   // scalastyle:on
 
   /**
