Skip to content

Commit 1ead58b

Browse files
committed
Try arrow_pool approach
1 parent 769f367 commit 1ead58b

File tree

19 files changed

+190
-41
lines changed

19 files changed

+190
-41
lines changed

Cargo.lock

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/expr-common/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ name = "datafusion_expr_common"
4242

4343
[dependencies]
4444
arrow = { workspace = true }
45-
datafusion-common = { workspace = true }
4645
indexmap = { workspace = true }
46+
datafusion-common = { workspace = true }
4747
itertools = { workspace = true }
4848
paste = { workspace = true }
49+
arrow-buffer = { workspace = true, features = ["pool"] }

datafusion/expr-common/src/accumulator.rs

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
//! Accumulator module contains the trait definition for aggregation function's accumulators.
1919
2020
use arrow::array::ArrayRef;
21+
use arrow_buffer::MemoryPool;
2122
use datafusion_common::{internal_err, Result, ScalarValue};
2223
use std::fmt::Debug;
2324

@@ -71,16 +72,34 @@ pub trait Accumulator: Send + Sync + Debug {
7172
/// when possible (for example distinct strings)
7273
fn evaluate(&mut self) -> Result<ScalarValue>;
7374

74-
/// Returns the allocated size required for this accumulator, in
75-
/// bytes, including `Self`.
75+
/// Returns the size of non-Arrow allocations in bytes, including `Self`.
7676
///
7777
/// This value is used to calculate the memory used during
7878
/// execution so DataFusion can stay within its allotted limit.
7979
///
80+
/// This includes Vec capacity, BufferBuilder capacity, and other
81+
/// non-Arrow data structures. Arrow Buffer memory should be tracked
82+
/// separately via [`claim_buffers`].
83+
///
8084
/// "Allocated" means that for internal containers such as `Vec`,
8185
/// the `capacity` should be used not the `len`.
86+
///
87+
/// [`claim_buffers`]: Self::claim_buffers
8288
fn size(&self) -> usize;
8389

90+
/// Claim Arrow buffers with the memory pool for accurate tracking.
91+
///
92+
/// This method should be called to register Arrow Buffer instances
93+
/// with the pool, enabling automatic deduplication of shared buffers.
94+
///
95+
/// # Default Implementation
96+
///
97+
/// The default implementation does nothing, which is appropriate for
98+
/// accumulators that don't store Arrow arrays (e.g., simple numeric accumulators).
99+
fn claim_buffers(&self, _pool: &dyn MemoryPool) {
100+
// Default: no-op for accumulators without Arrow buffers
101+
}
102+
84103
/// Returns the intermediate state of the accumulator, consuming the
85104
/// intermediate state.
86105
///

datafusion/expr-common/src/groups_accumulator.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
//! Vectorized [`GroupsAccumulator`]
1919
2020
use arrow::array::{ArrayRef, BooleanArray};
21+
use arrow_buffer::MemoryPool as ArrowMemoryPool;
2122
use datafusion_common::{not_impl_err, Result};
2223

2324
/// Describes how many rows should be emitted during grouping.
@@ -246,8 +247,33 @@ pub trait GroupsAccumulator: Send {
246247

247248
/// Amount of memory used to store the state of this accumulator,
248249
/// in bytes.
250+
/// Returns the size of non-Arrow allocations in bytes.
251+
///
252+
/// This includes Vec capacity, BufferBuilder capacity, and other
253+
/// non-Arrow data structures. Arrow Buffer memory should be tracked
254+
/// separately via [`claim_buffers`].
249255
///
250256
/// This function is called once per batch, so it should be `O(n)` to
251257
/// compute, not `O(num_groups)`
258+
///
259+
/// [`claim_buffers`]: GroupsAccumulator::claim_buffers
252260
fn size(&self) -> usize;
261+
262+
/// Claims all internal Arrow buffers with the provided memory pool.
263+
///
264+
/// This method should call `.claim()` on all Arrow `Buffer`s held by
265+
/// this accumulator to enable accurate memory tracking that automatically
266+
/// deduplicates shared buffers.
267+
///
268+
/// # Arguments
269+
///
270+
/// * `pool` - Arrow memory pool to claim buffers with
271+
///
272+
/// # Default Implementation
273+
///
274+
/// The default implementation does nothing, which is appropriate for
275+
/// accumulators that don't store Arrow arrays (e.g., simple numeric accumulators).
276+
fn claim_buffers(&self, _pool: &dyn ArrowMemoryPool) {
277+
// Default: no-op for accumulators without Arrow buffers
278+
}
253279
}

datafusion/functions-aggregate-common/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ name = "datafusion_functions_aggregate_common"
4343
[dependencies]
4444
ahash = { workspace = true }
4545
arrow = { workspace = true }
46+
arrow-buffer = { workspace = true, features = ["pool"] }
4647
datafusion-common = { workspace = true }
4748
datafusion-expr-common = { workspace = true }
4849
datafusion-physical-expr-common = { workspace = true }

datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use arrow::{
3232
compute::take_arrays,
3333
datatypes::UInt32Type,
3434
};
35+
use arrow_buffer::MemoryPool;
3536
use datafusion_common::{arrow_datafusion_err, DataFusionError, Result, ScalarValue};
3637
use datafusion_expr_common::accumulator::Accumulator;
3738
use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator};
@@ -395,6 +396,13 @@ impl GroupsAccumulator for GroupsAccumulatorAdapter {
395396
self.allocation_bytes
396397
}
397398

