Commit 7c05b20

refactor: move metrics module to datafusion-common crate (#19247)
## Which issue does this PR close?

## Rationale for this change

#18456 requires this refactor. The current dependency chain is physical-plan -> physical-expr -> common, and the metrics module lives in physical-plan. We want the metrics module to be directly accessible from the physical-expr crate, so it should be moved to the common crate.

Regarding compile time, a `cargo build --profile release-nonlto --timings` benchmark shows that compiling the `common` crate takes 5s out of a 3min total build, so the move has no noticeable impact. Everything remains reachable through the project-wide root `datafusion-common` dependency for simplicity.

## What changes are included in this PR?

- The metrics module depends on the memory utility `get_record_batch_memory_size`, so that function is moved to the `common` crate first
- The metrics module is then moved out of `physical-plan`

Both moves are re-exported from the original locations to keep the public APIs backward compatible.

## Are these changes tested?

## Are there any user-facing changes?
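The backward-compatibility approach described above relies on `pub use` re-exports at the old paths. A minimal, self-contained sketch of the pattern — the module names and the stub function here are illustrative only, not the real DataFusion crate layout:

```rust
// Sketch of the re-export pattern: `common` stands in for the crate that is
// the item's new home, `physical_plan` for the crate that used to own it.
pub mod common {
    pub mod memory {
        // Stand-in for the moved function.
        pub fn record_batch_memory_size_stub() -> usize {
            42
        }
    }
}

pub mod physical_plan {
    // Re-export at the old path so existing `use` statements keep compiling.
    pub use super::common::memory::record_batch_memory_size_stub;
}
```

Callers importing from either path resolve to the same function, so moving the module is not a breaking change for users of the old path.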
1 parent 899a762 commit 7c05b20

File tree

12 files changed: +240 −229 lines

Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default.

datafusion/common/src/utils/memory.rs

Lines changed: 198 additions & 2 deletions
@@ -17,9 +17,11 @@
 
 //! This module provides a function to estimate the memory size of a HashTable prior to allocation
 
-use crate::Result;
 use crate::error::_exec_datafusion_err;
-use std::mem::size_of;
+use crate::{HashSet, Result};
+use arrow::array::ArrayData;
+use arrow::record_batch::RecordBatch;
+use std::{mem::size_of, ptr::NonNull};
 
 /// Estimates the memory size required for a hash table prior to allocation.
 ///
@@ -99,6 +101,74 @@ pub fn estimate_memory_size<T>(num_elements: usize, fixed_size: usize) -> Result
     })
 }
 
+/// Calculate the total used memory of this batch.
+///
+/// This function is used to estimate the physical memory usage of the
+/// `RecordBatch`. It only counts the memory of large data `Buffer`s, and
+/// ignores metadata like types and pointers.
+/// The implementation adds up each unique `Buffer`'s memory size, because:
+/// - The data pointers inside `Buffer`s are memory regions returned by the
+///   global memory allocator; those regions can't overlap.
+/// - The actually used ranges of the `ArrayRef`s inside a `RecordBatch` can
+///   overlap or reuse the same `Buffer`, for example after taking a slice
+///   from an `Array`.
+///
+/// Example:
+/// For a `RecordBatch` with two columns `col1` and `col2`, both pointing
+/// to sub-regions of the same buffer:
+///
+/// {xxxxxxxxxxxxxxxxxxx} <--- buffer
+///       ^    ^  ^    ^
+///       |    |  |    |
+/// col1->{    }  |    |
+/// col2--------->{    }
+///
+/// In the above case, `get_record_batch_memory_size` will return the size of
+/// the buffer, instead of the sum of `col1` and `col2`'s actual memory sizes.
+///
+/// Note: the current `RecordBatch::get_array_memory_size()` double counts a
+/// buffer's memory size if multiple arrays within the batch share the same
+/// `Buffer`. This method provides a temporary fix until the issue is resolved:
+/// <https://github.com/apache/arrow-rs/issues/6439>
+pub fn get_record_batch_memory_size(batch: &RecordBatch) -> usize {
+    // Store pointers to each `Buffer`'s start memory address (instead of the
+    // pointer to the actually used data region represented by the current
+    // `Array`)
+    let mut counted_buffers: HashSet<NonNull<u8>> = HashSet::new();
+    let mut total_size = 0;
+
+    for array in batch.columns() {
+        let array_data = array.to_data();
+        count_array_data_memory_size(&array_data, &mut counted_buffers, &mut total_size);
+    }
+
+    total_size
+}
+
+/// Count the memory usage of `array_data` and its children recursively.
+fn count_array_data_memory_size(
+    array_data: &ArrayData,
+    counted_buffers: &mut HashSet<NonNull<u8>>,
+    total_size: &mut usize,
+) {
+    // Count memory usage for `array_data`
+    for buffer in array_data.buffers() {
+        if counted_buffers.insert(buffer.data_ptr()) {
+            *total_size += buffer.capacity();
+        } // Otherwise the buffer's memory is already counted
+    }
+
+    if let Some(null_buffer) = array_data.nulls()
+        && counted_buffers.insert(null_buffer.inner().inner().data_ptr())
+    {
+        *total_size += null_buffer.inner().inner().capacity();
+    }
+
+    // Count all children `ArrayData` recursively
+    for child in array_data.child_data() {
+        count_array_data_memory_size(child, counted_buffers, total_size);
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::{collections::HashSet, mem::size_of};
@@ -132,3 +202,129 @@ mod tests {
         assert!(estimated.is_err());
     }
 }
+
+#[cfg(test)]
+mod record_batch_tests {
+    use super::*;
+    use arrow::array::{Float64Array, Int32Array, ListArray};
+    use arrow::datatypes::{DataType, Field, Int32Type, Schema};
+    use std::sync::Arc;
+
+    #[test]
+    fn test_get_record_batch_memory_size() {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("ints", DataType::Int32, true),
+            Field::new("float64", DataType::Float64, false),
+        ]));
+
+        let int_array =
+            Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4), Some(5)]);
+        let float64_array = Float64Array::from(vec![1.0, 2.0, 3.0, 4.0, 5.0]);
+
+        let batch = RecordBatch::try_new(
+            schema,
+            vec![Arc::new(int_array), Arc::new(float64_array)],
+        )
+        .unwrap();
+
+        let size = get_record_batch_memory_size(&batch);
+        assert_eq!(size, 60);
+    }
+
+    #[test]
+    fn test_get_record_batch_memory_size_with_null() {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("ints", DataType::Int32, true),
+            Field::new("float64", DataType::Float64, false),
+        ]));
+
+        let int_array = Int32Array::from(vec![None, Some(2), Some(3)]);
+        let float64_array = Float64Array::from(vec![1.0, 2.0, 3.0]);
+
+        let batch = RecordBatch::try_new(
+            schema,
+            vec![Arc::new(int_array), Arc::new(float64_array)],
+        )
+        .unwrap();
+
+        let size = get_record_batch_memory_size(&batch);
+        assert_eq!(size, 100);
+    }
+
+    #[test]
+    fn test_get_record_batch_memory_size_empty() {
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "ints",
+            DataType::Int32,
+            false,
+        )]));
+
+        let int_array: Int32Array = Int32Array::from(vec![] as Vec<i32>);
+        let batch = RecordBatch::try_new(schema, vec![Arc::new(int_array)]).unwrap();
+
+        let size = get_record_batch_memory_size(&batch);
+        assert_eq!(size, 0, "Empty batch should have 0 memory size");
+    }
+
+    #[test]
+    fn test_get_record_batch_memory_size_shared_buffer() {
+        let original = Int32Array::from(vec![1, 2, 3, 4, 5]);
+        let slice1 = original.slice(0, 3);
+        let slice2 = original.slice(2, 3);
+
+        let schema_origin = Arc::new(Schema::new(vec![Field::new(
+            "origin_col",
+            DataType::Int32,
+            false,
+        )]));
+        let batch_origin =
+            RecordBatch::try_new(schema_origin, vec![Arc::new(original)]).unwrap();
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("slice1", DataType::Int32, false),
+            Field::new("slice2", DataType::Int32, false),
+        ]));
+
+        let batch_sliced =
+            RecordBatch::try_new(schema, vec![Arc::new(slice1), Arc::new(slice2)])
+                .unwrap();
+
+        let size_origin = get_record_batch_memory_size(&batch_origin);
+        let size_sliced = get_record_batch_memory_size(&batch_sliced);
+
+        assert_eq!(size_origin, size_sliced);
+    }
+
+    #[test]
+    fn test_get_record_batch_memory_size_nested_array() {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new(
+                "nested_int",
+                DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
+                false,
+            ),
+            Field::new(
+                "nested_int2",
+                DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
+                false,
+            ),
+        ]));
+
+        let int_list_array = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
+            Some(vec![Some(1), Some(2), Some(3)]),
+        ]);
+
+        let int_list_array2 = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
+            Some(vec![Some(4), Some(5), Some(6)]),
+        ]);
+
+        let batch = RecordBatch::try_new(
+            schema,
+            vec![Arc::new(int_list_array), Arc::new(int_list_array2)],
+        )
+        .unwrap();
+
+        let size = get_record_batch_memory_size(&batch);
+        assert_eq!(size, 8208);
+    }
+}
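The deduplication idea behind `get_record_batch_memory_size` above — count each backing allocation once, keyed by its start address, even when several columns slice the same buffer — can be sketched in plain Rust without Arrow. The `Column` struct and the `Rc<Vec<u8>>` buffer below are illustrative stand-ins, not DataFusion or Arrow types:

```rust
use std::collections::HashSet;
use std::rc::Rc;

// A "column" holding a shared backing allocation, standing in for an Arrow
// array whose data lives in a (possibly shared) `Buffer`.
pub struct Column {
    pub buffer: Rc<Vec<u8>>,
}

// Count each unique allocation exactly once, keyed by its start address,
// mirroring how the real code keys on `Buffer::data_ptr()`.
pub fn total_unique_buffer_size(columns: &[Column]) -> usize {
    let mut counted: HashSet<*const u8> = HashSet::new();
    let mut total = 0;
    for col in columns {
        if counted.insert(col.buffer.as_ptr()) {
            total += col.buffer.capacity();
        } // otherwise this allocation was already counted
    }
    total
}
```

Two columns built from `Rc::clone`s of one 20-byte buffer yield a total of 20, not 40, which is the shared-buffer behavior the doc comment and `test_get_record_batch_memory_size_shared_buffer` describe.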

datafusion/execution/Cargo.toml

Lines changed: 1 addition & 1 deletion
@@ -51,6 +51,7 @@ sql = []
 [dependencies]
 arrow = { workspace = true }
 async-trait = { workspace = true }
+chrono = { workspace = true }
 dashmap = { workspace = true }
 datafusion-common = { workspace = true, default-features = false }
 datafusion-expr = { workspace = true, default-features = false }
@@ -64,5 +65,4 @@ tempfile = { workspace = true }
 url = { workspace = true }
 
 [dev-dependencies]
-chrono = { workspace = true }
 insta = { workspace = true }

datafusion/execution/src/lib.rs

Lines changed: 1 addition & 0 deletions
@@ -32,6 +32,7 @@ pub mod cache;
 pub mod config;
 pub mod disk_manager;
 pub mod memory_pool;
+pub mod metrics;
 pub mod object_store;
 #[cfg(feature = "parquet_encryption")]
 pub mod parquet_encryption;

datafusion/physical-plan/src/metrics/baseline.rs renamed to datafusion/execution/src/metrics/baseline.rs

Lines changed: 3 additions & 5 deletions
@@ -20,18 +20,16 @@
 use std::task::Poll;
 
 use arrow::record_batch::RecordBatch;
-
-use crate::spill::get_record_batch_memory_size;
+use datafusion_common::{Result, utils::memory::get_record_batch_memory_size};
 
 use super::{Count, ExecutionPlanMetricsSet, MetricBuilder, Time, Timestamp};
-use datafusion_common::Result;
 
 /// Helper for creating and tracking common "baseline" metrics for
 /// each operator
 ///
 /// Example:
 /// ```
-/// use datafusion_physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
+/// use datafusion_execution::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
 /// let metrics = ExecutionPlanMetricsSet::new();
 ///
 /// let partition = 2;
@@ -203,7 +201,7 @@ impl SpillMetrics {
     }
 }
 
-/// Metrics for tracking [`crate::stream::BatchSplitStream`] activity
+/// Metrics for tracking batch splitting activity
 #[derive(Debug, Clone)]
 pub struct SplitMetrics {
     /// Number of times an input [`RecordBatch`] was split
datafusion/physical-plan/src/metrics/builder.rs renamed to datafusion/execution/src/metrics/builder.rs

Lines changed: 1 addition & 1 deletion
@@ -34,7 +34,7 @@ use super::{
 /// case of constant strings
 ///
 /// ```rust
-/// use datafusion_physical_plan::metrics::*;
+/// use datafusion_execution::metrics::*;
 ///
 /// let metrics = ExecutionPlanMetricsSet::new();
 /// let partition = 1;

datafusion/physical-plan/src/metrics/custom.rs renamed to datafusion/execution/src/metrics/custom.rs

Lines changed: 1 addition & 1 deletion
@@ -44,7 +44,7 @@ use std::{any::Any, fmt::Debug, fmt::Display, sync::Arc};
 /// # use std::any::Any;
 /// # use std::sync::atomic::{AtomicUsize, Ordering};
 ///
-/// # use datafusion_physical_plan::metrics::CustomMetricValue;
+/// # use datafusion_execution::metrics::CustomMetricValue;
 ///
 /// #[derive(Debug, Default)]
 /// struct MyCounter {

datafusion/physical-plan/src/metrics/mod.rs renamed to datafusion/execution/src/metrics/mod.rs

Lines changed: 7 additions & 15 deletions
@@ -22,16 +22,16 @@ mod builder;
 mod custom;
 mod value;
 
+use datafusion_common::HashMap;
 use parking_lot::Mutex;
 use std::{
     borrow::Cow,
     fmt::{Debug, Display},
     sync::Arc,
 };
 
-use datafusion_common::HashMap;
-
 // public exports
+
 pub use baseline::{BaselineMetrics, RecordOutput, SpillMetrics, SplitMetrics};
 pub use builder::MetricBuilder;
 pub use custom::CustomMetricValue;
@@ -40,15 +40,14 @@ pub use value::{
     ScopedTimerGuard, Time, Timestamp,
 };
 
-/// Something that tracks a value of interest (metric) of a DataFusion
-/// [`ExecutionPlan`] execution.
+/// Something that tracks a value of interest (metric) during execution.
 ///
 /// Typically [`Metric`]s are not created directly, but instead
 /// are created using [`MetricBuilder`] or methods on
 /// [`ExecutionPlanMetricsSet`].
 ///
 /// ```
-/// use datafusion_physical_plan::metrics::*;
+/// use datafusion_execution::metrics::*;
 ///
 /// let metrics = ExecutionPlanMetricsSet::new();
 /// assert!(metrics.clone_inner().output_rows().is_none());
@@ -66,8 +65,6 @@ pub use value::{
 /// // As well as from the metrics set
 /// assert_eq!(metrics.clone_inner().output_rows(), Some(13));
 /// ```
-///
-/// [`ExecutionPlan`]: super::ExecutionPlan
 
 #[derive(Debug)]
 pub struct Metric {
@@ -204,9 +201,7 @@ impl Metric {
     }
 }
 
-/// A snapshot of the metrics for a particular ([`ExecutionPlan`]).
-///
-/// [`ExecutionPlan`]: super::ExecutionPlan
+/// A snapshot of the metrics for a particular execution plan.
 #[derive(Default, Debug, Clone)]
 pub struct MetricsSet {
     metrics: Vec<Arc<Metric>>,
@@ -401,17 +396,14 @@ impl Display for MetricsSet {
     }
 }
 
-/// A set of [`Metric`]s for an individual "operator" (e.g. `&dyn
-/// ExecutionPlan`).
+/// A set of [`Metric`]s for an individual operator.
 ///
-/// This structure is intended as a convenience for [`ExecutionPlan`]
+/// This structure is intended as a convenience for execution plan
 /// implementations so they can generate different streams for multiple
 /// partitions but easily report them together.
 ///
 /// Each `clone()` of this structure will add metrics to the same
 /// underlying metrics set
-///
-/// [`ExecutionPlan`]: super::ExecutionPlan
 #[derive(Default, Debug, Clone)]
 pub struct ExecutionPlanMetricsSet {
     inner: Arc<Mutex<MetricsSet>>,
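The design note in the diff above — every `clone()` of `ExecutionPlanMetricsSet` reports into the same underlying set through an `Arc<Mutex<…>>` — can be illustrated with a minimal stand-in. The `SharedMetrics` type below is a sketch of that pattern, not the DataFusion implementation:

```rust
use std::sync::{Arc, Mutex};

// Minimal stand-in for a metrics set whose clones share storage: each
// partition's stream can hold its own clone, yet all recorded values land
// in the same place.
#[derive(Default, Clone)]
pub struct SharedMetrics {
    inner: Arc<Mutex<Vec<u64>>>, // shared by every clone
}

impl SharedMetrics {
    pub fn record(&self, value: u64) {
        self.inner.lock().unwrap().push(value);
    }

    pub fn recorded(&self) -> Vec<u64> {
        self.inner.lock().unwrap().clone()
    }
}
```

Cloning copies only the `Arc`, so values recorded through any clone are visible through all of them — the same property that lets per-partition streams report into one metrics set.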

datafusion/physical-plan/src/metrics/value.rs renamed to datafusion/execution/src/metrics/value.rs

Lines changed: 1 addition & 2 deletions
@@ -19,9 +19,8 @@
 
 use super::CustomMetricValue;
 use chrono::{DateTime, Utc};
-use datafusion_common::instant::Instant;
 use datafusion_common::{
-    human_readable_count, human_readable_duration, human_readable_size,
+    human_readable_count, human_readable_duration, human_readable_size, instant::Instant,
 };
 use parking_lot::Mutex;
 use std::{

datafusion/physical-plan/Cargo.toml

Lines changed: 0 additions & 1 deletion
@@ -52,7 +52,6 @@ arrow = { workspace = true }
 arrow-ord = { workspace = true }
 arrow-schema = { workspace = true }
 async-trait = { workspace = true }
-chrono = { workspace = true }
 datafusion-common = { workspace = true }
 datafusion-common-runtime = { workspace = true, default-features = true }
 datafusion-execution = { workspace = true }
