Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 75 additions & 3 deletions datafusion/physical-plan/src/aggregates/group_values/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@
//! [`GroupValues`] trait for storing and interning group keys

use arrow::array::types::{
Date32Type, Date64Type, Decimal128Type, Time32MillisecondType, Time32SecondType,
Time64MicrosecondType, Time64NanosecondType, TimestampMicrosecondType,
TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
Date32Type, Date64Type, Decimal128Type, Int8Type, Int16Type, Int32Type, Int64Type,
Time32MillisecondType, Time32SecondType, Time64MicrosecondType, Time64NanosecondType,
TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
TimestampSecondType, UInt8Type, UInt16Type, UInt32Type, UInt64Type,
};
use arrow::array::{ArrayRef, downcast_primitive};
use arrow::datatypes::{DataType, SchemaRef, TimeUnit};
Expand All @@ -35,6 +36,8 @@ mod single_group_by;
use datafusion_physical_expr::binary_map::OutputType;
use multi_group_by::GroupValuesColumn;
use row::GroupValuesRows;
use single_group_by::primitive_adaptive::GroupValuesPrimitiveAdaptive;
use single_group_by::primitive_flat::GroupValuesPrimitiveFlat;

pub(crate) use single_group_by::primitive::HashValue;

Expand Down Expand Up @@ -119,6 +122,12 @@ pub trait GroupValues: Send {
///
/// [`GroupValues`] implementations choosing logic:
///
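/// - If group by single column with a small integer type (i8/u8/i16/u16),
/// [`GroupValuesPrimitiveFlat`] will be chosen for O(1) direct-indexed
/// lookups over the full range of the type.
///
/// - If group by single column with a larger integer or date type
/// (i32/u32/i64/u64/Date32/Date64), [`GroupValuesPrimitiveAdaptive`] will
/// be chosen. It needs no statistics: it observes the actual data range at
/// runtime and falls back to hashing if the range exceeds its threshold.
///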
///
/// - If group by single column, and type of this column has
/// the specific [`GroupValues`] implementation, such implementation
/// will be chosen.
Expand All @@ -138,6 +147,11 @@ pub fn new_group_values(
if schema.fields.len() == 1 {
let d = schema.fields[0].data_type();

// Try flat (direct-indexed) implementation first
if let Some(flat) = try_create_flat_group_values(d) {
return Ok(flat);
}

macro_rules! downcast_helper {
($t:ty, $d:ident) => {
return Ok(Box::new(GroupValuesPrimitive::<$t>::new($d.clone())))
Expand Down Expand Up @@ -210,3 +224,61 @@ pub fn new_group_values(
Ok(Box::new(GroupValuesRows::try_new(schema)?))
}
}

/// Try to create a flat (direct-indexed) [`GroupValues`] implementation for a
/// single primitive column. Returns `None` if the type doesn't support flat
/// indexing.
///
/// For small types (i8/u8/i16/u16) the full type range is always flat-indexed.
/// For larger integer types an adaptive implementation is used that observes
/// the actual data range at runtime and transparently falls back to hashing
/// if the range exceeds the threshold.
fn try_create_flat_group_values(data_type: &DataType) -> Option<Box<dyn GroupValues>> {
match data_type {
// Small integer types: always use flat indexing (full type range)
DataType::Int8 => Some(Box::new(GroupValuesPrimitiveFlat::<Int8Type>::new(
data_type.clone(),
i8::MIN,
256,
))),
DataType::UInt8 => Some(Box::new(GroupValuesPrimitiveFlat::<UInt8Type>::new(
data_type.clone(),
0u8,
256,
))),
DataType::Int16 => Some(Box::new(GroupValuesPrimitiveFlat::<Int16Type>::new(
data_type.clone(),
i16::MIN,
65536,
))),
DataType::UInt16 => Some(Box::new(GroupValuesPrimitiveFlat::<UInt16Type>::new(
data_type.clone(),
0u16,
65536,
))),

// Larger integer types: adaptive flat → hash fallback at runtime.
// No statistics required — the implementation observes the actual
// data range and switches to hashing if it exceeds the threshold.
DataType::Int32 => Some(Box::new(
GroupValuesPrimitiveAdaptive::<Int32Type>::new(data_type.clone()),
)),
DataType::UInt32 => Some(Box::new(
GroupValuesPrimitiveAdaptive::<UInt32Type>::new(data_type.clone()),
)),
DataType::Int64 => Some(Box::new(
GroupValuesPrimitiveAdaptive::<Int64Type>::new(data_type.clone()),
)),
DataType::UInt64 => Some(Box::new(
GroupValuesPrimitiveAdaptive::<UInt64Type>::new(data_type.clone()),
)),
DataType::Date32 => Some(Box::new(
GroupValuesPrimitiveAdaptive::<Date32Type>::new(data_type.clone()),
)),
DataType::Date64 => Some(Box::new(
GroupValuesPrimitiveAdaptive::<Date64Type>::new(data_type.clone()),
)),

_ => None,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ mod boolean;
mod bytes;
pub mod bytes_view;
pub mod primitive;

use std::mem::{self, size_of};

use crate::aggregates::group_values::GroupValues;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,5 @@ pub(crate) mod boolean;
pub(crate) mod bytes;
pub(crate) mod bytes_view;
pub(crate) mod primitive;
pub(crate) mod primitive_adaptive;
pub(crate) mod primitive_flat;
Loading
Loading