From 01b48837cdb67e591cd4d06f79cef8bd63755965 Mon Sep 17 00:00:00 2001 From: Michael Kleen Date: Fri, 2 Jan 2026 02:01:01 +0100 Subject: [PATCH 1/2] Add heap_size to statistics This adds a heap_size method returning the amount of memory a statistics struct allocates on the heap. --- datafusion/common/src/stats.rs | 57 ++++++++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs index ba13ef392d912..ee6cb4ecc37c8 100644 --- a/datafusion/common/src/stats.rs +++ b/datafusion/common/src/stats.rs @@ -321,6 +321,13 @@ impl Statistics { } } + /// Returns the memory size in bytes. + pub fn heap_size(&self) -> usize { + // column_statistics + num_rows + total_byte_size + self.column_statistics.capacity() * size_of::() + + size_of::>() * 2 + } + /// Calculates `total_byte_size` based on the schema and `num_rows`. /// If any of the columns has non-primitive width, `total_byte_size` is set to inexact. pub fn calculate_total_byte_size(&mut self, schema: &Schema) { @@ -1757,4 +1764,54 @@ mod tests { // total_byte_size should fall back to scaling: 8000 * 0.1 = 800 assert_eq!(result.total_byte_size, Precision::Inexact(800)); } + + #[test] + fn test_statistics_heap_size() { + let stats = Statistics { + num_rows: Precision::Exact(100), + total_byte_size: Precision::Exact(100), + column_statistics: vec![], + }; + + assert_eq!(stats.heap_size(), 32); + + let stats = Statistics { + num_rows: Precision::Exact(100), + total_byte_size: Precision::Exact(100), + column_statistics: vec![ColumnStatistics { + null_count: Precision::Absent, + max_value: Precision::Absent, + min_value: Precision::Absent, + sum_value: Precision::Absent, + distinct_count: Precision::Absent, + byte_size: Precision::Exact(100), + }], + }; + + assert_eq!(stats.heap_size(), 320); + + let stats = Statistics { + num_rows: Precision::Exact(100), + total_byte_size: Precision::Exact(100), + column_statistics: vec![ + ColumnStatistics { + null_count: Precision::Absent, + max_value: Precision::Absent, + min_value: Precision::Absent, + sum_value: Precision::Absent, + distinct_count: Precision::Absent, + byte_size: Precision::Exact(100), + }, + ColumnStatistics { + null_count: Precision::Exact(10), + max_value: Precision::Absent, + min_value: Precision::Absent, + sum_value: Precision::Absent, + distinct_count: Precision::Absent, + byte_size: Precision::Exact(100), + }, + ], + }; + assert_eq!(stats.heap_size(), 608); + } } From 079512e2d3443dcbe7b75e652ab87bc8b9e020fe Mon Sep 17 00:00:00 2001 From: Michael Kleen Date: Sun, 4 Jan 2026 23:20:59 +0100 Subject: [PATCH 2/2] Add HeapSize for Stats --- datafusion/common/src/heap_size.rs | 454 +++++++++++++++++++ datafusion/common/src/lib.rs | 1 + datafusion/common/src/stats.rs | 78 ++-- datafusion/execution/src/cache/cache_unit.rs | 10 +- 4 files changed, 496 insertions(+), 47 deletions(-) create mode 100644 datafusion/common/src/heap_size.rs diff --git a/datafusion/common/src/heap_size.rs b/datafusion/common/src/heap_size.rs new file mode 100644 index 0000000000000..e5dcb4b5b0f9f --- /dev/null +++ b/datafusion/common/src/heap_size.rs @@ -0,0 +1,454 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::stats::Precision; +use crate::{ColumnStatistics, ScalarValue, Statistics}; +use arrow::array::{ + Array, FixedSizeListArray, LargeListArray, ListArray, MapArray, StructArray, +}; +use arrow::datatypes::{ + DataType, Field, Fields, IntervalDayTime, IntervalMonthDayNano, IntervalUnit, + TimeUnit, UnionFields, UnionMode, i256, +}; +use half::f16; +use std::collections::HashMap; +use std::fmt::Debug; +use std::sync::Arc; +/// Since HeapSize is private in arrow-rs this code is taken over +/// from +/// and extended to provide heap size estimation for Statistics. This needs to be cleaned +/// up and replaced once the HeapSize is publicly available from arrow-rs. +/// +/// +/// Trait for calculating the size of various containers +pub trait HeapSize { + /// Return the size of any bytes allocated on the heap by this object, + /// including heap memory in those structures + /// + /// Note that the size of the type itself is not included in the result -- + /// instead, that size is added by the caller (e.g. container). + fn heap_size(&self) -> usize; +} + +impl HeapSize for Statistics { + fn heap_size(&self) -> usize { + self.num_rows.heap_size() + + self.total_byte_size.heap_size() + + self + .column_statistics + .iter() + .map(|x| x.heap_size()) + .sum::() + } +} + +impl HeapSize + for Precision +{ + fn heap_size(&self) -> usize { + self.get_value().map_or_else(|| 0, |v| v.heap_size()) + } +} + +impl HeapSize for ColumnStatistics { + fn heap_size(&self) -> usize { + self.null_count.heap_size() + + self.max_value.heap_size() + + self.min_value.heap_size() + + self.sum_value.heap_size() + + self.distinct_count.heap_size() + + self.byte_size.heap_size() + } +} + +impl HeapSize for ScalarValue { + fn heap_size(&self) -> usize { + use crate::scalar::ScalarValue::*; + match self { + Null => 0, + Boolean(b) => b.heap_size(), + Float16(f) => f.heap_size(), + Float32(f) => f.heap_size(), + Float64(f) => f.heap_size(), + Decimal32(a, b, c) => a.heap_size() + b.heap_size() + c.heap_size(), + Decimal64(a, b, c) => a.heap_size() + b.heap_size() + c.heap_size(), + Decimal128(a, b, c) => a.heap_size() + b.heap_size() + c.heap_size(), + Decimal256(a, b, c) => a.heap_size() + b.heap_size() + c.heap_size(), + Int8(i) => i.heap_size(), + Int16(i) => i.heap_size(), + Int32(i) => i.heap_size(), + Int64(i) => i.heap_size(), + UInt8(u) => u.heap_size(), + UInt16(u) => u.heap_size(), + UInt32(u) => u.heap_size(), + UInt64(u) => u.heap_size(), + Utf8(u) => u.heap_size(), + Utf8View(u) => u.heap_size(), + LargeUtf8(l) => l.heap_size(), + Binary(b) => b.heap_size(), + BinaryView(b) => b.heap_size(), + FixedSizeBinary(a, b) => a.heap_size() + b.heap_size(), + LargeBinary(l) => l.heap_size(), + FixedSizeList(f) => f.heap_size(), + List(l) => l.heap_size(), + LargeList(l) => l.heap_size(), + Struct(s) => s.heap_size(), + Map(m) => m.heap_size(), + Date32(d) => d.heap_size(), + Date64(d) => d.heap_size(), + Time32Second(t) => t.heap_size(), + Time32Millisecond(t) => t.heap_size(), + Time64Microsecond(t) => t.heap_size(), + Time64Nanosecond(t) => t.heap_size(), + TimestampSecond(a, b) => a.heap_size() + b.heap_size(), + TimestampMillisecond(a, b) => a.heap_size() + b.heap_size(), + TimestampMicrosecond(a, b) => a.heap_size() + b.heap_size(), + TimestampNanosecond(a, b) => a.heap_size() + b.heap_size(), + IntervalYearMonth(i) => i.heap_size(), + IntervalDayTime(i) => i.heap_size(), + IntervalMonthDayNano(i) => i.heap_size(), + DurationSecond(d) => d.heap_size(), + DurationMillisecond(d) => d.heap_size(), + DurationMicrosecond(d) => d.heap_size(), + DurationNanosecond(d) => d.heap_size(), + Union(a, b, c) => a.heap_size() + b.heap_size() + c.heap_size(), + Dictionary(a, b) => a.heap_size() + b.heap_size(), + } + } +} + +impl HeapSize for DataType { + fn heap_size(&self) -> usize { + use DataType::*; + match self { + Null => 0, + Boolean => 0, + Int8 => 0, + Int16 => 0, + Int32 => 0, + Int64 => 0, + UInt8 => 0, + UInt16 => 0, + UInt32 => 0, + UInt64 => 0, + Float16 => 0, + Float32 => 0, + Float64 => 0, + Timestamp(t, s) => t.heap_size() + s.heap_size(), + Date32 => 0, + Date64 => 0, + Time32(t) => t.heap_size(), + Time64(t) => t.heap_size(), + Duration(t) => t.heap_size(), + Interval(i) => i.heap_size(), + Binary => 0, + FixedSizeBinary(i) => i.heap_size(), + LargeBinary => 0, + BinaryView => 0, + Utf8 => 0, + LargeUtf8 => 0, + Utf8View => 0, + List(v) => v.heap_size(), + ListView(v) => v.heap_size(), + FixedSizeList(f, i) => f.heap_size() + i.heap_size(), + LargeList(l) => l.heap_size(), + LargeListView(l) => l.heap_size(), + Struct(s) => s.heap_size(), + Union(u, m) => u.heap_size() + m.heap_size(), + Dictionary(a, b) => a.heap_size() + b.heap_size(), + Decimal32(u8, i8) => u8.heap_size() + i8.heap_size(), + Decimal64(u8, i8) => u8.heap_size() + i8.heap_size(), + Decimal128(u8, i8) => u8.heap_size() + i8.heap_size(), + Decimal256(u8, i8) => u8.heap_size() + i8.heap_size(), + Map(m, b) => m.heap_size() + b.heap_size(), + RunEndEncoded(a, b) => a.heap_size() + b.heap_size(), + } + } +} + +impl HeapSize for Vec { + fn heap_size(&self) -> usize { + let item_size = size_of::(); + // account for the contents of the Vec + (self.capacity() * item_size) + + // add any heap allocations by contents + self.iter().map(|t| t.heap_size()).sum::() + } +} + +impl HeapSize for HashMap { + fn heap_size(&self) -> usize { + let capacity = self.capacity(); + if capacity == 0 { + return 0; + } + + // HashMap doesn't provide a way to get its heap size, so this is an approximation based on + // the behavior of hashbrown::HashMap as at version 0.16.0, and may become inaccurate + // if the implementation changes. + let key_val_size = size_of::<(K, V)>(); + // Overhead for the control tags group, which may be smaller depending on architecture + let group_size = 16; + // 1 byte of metadata stored per bucket. + let metadata_size = 1; + + // Compute the number of buckets for the capacity. Based on hashbrown's capacity_to_buckets + let buckets = if capacity < 15 { + let min_cap = match key_val_size { + 0..=1 => 14, + 2..=3 => 7, + _ => 3, + }; + let cap = min_cap.max(capacity); + if cap < 4 { + 4 + } else if cap < 8 { + 8 + } else { + 16 + } + } else { + (capacity.saturating_mul(8) / 7).next_power_of_two() + }; + + group_size + + (buckets * (key_val_size + metadata_size)) + + self.keys().map(|k| k.heap_size()).sum::() + + self.values().map(|v| v.heap_size()).sum::() + } +} + +impl HeapSize for Arc { + fn heap_size(&self) -> usize { + // Arc stores weak and strong counts on the heap alongside an instance of T + 2 * size_of::() + size_of::() + self.as_ref().heap_size() + } +} + +impl HeapSize for Arc { + fn heap_size(&self) -> usize { + 2 * size_of::() + size_of_val(self.as_ref()) + self.as_ref().heap_size() + } +} + +impl HeapSize for Fields { + fn heap_size(&self) -> usize { + self.into_iter().map(|f| f.heap_size()).sum::() + } +} + +impl HeapSize for StructArray { + fn heap_size(&self) -> usize { + self.get_array_memory_size() + } +} + +impl HeapSize for LargeListArray { + fn heap_size(&self) -> usize { + self.get_array_memory_size() + } +} + +impl HeapSize for ListArray { + fn heap_size(&self) -> usize { + self.get_array_memory_size() + } +} + +impl HeapSize for FixedSizeListArray { + fn heap_size(&self) -> usize { + self.get_array_memory_size() + } +} +impl HeapSize for MapArray { + fn heap_size(&self) -> usize { + self.get_array_memory_size() + } +} + +impl HeapSize for Arc { + fn heap_size(&self) -> usize { + 2 * size_of::() + self.as_ref().heap_size() + } +} + +impl HeapSize for Box { + fn heap_size(&self) -> usize { + size_of::() + self.as_ref().heap_size() + } +} + +impl HeapSize for Option { + fn heap_size(&self) -> usize { + self.as_ref().map(|inner| inner.heap_size()).unwrap_or(0) + } +} + +impl HeapSize for (A, B) +where + A: HeapSize, + B: HeapSize, +{ + fn heap_size(&self) -> usize { + self.0.heap_size() + self.1.heap_size() + } +} + +impl HeapSize for String { + fn heap_size(&self) -> usize { + self.capacity() + } +} + +impl HeapSize for str { + fn heap_size(&self) -> usize { + self.to_string().capacity() + } +} + +impl HeapSize for UnionFields { + fn heap_size(&self) -> usize { + self.iter().map(|f| f.0.heap_size() + f.1.heap_size()).sum() + } +} + +impl HeapSize for UnionMode { + fn heap_size(&self) -> usize { + 0 // no heap allocations + } +} + +impl HeapSize for TimeUnit { + fn heap_size(&self) -> usize { + 0 // no heap allocations + } +} + +impl HeapSize for IntervalUnit { + fn heap_size(&self) -> usize { + 0 // no heap allocations + } +} + +impl HeapSize for Field { + fn heap_size(&self) -> usize { + self.name().heap_size() + + self.data_type().heap_size() + + self.is_nullable().heap_size() + + self.dict_is_ordered().heap_size() + + self.metadata().heap_size() + } +} + +impl HeapSize for IntervalMonthDayNano { + fn heap_size(&self) -> usize { + self.days.heap_size() + self.months.heap_size() + self.nanoseconds.heap_size() + } +} + +impl HeapSize for IntervalDayTime { + fn heap_size(&self) -> usize { + self.days.heap_size() + self.milliseconds.heap_size() + } +} + +impl HeapSize for bool { + fn heap_size(&self) -> usize { + 0 // no heap allocations + } +} +impl HeapSize for u8 { + fn heap_size(&self) -> usize { + 0 // no heap allocations + } +} + +impl HeapSize for u16 { + fn heap_size(&self) -> usize { + 0 // no heap allocations + } +} + +impl HeapSize for u32 { + fn heap_size(&self) -> usize { + 0 // no heap allocations + } +} + +impl HeapSize for u64 { + fn heap_size(&self) -> usize { + 0 // no heap allocations + } +} + +impl HeapSize for i8 { + fn heap_size(&self) -> usize { + 0 // no heap allocations + } +} + +impl HeapSize for i16 { + fn heap_size(&self) -> usize { + 0 // no heap allocations + } +} + +impl HeapSize for i32 { + fn heap_size(&self) -> usize { + 0 // no heap allocations + } +} +impl HeapSize for i64 { + fn heap_size(&self) -> usize { + 0 // no heap allocations + } +} + +impl HeapSize for i128 { + fn heap_size(&self) -> usize { + 0 // no heap allocations + } +} + +impl HeapSize for i256 { + fn heap_size(&self) -> usize { + 0 // no heap allocations + } +} + +impl HeapSize for f16 { + fn heap_size(&self) -> usize { + 0 // no heap allocations + } +} + +impl HeapSize for f32 { + fn heap_size(&self) -> usize { + 0 // no heap allocations + } +} +impl HeapSize for f64 { + fn heap_size(&self) -> usize { + 0 // no heap allocations + } +} + +impl HeapSize for usize { + fn heap_size(&self) -> usize { + 0 // no heap allocations + } +} diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs index df6659c6f843c..c12d8d5cd3e2b 100644 --- a/datafusion/common/src/lib.rs +++ b/datafusion/common/src/lib.rs @@ -47,6 +47,7 @@ pub mod error; pub mod file_options; pub mod format; pub mod hash_utils; +pub mod heap_size; pub mod instant; pub mod metadata; pub mod nested_struct; diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs index ee6cb4ecc37c8..ac930bf3a484f 100644 --- a/datafusion/common/src/stats.rs +++ b/datafusion/common/src/stats.rs @@ -22,12 +22,13 @@ use std::fmt::{self, Debug, Display}; use crate::{Result, ScalarValue}; use crate::error::_plan_err; +use crate::heap_size::HeapSize; use arrow::datatypes::{DataType, Schema}; /// Represents a value with a degree of certainty. `Precision` is used to /// propagate information the precision of statistical values. #[derive(Clone, PartialEq, Eq, Default, Copy)] -pub enum Precision { +pub enum Precision { /// The exact value is known Exact(T), /// The value is not known exactly, but is likely close to this value @@ -37,7 +38,7 @@ pub enum Precision { Absent, } -impl Precision { +impl Precision { /// If we have some value (exact or inexact), it returns that value. /// Otherwise, it returns `None`. pub fn get_value(&self) -> Option<&T> { @@ -52,7 +53,7 @@ impl Precision { pub fn map(self, f: F) -> Precision where F: Fn(T) -> U, - U: Debug + Clone + PartialEq + Eq + PartialOrd, + U: Debug + Clone + PartialEq + Eq + PartialOrd + HeapSize, { match self { Precision::Exact(val) => Precision::Exact(f(val)), @@ -245,7 +246,7 @@ impl Precision { } } -impl Debug for Precision { +impl Debug for Precision { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { Precision::Exact(inner) => write!(f, "Exact({inner:?})"), @@ -255,7 +256,7 @@ impl Debug for Precision { } } -impl Display for Precision { +impl Display for Precision { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { Precision::Exact(inner) => write!(f, "Exact({inner:?})"), @@ -321,13 +322,6 @@ impl Statistics { } } - /// Returns the memory size in bytes. - pub fn heap_size(&self) -> usize { - // column_statistics + num_rows + total_byte_size - self.column_statistics.capacity() * size_of::() - + size_of::>() * 2 - } - /// Calculates `total_byte_size` based on the schema and `num_rows`. /// If any of the columns has non-primitive width, `total_byte_size` is set to inexact. pub fn calculate_total_byte_size(&mut self, schema: &Schema) { @@ -832,6 +826,8 @@ impl ColumnStatistics { mod tests { use super::*; use crate::assert_contains; + use arrow::array::{Int32Array, ListArray}; + use arrow::buffer::{OffsetBuffer, ScalarBuffer}; use arrow::datatypes::Field; use std::sync::Arc; @@ -1767,15 +1763,7 @@ mod tests { #[test] fn test_statistics_heap_size() { - let stats = Statistics { - num_rows: Precision::Exact(100), - total_byte_size: Precision::Exact(100), - column_statistics: vec![], - }; - - assert_eq!(stats.heap_size(), 32); - - let stats = Statistics { + let stats_no_heap_allocations = Statistics { num_rows: Precision::Exact(100), total_byte_size: Precision::Exact(100), column_statistics: vec![ColumnStatistics { @@ -1788,30 +1776,36 @@ mod tests { }], }; - assert_eq!(stats.heap_size(), 320); + assert_eq!(stats_no_heap_allocations.heap_size(), 0); + + let values = Int32Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7]); + let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, 3, 6, 8])); + let field = Arc::new(Field::new_list_field(DataType::Int32, true)); + let list_array = ListArray::new(field, offsets, Arc::new(values), None); - let stats = Statistics { + let column_statistics = ColumnStatistics { + null_count: Precision::Exact(1), + max_value: Precision::Exact(ScalarValue::List(Arc::new(list_array.clone()))), + min_value: Precision::Exact(ScalarValue::List(Arc::new(list_array.clone()))), + sum_value: Precision::Exact(ScalarValue::List(Arc::new(list_array.clone()))), + distinct_count: Precision::Exact(100), + byte_size: Precision::Exact(800), + }; + + let stats_1 = Statistics { num_rows: Precision::Exact(100), total_byte_size: Precision::Exact(100), - column_statistics: vec![ - ColumnStatistics { - null_count: Precision::Absent, - max_value: Precision::Absent, - min_value: Precision::Absent, - sum_value: Precision::Absent, - distinct_count: Precision::Absent, - byte_size: Precision::Exact(100), - }, - ColumnStatistics { - null_count: Precision::Exact(10), - max_value: Precision::Absent, - min_value: Precision::Absent, - sum_value: Precision::Absent, - distinct_count: Precision::Absent, - byte_size: Precision::Exact(100), - }, - ], + column_statistics: vec![column_statistics.clone()], + }; + + assert_eq!(stats_1.heap_size(), 1152); + + let heap_stats_2 = Statistics { + num_rows: Precision::Exact(100), + total_byte_size: Precision::Exact(100), + column_statistics: vec![column_statistics.clone(), column_statistics.clone()], }; - assert_eq!(stats.heap_size(), 608); + + assert_eq!(heap_stats_2.heap_size(), 2304); } } diff --git a/datafusion/execution/src/cache/cache_unit.rs b/datafusion/execution/src/cache/cache_unit.rs index d98d23821ec7f..46e65cff91b7e 100644 --- a/datafusion/execution/src/cache/cache_unit.rs +++ b/datafusion/execution/src/cache/cache_unit.rs @@ -22,11 +22,11 @@ use crate::cache::cache_manager::{ CachedFileMetadata, FileStatisticsCache, FileStatisticsCacheEntry, }; +pub use crate::cache::DefaultFilesMetadataCache; use dashmap::DashMap; +use datafusion_common::heap_size::HeapSize; use object_store::path::Path; -pub use crate::cache::DefaultFilesMetadataCache; - /// Default implementation of [`FileStatisticsCache`] /// /// Stores cached file metadata (statistics and orderings) for files. @@ -88,7 +88,7 @@ impl FileStatisticsCache for DefaultFileStatisticsCache { num_rows: cached.statistics.num_rows, num_columns: cached.statistics.column_statistics.len(), table_size_bytes: cached.statistics.total_byte_size, - statistics_size_bytes: 0, // TODO: set to the real size in the future + statistics_size_bytes: cached.statistics.heap_size(), has_ordering: cached.ordering.is_some(), }, ); @@ -395,7 +395,7 @@ mod tests { num_rows: Precision::Absent, num_columns: 1, table_size_bytes: Precision::Absent, - statistics_size_bytes: 0, + statistics_size_bytes: 72, has_ordering: false, } ), @@ -406,7 +406,7 @@ mod tests { num_rows: Precision::Absent, num_columns: 1, table_size_bytes: Precision::Absent, - statistics_size_bytes: 0, + statistics_size_bytes: 72, has_ordering: true, } ),