Skip to content

Commit ecce61e

Browse files
committed
save
1 parent 43e83c2 commit ecce61e

File tree

3 files changed

+26
-19
lines changed

3 files changed

+26
-19
lines changed

native/core/src/execution/memory_pools/config.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ use crate::errors::{CometError, CometResult};
2121
pub(crate) enum MemoryPoolType {
2222
GreedyUnified,
2323
FairUnified,
24-
FairUnifiedGlobal,
2524
Greedy,
2625
FairSpill,
2726
GreedyTaskShared,
@@ -65,7 +64,7 @@ pub(crate) fn parse_memory_pool_config(
6564
let memory_pool_config = if off_heap_mode {
6665
match memory_pool_type.as_str() {
6766
"fair_unified_global" => {
68-
MemoryPoolConfig::new(MemoryPoolType::FairUnifiedGlobal, pool_size_global)
67+
MemoryPoolConfig::new(MemoryPoolType::FairUnified, pool_size_global)
6968
}
7069
"fair_unified" => {
7170
MemoryPoolConfig::new(MemoryPoolType::FairUnified, pool_size_per_task)

native/core/src/execution/memory_pools/fair_pool.rs

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ use parking_lot::Mutex;
3636

3737
/// A DataFusion fair `MemoryPool` implementation for Comet. Internally this is
3838
/// implemented via delegating calls to [`crate::jvm_bridge::CometTaskMemoryManager`].
39-
pub struct CometFairMemoryPool {
39+
pub struct CometFairUnifiedMemoryPool {
4040
task_memory_manager_handle: Arc<GlobalRef>,
4141
pool_size: usize,
4242
state: Mutex<CometFairPoolState>,
@@ -47,7 +47,7 @@ struct CometFairPoolState {
4747
num: usize,
4848
}
4949

50-
impl Debug for CometFairMemoryPool {
50+
impl Debug for CometFairUnifiedMemoryPool {
5151
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
5252
let state = self.state.lock();
5353
f.debug_struct("CometFairMemoryPool")
@@ -58,19 +58,19 @@ impl Debug for CometFairMemoryPool {
5858
}
5959
}
6060

61-
impl CometFairMemoryPool {
61+
impl CometFairUnifiedMemoryPool {
6262
pub fn new(
6363
task_memory_manager_handle: Arc<GlobalRef>,
6464
pool_size: usize,
65-
) -> CometFairMemoryPool {
65+
) -> CometFairUnifiedMemoryPool {
6666
Self {
6767
task_memory_manager_handle,
6868
pool_size,
6969
state: Mutex::new(CometFairPoolState { used: 0, num: 0 }),
7070
}
7171
}
7272

73-
fn acquire(&self, additional: usize) -> CometResult<i64> {
73+
fn acquire_from_spark(&self, additional: usize) -> CometResult<i64> {
7474
let mut env = JVMClasses::get_env()?;
7575
let handle = self.task_memory_manager_handle.as_obj();
7676
unsafe {
@@ -79,7 +79,7 @@ impl CometFairMemoryPool {
7979
}
8080
}
8181

82-
fn release(&self, size: usize) -> CometResult<()> {
82+
fn release_to_spark(&self, size: usize) -> CometResult<()> {
8383
let mut env = JVMClasses::get_env()?;
8484
let handle = self.task_memory_manager_handle.as_obj();
8585
unsafe {
@@ -88,10 +88,10 @@ impl CometFairMemoryPool {
8888
}
8989
}
9090

91-
unsafe impl Send for CometFairMemoryPool {}
92-
unsafe impl Sync for CometFairMemoryPool {}
91+
unsafe impl Send for CometFairUnifiedMemoryPool {}
92+
unsafe impl Sync for CometFairUnifiedMemoryPool {}
9393

94-
impl MemoryPool for CometFairMemoryPool {
94+
impl MemoryPool for CometFairUnifiedMemoryPool {
9595
fn register(&self, _: &MemoryConsumer) {
9696
let mut state = self.state.lock();
9797
state.num = state
@@ -119,7 +119,7 @@ impl MemoryPool for CometFairMemoryPool {
119119
if size < subtractive {
120120
panic!("Failed to release {subtractive} bytes where only {size} bytes reserved")
121121
}
122-
self.release(subtractive)
122+
self.release_to_spark(subtractive)
123123
.unwrap_or_else(|_| panic!("Failed to release {subtractive} bytes"));
124124
state.used = state.used.checked_sub(subtractive).unwrap();
125125
}
@@ -133,20 +133,23 @@ impl MemoryPool for CometFairMemoryPool {
133133
if additional > 0 {
134134
let mut state = self.state.lock();
135135
let num = state.num;
136-
let limit = self.pool_size.checked_div(num).unwrap();
136+
let limit = self
137+
.pool_size
138+
.checked_div(num)
139+
.expect("overflow in checked_div");
137140
let size = reservation.size();
138141
if limit < size + additional {
139142
return resources_err!(
140143
"Failed to acquire {additional} bytes where {size} bytes already reserved and the fair limit is {limit} bytes, {num} registered"
141144
);
142145
}
143146

144-
let acquired = self.acquire(additional)?;
147+
let acquired = self.acquire_from_spark(additional)?;
145148
// If the number of bytes we acquired is less than the requested, return an error,
146149
// and hopefully will trigger spilling from the caller side.
147150
if acquired < additional as i64 {
148151
// Release the acquired bytes before throwing error
149-
self.release(acquired as usize)?;
152+
self.release_to_spark(acquired as usize)?;
150153

151154
return resources_err!(
152155
"Failed to acquire {} bytes, only got {} bytes. Reserved: {} bytes",
@@ -155,7 +158,10 @@ impl MemoryPool for CometFairMemoryPool {
155158
state.used
156159
);
157160
}
158-
state.used = state.used.checked_add(additional).unwrap();
161+
state.used = state
162+
.used
163+
.checked_add(additional)
164+
.expect("overflow in checked_add");
159165
}
160166
Ok(())
161167
}

native/core/src/execution/memory_pools/mod.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ mod unified_pool;
2323
use datafusion::execution::memory_pool::{
2424
FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool, UnboundedMemoryPool,
2525
};
26-
use fair_pool::CometFairMemoryPool;
26+
use fair_pool::CometFairUnifiedMemoryPool;
2727
use jni::objects::GlobalRef;
2828
use once_cell::sync::OnceCell;
2929
use std::num::NonZeroUsize;
@@ -51,8 +51,10 @@ pub(crate) fn create_memory_pool(
5151
}
5252
MemoryPoolType::FairUnified => {
5353
// Set Comet fair memory pool for native
54-
let memory_pool =
55-
CometFairMemoryPool::new(comet_task_memory_manager, memory_pool_config.pool_size);
54+
let memory_pool = CometFairUnifiedMemoryPool::new(
55+
comet_task_memory_manager,
56+
memory_pool_config.pool_size,
57+
);
5658
Arc::new(TrackConsumersPool::new(
5759
memory_pool,
5860
NonZeroUsize::new(NUM_TRACKED_CONSUMERS).unwrap(),

0 commit comments

Comments
 (0)