399+
fn claim_buffers(&self, pool: &dyn MemoryPool) {
400+
// Delegate to each wrapped Accumulator
401+
for state in &self.states {
402+
state.accumulator.claim_buffers(pool);
403+
}
404+
}
405+
398406
fn convert_to_state(
399407
&self,
400408
values: &[ArrayRef],

datafusion/functions-aggregate/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ name = "datafusion_functions_aggregate"
4343
[dependencies]
4444
ahash = { workspace = true }
4545
arrow = { workspace = true }
46+
arrow-buffer = { workspace = true, features = ["pool"] }
4647
datafusion-common = { workspace = true }
4748
datafusion-doc = { workspace = true }
4849
datafusion-execution = { workspace = true }

datafusion/functions-aggregate/src/array_agg.rs

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use arrow::array::{
2727
};
2828
use arrow::compute::{filter, SortOptions};
2929
use arrow::datatypes::{DataType, Field, FieldRef, Fields};
30+
use arrow_buffer::MemoryPool;
3031

3132
use datafusion_common::cast::as_list_array;
3233
use datafusion_common::utils::{
@@ -391,25 +392,17 @@ impl Accumulator for ArrayAggAccumulator {
391392
fn size(&self) -> usize {
392393
size_of_val(self)
393394
+ (size_of::<ArrayRef>() * self.values.capacity())
394-
+ self
395-
.values
396-
.iter()
397-
// Each ArrayRef might be just a reference to a bigger array, and many
398-
// ArrayRefs here might be referencing exactly the same array, so if we
399-
// were to call `arr.get_array_memory_size()`, we would be double-counting
400-
// the same underlying data many times.
401-
//
402-
// Instead, we do an approximation by estimating how much memory each
403-
// ArrayRef would occupy if its underlying data was fully owned by this
404-
// accumulator.
405-
//
406-
// Note that this is just an estimation, but the reality is that this
407-
// accumulator might not own any data.
408-
.map(|arr| arr.to_data().get_slice_memory_size().unwrap_or_default())
409-
.sum::<usize>()
410395
+ self.datatype.size()
411396
- size_of_val(&self.datatype)
412397
}
398+
399+
fn claim_buffers(&self, pool: &dyn MemoryPool) {
400+
for arr in &self.values {
401+
for buffer in arr.to_data().buffers() {
402+
buffer.claim(pool);
403+
}
404+
}
405+
}
413406
}
414407

415408
#[derive(Debug)]
@@ -1073,10 +1066,25 @@ mod tests {
10731066
acc2.update_batch(&[data(["b", "c", "a"])])?;
10741067
acc1 = merge(acc1, acc2)?;
10751068

1076-
assert_eq!(acc1.size(), 266);
1069+
// size() returns only non-Arrow allocations (Vec capacity, etc.)
1070+
let non_arrow_size = acc1.size();
1071+
assert_eq!(non_arrow_size, 236);
1072+
1073+
// Verify Arrow buffer tracking - claims full buffer capacity
1074+
let pool = arrow_buffer::TrackingMemoryPool::default();
1075+
acc1.claim_buffers(&pool);
1076+
let arrow_buffer_size = pool.used();
1077+
1078+
// With small arrays, buffer over-allocation creates high overhead
1079+
// This tracks actual physical memory (capacity), not logical slice size
1080+
assert_eq!(arrow_buffer_size, 2080);
1081+
1082+
// Total memory = non-Arrow + Arrow buffers (tracks actual RAM usage)
1083+
assert_eq!(non_arrow_size + arrow_buffer_size, 2316);
10771084

10781085
Ok(())
10791086
}
1087+
10801088
#[test]
10811089
fn does_not_over_account_memory_distinct() -> Result<()> {
10821090
let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()

datafusion/physical-plan/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ name = "datafusion_physical_plan"
4848
[dependencies]
4949
ahash = { workspace = true }
5050
arrow = { workspace = true }
51+
arrow-buffer = { workspace = true, features = ["pool"] }
5152
arrow-ord = { workspace = true }
5253
arrow-schema = { workspace = true }
5354
async-trait = { workspace = true }

datafusion/physical-plan/src/aggregates/group_values/mod.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use arrow::array::types::{
2424
};
2525
use arrow::array::{downcast_primitive, ArrayRef, RecordBatch};
2626
use arrow::datatypes::{DataType, SchemaRef, TimeUnit};
27+
use arrow_buffer::MemoryPool;
2728
use datafusion_common::Result;
2829

2930
use datafusion_expr::EmitTo;
@@ -99,9 +100,22 @@ pub trait GroupValues: Send {
99100
/// assigned.
100101
fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()>;
101102

102-
/// Returns the number of bytes of memory used by this [`GroupValues`]
103+
/// Returns the size of non-Arrow allocations in bytes.
104+
///
105+
/// This includes Vec capacity, BufferBuilder capacity, and other
106+
/// non-Arrow data structures. Arrow Buffer memory should be tracked
107+
/// separately via [`claim_buffers`].
108+
///
109+
/// [`claim_buffers`]: GroupValues::claim_buffers
103110
fn size(&self) -> usize;
104111

112+
/// Claims all internal Arrow buffers with the provided memory pool.
113+
///
114+
/// Default implementation does nothing (for builders that don't store arrays with shared buffers).
115+
fn claim_buffers(&self, _pool: &dyn MemoryPool) {
116+
// Default: no-op
117+
}
118+
105119
/// Returns true if this [`GroupValues`] is empty
106120
fn is_empty(&self) -> bool;
107121

0 commit comments

Comments
 (0)