Skip to content

Commit 5f38e2e

Browse files
andygrove and claude
committed
chore: Add memory reservation debug logging
Add a new `spark.comet.debug.memory` config that wraps the native memory pool in a LoggingPool decorator, logging all grow/shrink/try_grow calls with task ID and consumer name. This helps diagnose memory reservation issues in production environments. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 10d0c8d commit 5f38e2e

File tree

6 files changed

+151
-3
lines changed

6 files changed

+151
-3
lines changed

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ object CometConf extends ShimCometConf {
5454
private val TRACING_GUIDE = "For more information, refer to the Comet Tracing " +
5555
"Guide (https://datafusion.apache.org/comet/contributor-guide/tracing.html)"
5656

57+
private val DEBUGGING_GUIDE = "For more information, refer to the Comet Debugging " +
58+
"Guide (https://datafusion.apache.org/comet/contributor-guide/debugging.html)"
59+
5760
/** List of all configs that is used for generating documentation */
5861
val allConfs = new ListBuffer[ConfigEntry[_]]
5962

@@ -549,6 +552,13 @@ object CometConf extends ShimCometConf {
549552
.booleanConf
550553
.createWithDefault(false)
551554

555+
val COMET_DEBUG_MEMORY_ENABLED: ConfigEntry[Boolean] =
556+
conf(s"$COMET_PREFIX.debug.memory")
557+
.category(CATEGORY_TESTING)
558+
.doc(s"When enabled, log all native memory pool interactions. $DEBUGGING_GUIDE.")
559+
.booleanConf
560+
.createWithDefault(false)
561+
552562
val COMET_EXTENDED_EXPLAIN_FORMAT_VERBOSE = "verbose"
553563
val COMET_EXTENDED_EXPLAIN_FORMAT_FALLBACK = "fallback"
554564

docs/source/contributor-guide/debugging.md

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ To build Comet with this feature enabled:
127127
make release COMET_FEATURES=backtrace
128128
```
129129

130-
Start Comet with `RUST_BACKTRACE=1`
130+
Set `RUST_BACKTRACE=1` for the Spark worker/executor process, or for `spark-submit` if running in local mode.
131131

132132
```console
133133
RUST_BACKTRACE=1 $SPARK_HOME/spark-shell --jars spark/target/comet-spark-spark3.5_2.12-$COMET_VERSION.jar --conf spark.plugins=org.apache.spark.CometPlugin --conf spark.comet.enabled=true --conf spark.comet.exec.enabled=true
@@ -188,3 +188,29 @@ This produces output like the following:
188188

189189
Additionally, you can place a `log4rs.yaml` configuration file inside the Comet configuration directory specified by the `COMET_CONF_DIR` environment variable to enable more advanced logging configurations. This file uses the [log4rs YAML configuration format](https://docs.rs/log4rs/latest/log4rs/#configuration-via-a-yaml-file).
190190
For example, see: [log4rs.yaml](https://github.com/apache/datafusion-comet/blob/main/conf/log4rs.yaml).
191+
192+
### Debugging Memory Reservations
193+
194+
Set `spark.comet.debug.memory=true` to log all calls that grow or shrink memory reservations.
195+
196+
Example log output:
197+
198+
```
199+
[Task 486] MemoryPool[ExternalSorter[6]].try_grow(256232960) returning Ok
200+
[Task 486] MemoryPool[ExternalSorter[6]].try_grow(256375168) returning Ok
201+
[Task 486] MemoryPool[ExternalSorter[6]].try_grow(256899456) returning Ok
202+
[Task 486] MemoryPool[ExternalSorter[6]].try_grow(257296128) returning Ok
203+
[Task 486] MemoryPool[ExternalSorter[6]].try_grow(257820416) returning Err
204+
[Task 486] MemoryPool[ExternalSorterMerge[6]].shrink(10485760)
205+
[Task 486] MemoryPool[ExternalSorter[6]].shrink(150464)
206+
[Task 486] MemoryPool[ExternalSorter[6]].shrink(146688)
207+
[Task 486] MemoryPool[ExternalSorter[6]].shrink(137856)
208+
[Task 486] MemoryPool[ExternalSorter[6]].shrink(141952)
209+
[Task 486] MemoryPool[ExternalSorterMerge[6]].try_grow(0) returning Ok
210+
[Task 486] MemoryPool[ExternalSorterMerge[6]].try_grow(0) returning Ok
211+
[Task 486] MemoryPool[ExternalSorter[6]].shrink(524288)
212+
[Task 486] MemoryPool[ExternalSorterMerge[6]].try_grow(0) returning Ok
213+
[Task 486] MemoryPool[ExternalSorterMerge[6]].try_grow(68928) returning Ok
214+
```
215+
216+
When backtraces are enabled (see the earlier section), they will be included for failed allocations.

native/core/src/execution/jni_api.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,9 +82,10 @@ use crate::execution::spark_plan::SparkPlan;
8282

8383
use crate::execution::tracing::{log_memory_usage, trace_begin, trace_end, with_trace};
8484

85+
use crate::execution::memory_pools::logging_pool::LoggingPool;
8586
use crate::execution::spark_config::{
86-
SparkConfig, COMET_DEBUG_ENABLED, COMET_EXPLAIN_NATIVE_ENABLED, COMET_MAX_TEMP_DIRECTORY_SIZE,
87-
COMET_TRACING_ENABLED,
87+
SparkConfig, COMET_DEBUG_ENABLED, COMET_DEBUG_MEMORY, COMET_EXPLAIN_NATIVE_ENABLED,
88+
COMET_MAX_TEMP_DIRECTORY_SIZE, COMET_TRACING_ENABLED,
8889
};
8990
use crate::parquet::encryption_support::{CometEncryptionFactory, ENCRYPTION_FACTORY_ID};
9091
use datafusion_comet_proto::spark_operator::operator::OpStruct;
@@ -193,6 +194,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
193194
let tracing_enabled = spark_config.get_bool(COMET_TRACING_ENABLED);
194195
let max_temp_directory_size =
195196
spark_config.get_u64(COMET_MAX_TEMP_DIRECTORY_SIZE, 100 * 1024 * 1024 * 1024);
197+
let logging_memory_pool = spark_config.get_bool(COMET_DEBUG_MEMORY);
196198

197199
with_trace("createPlan", tracing_enabled, || {
198200
// Init JVM classes
@@ -229,6 +231,12 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
229231
let memory_pool =
230232
create_memory_pool(&memory_pool_config, task_memory_manager, task_attempt_id);
231233

234+
let memory_pool = if logging_memory_pool {
235+
Arc::new(LoggingPool::new(task_attempt_id as u64, memory_pool))
236+
} else {
237+
memory_pool
238+
};
239+
232240
// Get local directories for storing spill files
233241
let num_local_dirs = env.get_array_length(&local_dirs)?;
234242
let mut local_dirs_vec = vec![];
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use datafusion::execution::memory_pool::{
19+
MemoryConsumer, MemoryLimit, MemoryPool, MemoryReservation,
20+
};
21+
use log::info;
22+
use std::sync::Arc;
23+
24+
#[derive(Debug)]
25+
pub(crate) struct LoggingPool {
26+
task_attempt_id: u64,
27+
pool: Arc<dyn MemoryPool>,
28+
}
29+
30+
impl LoggingPool {
31+
pub fn new(task_attempt_id: u64, pool: Arc<dyn MemoryPool>) -> Self {
32+
Self {
33+
task_attempt_id,
34+
pool,
35+
}
36+
}
37+
}
38+
39+
impl MemoryPool for LoggingPool {
40+
fn register(&self, consumer: &MemoryConsumer) {
41+
self.pool.register(consumer)
42+
}
43+
44+
fn unregister(&self, consumer: &MemoryConsumer) {
45+
self.pool.unregister(consumer)
46+
}
47+
48+
fn grow(&self, reservation: &MemoryReservation, additional: usize) {
49+
info!(
50+
"[Task {}] MemoryPool[{}].grow({})",
51+
self.task_attempt_id,
52+
reservation.consumer().name(),
53+
additional
54+
);
55+
self.pool.grow(reservation, additional);
56+
}
57+
58+
fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
59+
info!(
60+
"[Task {}] MemoryPool[{}].shrink({})",
61+
self.task_attempt_id,
62+
reservation.consumer().name(),
63+
shrink
64+
);
65+
self.pool.shrink(reservation, shrink);
66+
}
67+
68+
fn try_grow(
69+
&self,
70+
reservation: &MemoryReservation,
71+
additional: usize,
72+
) -> datafusion::common::Result<()> {
73+
match self.pool.try_grow(reservation, additional) {
74+
Ok(_) => {
75+
info!(
76+
"[Task {}] MemoryPool[{}].try_grow({}) returning Ok",
77+
self.task_attempt_id,
78+
reservation.consumer().name(),
79+
additional
80+
);
81+
Ok(())
82+
}
83+
Err(e) => {
84+
info!(
85+
"[Task {}] MemoryPool[{}].try_grow({}) returning Err: {e:?}",
86+
self.task_attempt_id,
87+
reservation.consumer().name(),
88+
additional
89+
);
90+
Err(e)
91+
}
92+
}
93+
}
94+
95+
fn reserved(&self) -> usize {
96+
self.pool.reserved()
97+
}
98+
99+
fn memory_limit(&self) -> MemoryLimit {
100+
self.pool.memory_limit()
101+
}
102+
}

native/core/src/execution/memory_pools/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
mod config;
1919
mod fair_pool;
20+
pub mod logging_pool;
2021
mod task_shared;
2122
mod unified_pool;
2223

native/core/src/execution/spark_config.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ pub(crate) const COMET_TRACING_ENABLED: &str = "spark.comet.tracing.enabled";
2121
pub(crate) const COMET_DEBUG_ENABLED: &str = "spark.comet.debug.enabled";
2222
pub(crate) const COMET_EXPLAIN_NATIVE_ENABLED: &str = "spark.comet.explain.native.enabled";
2323
pub(crate) const COMET_MAX_TEMP_DIRECTORY_SIZE: &str = "spark.comet.maxTempDirectorySize";
24+
pub(crate) const COMET_DEBUG_MEMORY: &str = "spark.comet.debug.memory";
2425

2526
pub(crate) trait SparkConfig {
2627
fn get_bool(&self, name: &str) -> bool;

0 commit comments

Comments
 (0)