-
Notifications
You must be signed in to change notification settings - Fork 249
chore: Add memory reservation debug logging and visualization #2521
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 18 commits
7252605
d084cfa
4837935
ad9c9b8
f3bb412
13f14d3
78f5b4f
5a39d3b
dc11515
d2a1ab1
31cdbc6
ffb1f71
322b4c5
89e10ac
3b191fd
7c24836
405f5b7
522238d
36565ca
21189a6
dfa2c67
d9817ce
acba7bc
4051d29
df69875
ad891a0
8756256
7eb1bc1
21bd386
ec823c2
a66fa65
12db37f
2fb336e
706f5e7
4faf881
d91abda
f6128b5
7d40ac2
c495897
06814b7
e51751f
75e727f
e844287
2884ed3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -78,6 +78,11 @@ use crate::execution::spark_plan::SparkPlan; | |||||
|
|
||||||
| use crate::execution::tracing::{log_memory_usage, trace_begin, trace_end, with_trace}; | ||||||
|
|
||||||
| use crate::execution::memory_pools::logging_pool::LoggingPool; | ||||||
| use crate::execution::spark_config::{ | ||||||
| SparkConfig, COMET_DEBUG_ENABLED, COMET_DEBUG_MEMORY, COMET_EXPLAIN_NATIVE_ENABLED, | ||||||
| COMET_TRACING_ENABLED, | ||||||
| }; | ||||||
| use datafusion_comet_proto::spark_operator::operator::OpStruct; | ||||||
| use log::info; | ||||||
| use once_cell::sync::Lazy; | ||||||
|
|
@@ -167,12 +172,21 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan( | |||||
| memory_limit: jlong, | ||||||
| memory_limit_per_task: jlong, | ||||||
| task_attempt_id: jlong, | ||||||
| debug_native: jboolean, | ||||||
| explain_native: jboolean, | ||||||
| tracing_enabled: jboolean, | ||||||
| ) -> jlong { | ||||||
| try_unwrap_or_throw(&e, |mut env| { | ||||||
| with_trace("createPlan", tracing_enabled != JNI_FALSE, || { | ||||||
| // Deserialize Spark configs | ||||||
| let array = unsafe { JPrimitiveArray::from_raw(serialized_spark_configs) }; | ||||||
| let bytes = env.convert_byte_array(array)?; | ||||||
| let spark_configs = serde::deserialize_config(bytes.as_slice())?; | ||||||
| let spark_config: HashMap<String, String> = spark_configs.entries.into_iter().collect(); | ||||||
|
|
||||||
| // Access Comet configs | ||||||
| let debug_native = spark_config.get_bool(COMET_DEBUG_ENABLED); | ||||||
| let explain_native = spark_config.get_bool(COMET_EXPLAIN_NATIVE_ENABLED); | ||||||
| let tracing_enabled = spark_config.get_bool(COMET_TRACING_ENABLED); | ||||||
| let logging_memory_pool = spark_config.get_bool(COMET_DEBUG_MEMORY); | ||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
|
|
||||||
| with_trace("createPlan", tracing_enabled, || { | ||||||
| // Init JVM classes | ||||||
| JVMClasses::init(&mut env); | ||||||
|
|
||||||
|
|
@@ -183,15 +197,6 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan( | |||||
| let bytes = env.convert_byte_array(array)?; | ||||||
| let spark_plan = serde::deserialize_op(bytes.as_slice())?; | ||||||
|
|
||||||
| // Deserialize Spark configs | ||||||
| let array = unsafe { JPrimitiveArray::from_raw(serialized_spark_configs) }; | ||||||
| let bytes = env.convert_byte_array(array)?; | ||||||
| let spark_configs = serde::deserialize_config(bytes.as_slice())?; | ||||||
|
|
||||||
| // Convert Spark configs to HashMap | ||||||
| let _spark_config_map: HashMap<String, String> = | ||||||
| spark_configs.entries.into_iter().collect(); | ||||||
|
|
||||||
| let metrics = Arc::new(jni_new_global_ref!(env, metrics_node)?); | ||||||
|
|
||||||
| // Get the global references of input sources | ||||||
|
|
@@ -218,6 +223,12 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan( | |||||
| let memory_pool = | ||||||
| create_memory_pool(&memory_pool_config, task_memory_manager, task_attempt_id); | ||||||
|
|
||||||
| let memory_pool = if logging_memory_pool { | ||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
| Arc::new(LoggingPool::new(task_attempt_id as u64, memory_pool)) | ||||||
| } else { | ||||||
| memory_pool | ||||||
| }; | ||||||
|
|
||||||
| // Get local directories for storing spill files | ||||||
| let local_dirs_array = JObjectArray::from_raw(local_dirs); | ||||||
| let num_local_dirs = env.get_array_length(&local_dirs_array)?; | ||||||
|
|
@@ -256,10 +267,10 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan( | |||||
| metrics_last_update_time: Instant::now(), | ||||||
| plan_creation_time, | ||||||
| session_ctx: Arc::new(session), | ||||||
| debug_native: debug_native == 1, | ||||||
| explain_native: explain_native == 1, | ||||||
| debug_native, | ||||||
| explain_native, | ||||||
| memory_pool_config, | ||||||
| tracing_enabled: tracing_enabled != JNI_FALSE, | ||||||
| tracing_enabled, | ||||||
| }); | ||||||
|
|
||||||
| Ok(Box::into_raw(exec_context) as i64) | ||||||
|
|
||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,84 @@ | ||
| // Licensed to the Apache Software Foundation (ASF) under one | ||
| // or more contributor license agreements. See the NOTICE file | ||
| // distributed with this work for additional information | ||
| // regarding copyright ownership. The ASF licenses this file | ||
| // to you under the Apache License, Version 2.0 (the | ||
| // "License"); you may not use this file except in compliance | ||
| // with the License. You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, | ||
| // software distributed under the License is distributed on an | ||
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| // KIND, either express or implied. See the License for the | ||
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
use datafusion::execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};
use std::sync::Arc;
|
|
||
/// A [`MemoryPool`] decorator that logs every reservation event for one task.
///
/// Wraps an inner pool and delegates all accounting to it, printing a line
/// tagged with the Spark task attempt id for each call. Installed around the
/// real pool when `spark.comet.debug.memory` is enabled (see `createPlan`).
#[derive(Debug)]
pub(crate) struct LoggingPool {
    // Spark task attempt id used to tag every log line.
    task_attempt_id: u64,
    // The wrapped pool that performs the real memory accounting.
    pool: Arc<dyn MemoryPool>,
}

impl LoggingPool {
    /// Creates a logging wrapper around `pool` for the given task attempt.
    pub fn new(task_attempt_id: u64, pool: Arc<dyn MemoryPool>) -> Self {
        Self {
            task_attempt_id,
            pool,
        }
    }
}
|
|
||
| impl MemoryPool for LoggingPool { | ||
| fn grow(&self, reservation: &MemoryReservation, additional: usize) { | ||
| println!( | ||
|
||
| "[Task {}] MemoryPool[{}].grow({})", | ||
parthchandra marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| self.task_attempt_id, | ||
| reservation.consumer().name(), | ||
| reservation.size() | ||
| ); | ||
| self.pool.grow(reservation, additional); | ||
| } | ||
|
|
||
| fn shrink(&self, reservation: &MemoryReservation, shrink: usize) { | ||
| println!( | ||
| "[Task {}] MemoryPool[{}].shrink({})", | ||
| self.task_attempt_id, | ||
| reservation.consumer().name(), | ||
| reservation.size() | ||
| ); | ||
| self.pool.shrink(reservation, shrink); | ||
| } | ||
|
|
||
| fn try_grow( | ||
| &self, | ||
| reservation: &MemoryReservation, | ||
| additional: usize, | ||
| ) -> datafusion::common::Result<()> { | ||
| let result = self.pool.try_grow(reservation, additional); | ||
| if result.is_ok() { | ||
| println!( | ||
| "[Task {}] MemoryPool[{}].try_grow({}) returning Ok", | ||
| self.task_attempt_id, | ||
| reservation.consumer().name(), | ||
| reservation.size() | ||
| ); | ||
| } else { | ||
| println!( | ||
| "[Task {}] MemoryPool[{}].try_grow({}) returning Err", | ||
| self.task_attempt_id, | ||
| reservation.consumer().name(), | ||
| reservation.size() | ||
| ); | ||
| } | ||
| result | ||
| } | ||
|
|
||
| fn reserved(&self) -> usize { | ||
| self.pool.reserved() | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,6 +17,7 @@ | |
|
|
||
| mod config; | ||
| mod fair_pool; | ||
| pub mod logging_pool; | ||
| mod task_shared; | ||
| mod unified_pool; | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,36 @@ | ||
| // Licensed to the Apache Software Foundation (ASF) under one | ||
| // or more contributor license agreements. See the NOTICE file | ||
| // distributed with this work for additional information | ||
| // regarding copyright ownership. The ASF licenses this file | ||
| // to you under the Apache License, Version 2.0 (the | ||
| // "License"); you may not use this file except in compliance | ||
| // with the License. You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, | ||
| // software distributed under the License is distributed on an | ||
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| // KIND, either express or implied. See the License for the | ||
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| use std::collections::HashMap; | ||
|
|
||
// Spark configuration keys read by the native side from the serialized config
// map. All are interpreted as booleans via `SparkConfig::get_bool`, so an
// absent or unparsable value defaults to false.

// Enables native execution tracing (passed to `with_trace` in createPlan).
pub(crate) const COMET_TRACING_ENABLED: &str = "spark.comet.tracing.enabled";
// Enables native debug mode (stored as `debug_native` on the exec context).
pub(crate) const COMET_DEBUG_ENABLED: &str = "spark.comet.debug.enabled";
// Enables logging of the native query plan (`explain_native`).
pub(crate) const COMET_EXPLAIN_NATIVE_ENABLED: &str = "spark.comet.explain.native.enabled";

// Wraps the task's memory pool in a LoggingPool that logs every reservation.
pub(crate) const COMET_DEBUG_MEMORY: &str = "spark.comet.debug.memory";
|
|
||
/// Typed convenience accessors over the deserialized Spark configuration map.
pub(crate) trait SparkConfig {
    /// Returns the boolean stored under `name`.
    ///
    /// Yields `false` when the key is absent or the value is not exactly
    /// `"true"`/`"false"` (Rust's `bool::from_str` is case-sensitive).
    fn get_bool(&self, name: &str) -> bool;
}

impl SparkConfig for HashMap<String, String> {
    fn get_bool(&self, name: &str) -> bool {
        match self.get(name) {
            Some(value) => value.parse().unwrap_or(false),
            None => false,
        }
    }
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rather than adding yet another flag to this API call, I am now using the already available spark config map in native code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1. The config map should be the preferred method