diff --git a/Cargo.lock b/Cargo.lock index a2939f425712..10ddeb66aef9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2277,6 +2277,7 @@ dependencies = [ "datafusion-physical-expr", "datafusion-physical-expr-common", "half", + "hashbrown 0.14.5", "log", "paste", "rand 0.9.2", diff --git a/datafusion/functions-aggregate/Cargo.toml b/datafusion/functions-aggregate/Cargo.toml index ec6e6b633bb8..f558b9d053f5 100644 --- a/datafusion/functions-aggregate/Cargo.toml +++ b/datafusion/functions-aggregate/Cargo.toml @@ -49,6 +49,7 @@ datafusion-macros = { workspace = true } datafusion-physical-expr = { workspace = true } datafusion-physical-expr-common = { workspace = true } half = { workspace = true } +hashbrown = { workspace = true } log = { workspace = true } paste = "1.0.14" @@ -68,3 +69,7 @@ harness = false [[bench]] name = "array_agg" harness = false + +[[bench]] +name = "min_max_bytes" +harness = false diff --git a/datafusion/functions-aggregate/benches/min_max_bytes.rs b/datafusion/functions-aggregate/benches/min_max_bytes.rs new file mode 100644 index 000000000000..7f126a972aae --- /dev/null +++ b/datafusion/functions-aggregate/benches/min_max_bytes.rs @@ -0,0 +1,698 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Benchmarks included (rationale) +//! +//! The benchmarks included here were designed to exercise the adaptive +//! mode-selection heuristics implemented by the MinMax "bytes" accumulator. +//! Each benchmark targets a specific workload shape to demonstrate why the +//! DenseInline, Simple, or SparseOptimized paths were chosen and to quantify +//! the performance trade-offs between them. +//! +//! - `min bytes dense duplicate groups`: +//! Simulates batches where group ids are densely packed but many consecutive +//! rows target the same group (duplicate consecutive group ids). This +//! exercises the fast-path in the dense-inline implementation that detects +//! consecutive runs and avoids repeated checks/marks. +//! +//! - `min bytes dense reused accumulator`: +//! Multi-batch workload with a stable set of groups across batches. This +//! measures the benefit of reusing lazily-allocated dense scratch/state and +//! ensures the epoch-based marking correctly avoids per-batch clearing. +//! +//! - `min bytes monotonic group ids`: +//! Groups are produced in a growing/monotonic order across rows and batches. +//! This pattern favours simple dense approaches and validates that the +//! algorithm recognises monotonic access to enable the inline fast path. +//! +//! - `min bytes multi batch large`: +//! A large multi-batch scenario (many batches and many groups) intended to +//! capture the behaviour of the adaptive switch under realistic streaming +//! workloads where amortised costs matter most. This benchmark highlights +//! 
the worst-case gains from choosing the DenseInline/SparseOptimized paths. +//! +//! - `min bytes sparse groups`: +//! Sparse and high-cardinality access patterns where only a tiny fraction of +//! the group domain is touched in each batch. This validates the +//! SparseOptimized implementation which uses hash-based tracking to avoid +//! allocating or zeroing a large dense scratch table every batch. +//! +//! - `min bytes dense first batch`: +//! A single-batch dense workload used to measure the overhead of the +//! undecided/mode-selection phase. It demonstrates the small constant +//! bookkeeping cost before a mode is chosen (the measured ~1% regression). +//! +//! - `min bytes large dense groups`: +//! A single-batch scenario with many dense groups (large N). It ensures the +//! heuristic threshold (e.g. 100k) and memory trade-offs do not cause +//! excessive allocations or regress the single-batch path significantly. +//! +//! - `min bytes single batch large`: +//! A single-batch run with a large number of groups to ensure the simple +//! path remains efficient for one-off aggregations and to quantify the +//! fixed overhead of adaptive bookkeeping. +//! +//! - `min bytes single batch small`: +//! A small single-batch workload used to show that the overhead of the +//! adaptive approach is negligible when groups and data are tiny (micro +//! workloads), and that the simple path remains the fastest for these +//! cases. +//! +//! - `min bytes extreme duplicates`: +//! Very small number of unique groups with many repeated rows per group. +//! This stresses run-length and duplicate detection behaviour and ensures +//! the accumulator handles extreme duplicate patterns without excessive +//! per-row overhead. +//! +//! - `min bytes quadratic growing total groups`: +//! Demonstration benchmark that grows `total_num_groups` across batches +//! while each batch remains dense. It exposes pathological allocation or +//! resize behaviour which scales with the historical total number of +//! groups (quadratic behaviour) so the heuristic/resizing strategy can be +//! validated. +//! +//! - `min bytes sequential stable groups`: +//! Multiple batches where the same contiguous set of groups is touched in +//! each batch. This measures benefit of reusing lazily-allocated dense +//! scratch/state across batches. +//! +//! - `min bytes sequential dense large stable`: +//! A multi-batch benchmark with a large dense domain that remains stable +//! across batches. Used to confirm the sequential dense path reuses +//! allocations and remains consistent. +//! +//! - `min bytes sequential dense large allocations`: +//! Similar to the stable sequential benchmark but each batch allocates +//! different values to ensure the accumulator reuses its scratch +//! allocation across batches (asserts stable size). +//! +//! - `min bytes medium cardinality stable`: +//! Medium-cardinality workload where each batch touches a large fraction +//! (e.g. 80%) of the full domain. This captures behaviour between dense +//! and sparse extremes and validates the heuristic choices. +//! +//! - `min bytes ultra sparse`: +//! Extremely high-cardinality domain where each batch touches only a tiny +//! number of groups. This validates the SparseOptimized implementation and +//! hash-based tracking for low-touch workloads. +//! +//! - `min bytes mode transition`: +//! Alternating phase benchmark that flips between dense and sparse phases. +//! This checks the accumulator's ability to recognise and adapt between +//! 
modes over time without catastrophic thrashing. + +use arrow::{ + array::{ArrayRef, StringArray}, + datatypes::{DataType, Field, Schema}, +}; +use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use datafusion_expr::{function::AccumulatorArgs, AggregateUDFImpl, GroupsAccumulator}; +use datafusion_functions_aggregate::min_max::Min; +use datafusion_physical_expr::expressions::col; +use std::sync::Arc; + +const BATCH_SIZE: usize = 512; +const SPARSE_GROUPS: usize = 16; +const LARGE_TOTAL_GROUPS: usize = 10_000; +const MONOTONIC_BATCHES: usize = 32; +const MONOTONIC_TOTAL_GROUPS: usize = MONOTONIC_BATCHES * BATCH_SIZE; +const LARGE_DENSE_GROUPS: usize = MONOTONIC_TOTAL_GROUPS; +const STABLE_GROUPS: usize = 1_000; +const STABLE_BATCHES: usize = 50; +const SEQUENTIAL_DENSE_LARGE_GROUPS: usize = 65_536; +const SEQUENTIAL_DENSE_LARGE_BATCHES: usize = 8; +const MEDIUM_TOTAL_GROUPS: usize = 50_000; +const MEDIUM_BATCHES: usize = 20; +const ULTRA_SPARSE_TOTAL_GROUPS: usize = 1_000_000; +const ULTRA_SPARSE_BATCHES: usize = 20; +const ULTRA_SPARSE_ACTIVE: usize = 100; +const MODE_TRANSITION_PHASES: usize = 20; // 10 dense + 10 sparse + +fn prepare_min_accumulator(data_type: &DataType) -> Box { + let field = Field::new("f", data_type.clone(), true).into(); + let schema = Arc::new(Schema::new(vec![Arc::clone(&field)])); + let accumulator_args = AccumulatorArgs { + return_field: field, + schema: &schema, + ignore_nulls: false, + order_bys: &[], + is_reversed: false, + name: "MIN(f)", + is_distinct: false, + exprs: &[col("f", &schema).unwrap()], + }; + + Min::new() + .create_groups_accumulator(accumulator_args) + .expect("create min accumulator") +} + +fn make_string_values(len: usize) -> ArrayRef { + Arc::new(StringArray::from_iter_values( + (0..len).map(|i| format!("value_{i:05}")), + )) +} + +fn bench_batches( + c: &mut Criterion, + name: &str, + total_num_groups: usize, + group_batches: &[Vec], + mut with_values: F, +) where + F: FnMut(usize) -> ArrayRef, +{ + c.bench_function(name, |b| { + b.iter(|| { + let mut accumulator = prepare_min_accumulator(&DataType::Utf8); + for (batch_idx, group_indices) in group_batches.iter().enumerate() { + let values = with_values(batch_idx); + let _: () = accumulator + .update_batch( + std::slice::from_ref(&values), + group_indices, + None, + total_num_groups, + ) + .expect("update batch"); + black_box(()); + } + }) + }); +} + +fn min_bytes_single_batch_small(c: &mut Criterion) { + let values: ArrayRef = Arc::new(StringArray::from_iter_values( + (0..BATCH_SIZE).map(|i| format!("value_{i:04}")), + )); + let group_indices: Vec = (0..BATCH_SIZE).collect(); + + c.bench_function("min bytes single batch small", |b| { + b.iter(|| { + let mut accumulator = prepare_min_accumulator(&DataType::Utf8); + let _: () = accumulator + .update_batch( + std::slice::from_ref(&values), + &group_indices, + None, + BATCH_SIZE, + ) + .expect("update batch"); + black_box(()); + }) + }); +} + +fn min_bytes_single_batch_large(c: &mut Criterion) { + let values: ArrayRef = Arc::new(StringArray::from_iter_values( + (0..LARGE_DENSE_GROUPS).map(|i| format!("value_{i:04}")), + )); + let group_indices: Vec = (0..LARGE_DENSE_GROUPS).collect(); + + c.bench_function("min bytes single batch large", |b| { + b.iter(|| { + let mut accumulator = prepare_min_accumulator(&DataType::Utf8); + let _: () = accumulator + .update_batch( + std::slice::from_ref(&values), + &group_indices, + None, + LARGE_DENSE_GROUPS, + ) + .expect("update batch"); + black_box(()); + }) + }); +} + +fn 
min_bytes_multi_batch_large(c: &mut Criterion) { + let values: ArrayRef = Arc::new(StringArray::from_iter_values( + (0..BATCH_SIZE).map(|i| format!("value_{i:04}")), + )); + let group_batches: Vec> = (0..MONOTONIC_BATCHES) + .map(|batch| { + let start = batch * BATCH_SIZE; + (0..BATCH_SIZE).map(|i| start + i).collect() + }) + .collect(); + + c.bench_function("min bytes multi batch large", |b| { + b.iter(|| { + let mut accumulator = prepare_min_accumulator(&DataType::Utf8); + for group_indices in &group_batches { + let _: () = accumulator + .update_batch( + std::slice::from_ref(&values), + group_indices, + None, + LARGE_DENSE_GROUPS, + ) + .expect("update batch"); + black_box(()); + } + }) + }); +} + +fn min_bytes_sparse_groups(c: &mut Criterion) { + let values: ArrayRef = Arc::new(StringArray::from_iter_values( + (0..BATCH_SIZE).map(|i| format!("value_{i:04}", i = i % 1024)), + )); + let group_indices: Vec = (0..BATCH_SIZE).map(|i| i % SPARSE_GROUPS).collect(); + + c.bench_function("min bytes sparse groups", |b| { + b.iter(|| { + let mut accumulator = prepare_min_accumulator(&DataType::Utf8); + let _: () = accumulator + .update_batch( + std::slice::from_ref(&values), + &group_indices, + None, + LARGE_TOTAL_GROUPS, + ) + .expect("update batch"); + black_box(()); + }) + }); +} + +fn min_bytes_dense_first_batch(c: &mut Criterion) { + let values: ArrayRef = Arc::new(StringArray::from_iter_values( + (0..BATCH_SIZE).map(|i| format!("value_{i:04}")), + )); + let group_indices: Vec = (0..BATCH_SIZE).collect(); + + c.bench_function("min bytes dense first batch", |b| { + b.iter(|| { + let mut accumulator = prepare_min_accumulator(&DataType::Utf8); + let _: () = accumulator + .update_batch( + std::slice::from_ref(&values), + &group_indices, + None, + BATCH_SIZE, + ) + .expect("update batch"); + black_box(()); + }) + }); +} + +fn min_bytes_dense_reused_batches(c: &mut Criterion) { + let values: ArrayRef = Arc::new(StringArray::from_iter_values( + (0..BATCH_SIZE).map(|i| format!("value_{i:04}")), + )); + let group_indices: Vec = (0..BATCH_SIZE).collect(); + + c.bench_function("min bytes dense reused accumulator", |b| { + b.iter(|| { + let mut accumulator = prepare_min_accumulator(&DataType::Utf8); + for _ in 0..MONOTONIC_BATCHES { + let _: () = accumulator + .update_batch( + std::slice::from_ref(&values), + &group_indices, + None, + BATCH_SIZE, + ) + .expect("update batch"); + black_box(()); + } + }) + }); +} + +fn min_bytes_dense_duplicate_groups(c: &mut Criterion) { + let unique_groups = BATCH_SIZE / 2; + let values: ArrayRef = Arc::new(StringArray::from_iter_values( + (0..BATCH_SIZE).map(|i| format!("value_{i:04}", i = i / 2)), + )); + let group_indices: Vec = (0..unique_groups).flat_map(|i| [i, i]).collect(); + + c.bench_function("min bytes dense duplicate groups", |b| { + b.iter(|| { + let mut accumulator = prepare_min_accumulator(&DataType::Utf8); + for _ in 0..MONOTONIC_BATCHES { + let _: () = accumulator + .update_batch( + std::slice::from_ref(&values), + &group_indices, + None, + unique_groups, + ) + .expect("update batch"); + black_box(()); + } + }) + }); +} + +fn min_bytes_extreme_duplicates(c: &mut Criterion) { + let unique_groups = 50; + let repeats_per_group = 10; + let total_rows = unique_groups * repeats_per_group; + + let mut value_strings = Vec::with_capacity(total_rows); + for group in 0..unique_groups { + for _ in 0..repeats_per_group { + value_strings.push(format!("value_{group:04}")); + } + } + let values: ArrayRef = Arc::new(StringArray::from(value_strings)); + let group_indices: 
Vec = (0..unique_groups) + .flat_map(|group| std::iter::repeat_n(group, repeats_per_group)) + .collect(); + + debug_assert_eq!(values.len(), total_rows); + debug_assert_eq!(group_indices.len(), total_rows); + + c.bench_function("min bytes extreme duplicates", |b| { + b.iter(|| { + let mut accumulator = prepare_min_accumulator(&DataType::Utf8); + for _ in 0..MONOTONIC_BATCHES { + let _: () = accumulator + .update_batch( + std::slice::from_ref(&values), + &group_indices, + None, + unique_groups, + ) + .expect("update batch"); + black_box(()); + } + }) + }); +} + +fn min_bytes_sequential_stable_groups(c: &mut Criterion) { + let batches: Vec> = (0..STABLE_BATCHES) + .map(|_| (0..STABLE_GROUPS).collect()) + .collect(); + + bench_batches( + c, + "min bytes sequential stable groups", + STABLE_GROUPS, + &batches, + |_| make_string_values(STABLE_GROUPS), + ); +} + +fn min_bytes_sequential_dense_large_stable(c: &mut Criterion) { + let batches: Vec> = (0..SEQUENTIAL_DENSE_LARGE_BATCHES) + .map(|_| (0..SEQUENTIAL_DENSE_LARGE_GROUPS).collect()) + .collect(); + + let baseline = make_string_values(SEQUENTIAL_DENSE_LARGE_GROUPS); + bench_batches( + c, + "min bytes sequential dense large stable", + SEQUENTIAL_DENSE_LARGE_GROUPS, + &batches, + move |_| baseline.clone(), + ); +} + +fn min_bytes_sequential_dense_large_allocations(c: &mut Criterion) { + let group_indices: Vec = (0..SEQUENTIAL_DENSE_LARGE_GROUPS).collect(); + let batches: Vec = (0..SEQUENTIAL_DENSE_LARGE_BATCHES) + .map(|step| { + let prefix = (b'z' - step as u8) as char; + Arc::new(StringArray::from_iter_values( + (0..SEQUENTIAL_DENSE_LARGE_GROUPS) + .map(|group| format!("{prefix}{prefix}_{group:05}")), + )) as ArrayRef + }) + .collect(); + + c.bench_function("min bytes sequential dense large allocations", |b| { + b.iter(|| { + let mut accumulator = prepare_min_accumulator(&DataType::Utf8); + let mut baseline_size: Option = None; + + for values in &batches { + let _: () = accumulator + .update_batch( + std::slice::from_ref(values), + &group_indices, + None, + SEQUENTIAL_DENSE_LARGE_GROUPS, + ) + .expect("update batch"); + black_box(()); + + let current_size = accumulator.size(); + if let Some(expected) = baseline_size { + assert_eq!( + current_size, expected, + "sequential dense path should reuse its scratch allocation" + ); + } else { + baseline_size = Some(current_size); + } + } + }) + }); +} + +fn min_bytes_medium_cardinality_stable(c: &mut Criterion) { + let touched_per_batch = (MEDIUM_TOTAL_GROUPS as f64 * 0.8) as usize; + let batches: Vec> = (0..MEDIUM_BATCHES) + .map(|batch| { + let start = (batch * touched_per_batch) % MEDIUM_TOTAL_GROUPS; + (0..touched_per_batch) + .map(|offset| (start + offset) % MEDIUM_TOTAL_GROUPS) + .collect() + }) + .collect(); + + bench_batches( + c, + "min bytes medium cardinality stable", + MEDIUM_TOTAL_GROUPS, + &batches, + |_| make_string_values(touched_per_batch), + ); +} + +fn min_bytes_ultra_sparse(c: &mut Criterion) { + let batches: Vec> = (0..ULTRA_SPARSE_BATCHES) + .map(|batch| { + let base = (batch * ULTRA_SPARSE_ACTIVE) % ULTRA_SPARSE_TOTAL_GROUPS; + (0..ULTRA_SPARSE_ACTIVE) + .map(|offset| (base + offset * 8_129) % ULTRA_SPARSE_TOTAL_GROUPS) + .collect() + }) + .collect(); + + bench_batches( + c, + "min bytes ultra sparse", + ULTRA_SPARSE_TOTAL_GROUPS, + &batches, + |_| make_string_values(ULTRA_SPARSE_ACTIVE), + ); +} + +fn min_bytes_mode_transition(c: &mut Criterion) { + let mut batches = Vec::with_capacity(MODE_TRANSITION_PHASES * 2); + + let dense_touch = (STABLE_GROUPS as f64 * 0.9) as usize; + 
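+    // Dense phase: each of the next MODE_TRANSITION_PHASES batches touches ~90% of the
+    // STABLE_GROUPS domain; the sparse phase built below then touches only ~5% of a
+    // 100_000-group domain, forcing the accumulator to re-evaluate its mode.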
for batch in 0..MODE_TRANSITION_PHASES { + let start = (batch * dense_touch) % STABLE_GROUPS; + batches.push( + (0..dense_touch) + .map(|offset| (start + offset) % STABLE_GROUPS) + .collect(), + ); + } + + let sparse_total = 100_000; + let sparse_touch = (sparse_total as f64 * 0.05) as usize; + for batch in 0..MODE_TRANSITION_PHASES { + let start = (batch * sparse_touch * 13) % sparse_total; + batches.push( + (0..sparse_touch) + .map(|offset| (start + offset * 17) % sparse_total) + .collect(), + ); + } + + bench_batches( + c, + "min bytes mode transition", + sparse_total, + &batches, + |batch_idx| { + if batch_idx < MODE_TRANSITION_PHASES { + make_string_values(dense_touch) + } else { + make_string_values(sparse_touch) + } + }, + ); +} + +/// Demonstration benchmark: simulate growing `total_num_groups` across batches +/// while group indices remain dense in each batch. This exposes quadratic +/// allocation behaviour when per-batch allocations scale with the historical +/// total number of groups (the pathological case discussed in the issue). +fn min_bytes_quadratic_growing_total_groups(c: &mut Criterion) { + // Start small and grow total_num_groups across batches to simulate a + // workload that discovers more groups over time. Each batch contains + // BATCH_SIZE rows with dense group indices in the current domain. + let base_batch_values: ArrayRef = Arc::new(StringArray::from_iter_values( + (0..BATCH_SIZE).map(|i| format!("value_{i:04}")), + )); + + c.bench_function("min bytes quadratic growing total groups", |b| { + b.iter(|| { + let mut accumulator = prepare_min_accumulator(&DataType::Utf8); + + // Grow total_num_groups by increments of BATCH_SIZE for several + // batches to expose allocations proportional to the growing domain. + let mut total_groups = BATCH_SIZE; + for _ in 0..MONOTONIC_BATCHES { + let group_indices: Vec = + (0..BATCH_SIZE).map(|i| i % total_groups).collect(); + + let _: () = accumulator + .update_batch( + std::slice::from_ref(&base_batch_values), + &group_indices, + None, + total_groups, + ) + .expect("update batch"); + black_box(()); + + total_groups = total_groups.saturating_add(BATCH_SIZE); + } + }) + }); +} + +fn min_bytes_monotonic_group_ids(c: &mut Criterion) { + let values: ArrayRef = Arc::new(StringArray::from_iter_values( + (0..BATCH_SIZE).map(|i| format!("value_{i:04}", i = i % 1024)), + )); + let group_batches: Vec> = (0..MONOTONIC_BATCHES) + .map(|batch| { + let start = batch * BATCH_SIZE; + (0..BATCH_SIZE).map(|i| start + i).collect() + }) + .collect(); + + c.bench_function("min bytes monotonic group ids", |b| { + b.iter(|| { + let mut accumulator = prepare_min_accumulator(&DataType::Utf8); + for group_indices in &group_batches { + let _: () = accumulator + .update_batch( + std::slice::from_ref(&values), + group_indices, + None, + MONOTONIC_TOTAL_GROUPS, + ) + .expect("update batch"); + black_box(()); + } + }) + }); +} + +fn min_bytes_growing_total_groups(c: &mut Criterion) { + // Each batch introduces a new contiguous block of group ids so the + // 'total_num_groups' parameter grows with each iteration. This simulates + // workloads where the domain of groups increases over time and exposes + // alloc/resize behaviour that scales with the historical number of groups. 
+ let values: ArrayRef = Arc::new(StringArray::from_iter_values( + (0..BATCH_SIZE).map(|i| format!("value_{i:04}", i = i % 1024)), + )); + let group_batches: Vec> = (0..MONOTONIC_BATCHES) + .map(|batch| { + let start = batch * BATCH_SIZE; + (0..BATCH_SIZE).map(|i| start + i).collect() + }) + .collect(); + + c.bench_function("min bytes growing total groups", |b| { + b.iter(|| { + let mut accumulator = prepare_min_accumulator(&DataType::Utf8); + for (batch_idx, group_indices) in group_batches.iter().enumerate() { + // Simulate the increasing total_num_groups observed by the + // accumulator: each batch's total groups equals the highest + // group index observed so far plus one. + let total_num_groups = (batch_idx + 1) * BATCH_SIZE; + let _: () = accumulator + .update_batch( + std::slice::from_ref(&values), + group_indices, + None, + total_num_groups, + ) + .expect("update batch"); + black_box(()); + } + }) + }); +} + +fn min_bytes_large_dense_groups(c: &mut Criterion) { + let values: ArrayRef = Arc::new(StringArray::from_iter_values( + (0..LARGE_DENSE_GROUPS).map(|i| format!("value_{i:04}")), + )); + let group_indices: Vec = (0..LARGE_DENSE_GROUPS).collect(); + + c.bench_function("min bytes large dense groups", |b| { + b.iter(|| { + let mut accumulator = prepare_min_accumulator(&DataType::Utf8); + let _: () = accumulator + .update_batch( + std::slice::from_ref(&values), + &group_indices, + None, + LARGE_DENSE_GROUPS, + ) + .expect("update batch"); + black_box(()); + }) + }); +} + +criterion_group!( + benches, + min_bytes_single_batch_small, + min_bytes_single_batch_large, + min_bytes_multi_batch_large, + min_bytes_dense_first_batch, + min_bytes_dense_reused_batches, + min_bytes_dense_duplicate_groups, + min_bytes_extreme_duplicates, + min_bytes_quadratic_growing_total_groups, + min_bytes_sparse_groups, + min_bytes_monotonic_group_ids, + min_bytes_growing_total_groups, + min_bytes_large_dense_groups, + min_bytes_sequential_stable_groups, + min_bytes_sequential_dense_large_stable, + min_bytes_sequential_dense_large_allocations, + min_bytes_medium_cardinality_stable, + min_bytes_ultra_sparse, + min_bytes_mode_transition +); +criterion_main!(benches); diff --git a/datafusion/functions-aggregate/src/min_max/min_max_bytes.rs b/datafusion/functions-aggregate/src/min_max/min_max_bytes.rs index 05321c2ff52d..00e5a22bdc68 100644 --- a/datafusion/functions-aggregate/src/min_max/min_max_bytes.rs +++ b/datafusion/functions-aggregate/src/min_max/min_max_bytes.rs @@ -20,9 +20,10 @@ use arrow::array::{ LargeBinaryBuilder, LargeStringBuilder, StringBuilder, StringViewBuilder, }; use arrow::datatypes::DataType; -use datafusion_common::{internal_err, Result}; +use datafusion_common::{internal_err, HashMap, Result}; use datafusion_expr::{EmitTo, GroupsAccumulator}; use datafusion_functions_aggregate_common::aggregate::groups_accumulator::nulls::apply_filter_as_nulls; +use hashbrown::hash_map::Entry; use std::mem::size_of; use std::sync::Arc; @@ -35,6 +36,62 @@ use std::sync::Arc; /// [`StringArray`]: arrow::array::StringArray /// [`BinaryArray`]: arrow::array::BinaryArray /// [`StringViewArray`]: arrow::array::StringViewArray +/// Captures the heuristic driven execution strategy for a given accumulator. +/// +/// The state machine starts in [`WorkloadMode::Undecided`] until the first +/// non-null values arrive. 
Once the workload shape is known we switch to one of +/// the specialised implementations: +/// +/// * [`WorkloadMode::DenseInline`] – enabled for dense group domains with a +/// stable `total_num_groups` (≤ 100k) **and** evidence that the accumulator is +/// reused across batches. Marks used to detect first touches are allocated +/// lazily: they are prepared once the accumulator has observed a previous +/// processed batch (i.e. on the second processed batch), so single-batch +/// workloads avoid the allocation cost. After a small number of consecutive +/// stable batches the implementation "commits" to the dense-inline fast +/// path and disables per-batch statistics and mark tracking. +/// * [`WorkloadMode::Simple`] – chosen for single-batch dense workloads where +/// reuse is unlikely. This path stages updates per-batch and then writes +/// results in-place without using the dense-inline marks. +/// * [`WorkloadMode::SparseOptimized`] – kicks in when the cardinality is high +/// or the batches are sparse/irregular; it retains and reuses the sparse +/// scratch machinery (hash-based tracking) introduced by the dense-inline +/// heuristics. Optimized for sparse access patterns. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(super) enum WorkloadMode { + /// The accumulator has not yet observed any non-null values and therefore + /// cannot decide between the simple dense path and the sparse-optimised + /// implementation. + Undecided, + /// Use an inline dense path that updates the accumulator directly without + /// any per-batch scratch allocation. This path is optimised for small, + /// repeatedly accessed group domains where the group ids are densely + /// populated. + DenseInline, + /// Use the original per-batch dense array that favours cache locality and + /// straight-line execution. This is ideal for workloads that repeatedly + /// touch most groups ("dense" workloads). + Simple, + /// Use the sparse/dense scratch machinery introduced to cope with + /// high-cardinality workloads that would otherwise allocate + /// `total_num_groups` scratch entries on every batch. + SparseOptimized, +} + +#[derive(Debug, Clone, Copy, Default)] +pub(super) struct BatchStats { + /// Number of **unique** group ids observed in the processed batch. The + /// counter is strictly per-batch – duplicates within the batch do not + /// contribute multiple times and the value intentionally ignores groups + /// touched in prior batches. This makes the density heuristics resilient to + /// workloads that repeatedly touch the same domain across many batches. + pub(super) unique_groups: usize, + /// Highest group index encountered in the batch. Unlike `unique_groups` + /// duplicates matter here because it is used to derive the effective domain + /// size for density comparisons. + pub(super) max_group_index: Option, +} + #[derive(Debug)] pub(crate) struct MinMaxBytesAccumulator { /// Inner data storage. 
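The selection rule described above can be summarised with the thresholds and integer density test defined further down in this file (`DENSE_INLINE_MAX_TOTAL_GROUPS`, `DENSE_INLINE_MIN_DENSITY_PERCENT`, `SIMPLE_MODE_MIN_DENSITY_PERCENT`, `density_at_least`). The following standalone sketch is illustrative only: it mirrors the decision order used for a single batch while the accumulator is still undecided, and omits the reuse/commit bookkeeping the real implementation layers on top.

```rust
/// Illustrative reduction of the Undecided-mode decision in `record_batch_stats`.
/// Thresholds match the constants declared in min_max_bytes.rs.
fn choose_mode(
    unique_groups: usize,
    max_group_index: usize,
    total_num_groups: usize,
) -> &'static str {
    // Same integer arithmetic as `density_at_least`: observed * 100 >= domain * percent.
    let density_at_least = |observed: usize, domain: usize, percent: usize| {
        domain != 0
            && percent != 0
            && observed.saturating_mul(100) >= domain.saturating_mul(percent)
    };
    let domain = max_group_index + 1;
    if total_num_groups <= 100_000 && density_at_least(unique_groups, total_num_groups, 50) {
        "DenseInline"
    } else if total_num_groups <= 100_000 && density_at_least(unique_groups, domain, 10) {
        "Simple"
    } else {
        "SparseOptimized"
    }
}

fn main() {
    // 512 distinct groups out of a 1_000-group domain: 512 * 100 >= 1_000 * 50, so DenseInline.
    assert_eq!(choose_mode(512, 511, 1_000), "DenseInline");
    // 100 distinct groups spread over a 1_000_000-group domain stays on the sparse path.
    assert_eq!(choose_mode(100, 999_999, 1_000_000), "SparseOptimized");
}
```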
@@ -381,24 +438,184 @@ fn capacity_to_view_block_size(data_capacity: usize) -> u32 { /// /// See discussion on #[derive(Debug)] -struct MinMaxBytesState { +pub(super) struct MinMaxBytesState { /// The minimum/maximum value for each group - min_max: Vec>>, + pub(super) min_max: Vec>>, /// The data type of the array - data_type: DataType, + pub(super) data_type: DataType, /// The total bytes of the string data (for pre-allocating the final array, /// and tracking memory usage) - total_data_bytes: usize, + pub(super) total_data_bytes: usize, + /// Scratch storage tracking which groups were updated in the current batch + pub(super) scratch_group_ids: Vec, + /// Dense scratch table indexed by group id. Entries are tagged with an + /// epoch so we can reuse the allocation across batches without clearing it. + pub(super) scratch_dense: Vec, + /// Epoch corresponding to the current batch. + pub(super) scratch_epoch: u64, + /// Sparse scratch entries keyed by group id describing where the candidate + /// value for the group is stored during the current batch. + pub(super) scratch_sparse: HashMap, + /// Upper bound on the dense scratch size we are willing to allocate. The + /// bound is updated after each batch based on how "dense" the accessed + /// groups were so that we only pay for dense initialisation when we have + /// evidence that it will be reused. + pub(super) scratch_dense_limit: usize, + /// Whether the dense scratch table has been initialised. We defer creating + /// the dense table until the accumulator has processed at least one batch + /// so that short-lived accumulators can stick to the sparse path and avoid + /// zeroing large dense allocations upfront. + pub(super) scratch_dense_enabled: bool, + /// Tracks which implementation should be used for future batches. + pub(super) workload_mode: WorkloadMode, + /// Number of batches processed so far. Used in conjunction with + /// `total_groups_seen` when evaluating mode switches. + pub(super) processed_batches: usize, + /// Total number of groups observed across the lifetime of the accumulator. + pub(super) total_groups_seen: usize, + /// Highest group index seen so far. + pub(super) lifetime_max_group_index: Option, + /// Number of groups that currently have a materialised min/max value. + pub(super) populated_groups: usize, + /// Scratch entries reused by the classic simple implementation. + pub(super) simple_slots: Vec, + /// Epoch used to lazily reset `simple_slots` between batches. + pub(super) simple_epoch: u64, + /// Reusable list of groups touched by the simple path. + pub(super) simple_touched_groups: Vec, + /// Marker vector used by the dense inline implementation to detect first + /// touches without clearing a bitmap on every batch. + pub(super) dense_inline_marks: Vec, + /// Whether the dense inline marks vector should be prepared for the current + /// batch. We keep this disabled for the very first batch processed in dense + /// inline mode so that short-lived accumulators avoid the upfront + /// allocation and zeroing costs. Once a batch with values has been + /// observed we enable the flag so that subsequent batches allocate the mark + /// table on demand. + pub(super) dense_inline_marks_ready: bool, + /// Epoch associated with `dense_inline_marks`. + pub(super) dense_inline_epoch: u64, + /// Number of consecutive batches processed while remaining in + /// `DenseInline` mode. 
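+    /// Once this counter reaches `DENSE_INLINE_STABILITY_THRESHOLD` consecutive stable
+    /// batches, `record_batch_stats` commits to the dense-inline fast path and stops
+    /// per-batch statistics tracking.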
+ pub(super) dense_inline_stable_batches: usize, + /// Whether the accumulator has committed to the dense inline fast path and + /// no longer needs to track per-batch statistics. + pub(super) dense_inline_committed: bool, + /// Total number of groups observed when the dense inline fast path was + /// committed. If the group domain grows beyond this value we need to + /// reconsider the workload mode. + pub(super) dense_inline_committed_groups: usize, + #[cfg(test)] + pub(super) dense_enable_invocations: usize, + #[cfg(test)] + pub(super) dense_sparse_detours: usize, } #[derive(Debug, Clone, Copy)] -enum MinMaxLocation<'a> { - /// the min/max value is stored in the existing `min_max` array - ExistingMinMax, - /// the min/max value is stored in the input array at the given index - Input(&'a [u8]), +pub(super) struct SimpleSlot { + epoch: u64, + location: SimpleLocation, } +impl SimpleSlot { + pub(super) fn new() -> Self { + Self { + epoch: 0, + location: SimpleLocation::Untouched, + } + } +} + +#[derive(Debug, Clone, Copy)] +pub(super) enum SimpleLocation { + Untouched, + Existing, + Batch(usize), +} + +#[derive(Debug, Clone, Copy)] +pub(super) enum ScratchLocation { + Existing, + Batch(usize), +} + +#[derive(Debug, Clone, Copy)] +pub(super) struct ScratchEntry { + pub(super) epoch: u64, + pub(super) location: ScratchLocation, +} + +impl ScratchEntry { + pub(super) fn new() -> Self { + Self { + epoch: 0, + location: ScratchLocation::Existing, + } + } +} + +/// Grow the dense scratch table by at least this many entries whenever we need +/// to expand it. Chunked growth keeps the amortized cost low while capping the +/// amount of zeroing we do per batch. +pub(super) const SCRATCH_DENSE_GROWTH_STEP: usize = 1024; + +/// Maximum number of groups for which the inline dense path is considered. +/// +/// Mode selection overview: +/// | Mode | Optimal For | Memory Footprint | Description | +/// | --------------- | -------------------------- | ---------------- | --------------------------------- | +/// | DenseInline | `N ≤ 100k`, ≥ 50% density | `O(N)` | Epoch-tracked, zero additional allocation. | +/// | Simple | `N > 100k`, medium density | `≈ 3 × O(N)` | Deferred materialization with scratch staging. | +/// | SparseOptimized | Very sparse or huge `N` | `O(touched)` | Hash-based tracking of populated groups. | +/// | Undecided | Initial batch | - | Gathers statistics then picks a mode. | +/// +/// Flowchart: +/// ```text +/// Undecided +/// ├─ N ≤ threshold & density ≥ 50% → DenseInline +/// ├─ N ≤ 100k & density ≥ 10% → Simple +/// └─ otherwise → SparseOptimized +/// ``` +/// +/// `100_000` was chosen from benchmark analysis. Even in the worst case the +/// DenseInline epoch vector consumes ≈ 800 KiB, which is still significantly +/// smaller than the multi-vector Simple mode and avoids its cache penalties. +/// +pub(super) const DENSE_INLINE_MAX_TOTAL_GROUPS: usize = 100_000; +/// Minimum observed density (in percent) required to remain on the inline dense +/// path. +pub(super) const DENSE_INLINE_MIN_DENSITY_PERCENT: usize = 50; + +/// Maximum number of groups for which the simple dense path is considered. +pub(super) const SIMPLE_MODE_MAX_TOTAL_GROUPS: usize = 100_000; +/// Minimum observed density (in percent) required to remain on the simple path. +/// +/// The density calculation compares the per-batch `unique_groups` against the +/// effective domain derived from `max_group_index`. 
Prior to fixing the +/// statistics bug described in docs/tasks/min_max_bytes_regression_v2.md the +/// thresholds were evaluated using inflated unique counts (effectively counting +/// every non-null row). Re-validating with the corrected per-batch counts shows +/// that a 10% density remains the tipping point where the simple path starts to +/// outperform the sparse implementation while avoiding the inline dense path's +/// mark bookkeeping. +pub(super) const SIMPLE_MODE_MIN_DENSITY_PERCENT: usize = 10; +/// Threshold after which the accumulator reevaluates whether it should switch +/// to the sparse implementation. +pub(super) const SPARSE_SWITCH_GROUP_THRESHOLD: usize = 100_000; +/// Maximum density (in percent) tolerated before switching from the simple path +/// to the sparse implementation. +pub(super) const SPARSE_SWITCH_MAX_DENSITY_PERCENT: usize = 1; + +/// Heuristic multiplier that determines whether a batch of groups should be +/// considered "dense". If the maximum group index touched is within this +/// multiple of the number of unique groups observed, we enable the dense +/// scratch for subsequent batches. +pub(super) const SCRATCH_DENSE_ENABLE_MULTIPLIER: usize = 8; + +/// After this many consecutive batches we consider DenseInline stable and +/// disable per-batch statistics tracking. +pub(super) const DENSE_INLINE_STABILITY_THRESHOLD: usize = 3; + /// Implement the MinMaxBytesAccumulator with a comparison function /// for comparing strings impl MinMaxBytesState { @@ -406,20 +623,45 @@ impl MinMaxBytesState { /// /// # Arguments: /// * `data_type`: The data type of the arrays that will be passed to this accumulator - fn new(data_type: DataType) -> Self { + pub(super) fn new(data_type: DataType) -> Self { Self { min_max: vec![], data_type, total_data_bytes: 0, + scratch_group_ids: vec![], + scratch_dense: vec![], + scratch_epoch: 0, + scratch_sparse: HashMap::new(), + scratch_dense_limit: 0, + scratch_dense_enabled: false, + workload_mode: WorkloadMode::Undecided, + processed_batches: 0, + total_groups_seen: 0, + lifetime_max_group_index: None, + populated_groups: 0, + simple_slots: vec![], + simple_epoch: 0, + simple_touched_groups: vec![], + dense_inline_marks: vec![], + dense_inline_marks_ready: false, + dense_inline_epoch: 0, + dense_inline_stable_batches: 0, + dense_inline_committed: false, + dense_inline_committed_groups: 0, + #[cfg(test)] + dense_enable_invocations: 0, + #[cfg(test)] + dense_sparse_detours: 0, } } /// Set the specified group to the given value, updating memory usage appropriately - fn set_value(&mut self, group_index: usize, new_val: &[u8]) { + pub(super) fn set_value(&mut self, group_index: usize, new_val: &[u8]) { match self.min_max[group_index].as_mut() { None => { self.min_max[group_index] = Some(new_val.to_vec()); self.total_data_bytes += new_val.len(); + self.populated_groups += 1; } Some(existing_val) => { // Copy data over to avoid re-allocating @@ -431,90 +673,1495 @@ impl MinMaxBytesState { } } - /// Updates the min/max values for the given string values + pub(super) fn resize_min_max(&mut self, total_num_groups: usize) { + if total_num_groups < self.min_max.len() { + let truncated = self.min_max.split_off(total_num_groups); + // iterate only over Some variants + for bytes in truncated.into_iter().flatten() { + debug_assert!(self.total_data_bytes >= bytes.len()); + debug_assert!(self.populated_groups > 0); + self.total_data_bytes -= bytes.len(); + self.populated_groups -= 1; + } + } else if total_num_groups > self.min_max.len() { + 
self.min_max.resize(total_num_groups, None); + } + } + + /// Dispatch to the appropriate implementation based on workload mode. + pub(super) fn update_batch<'a, F, I>( + &mut self, + iter: I, + group_indices: &[usize], + total_num_groups: usize, + cmp: F, + ) -> Result<()> + where + F: FnMut(&[u8], &[u8]) -> bool + Send + Sync, + I: IntoIterator>, + { + // Fast path: detect perfectly sequential dense group indices [0, 1, 2, ..., N-1] + // This is the common case for dense aggregations and matches the original + // pre-optimization algorithm behavior with zero overhead. + // + // We use a lightweight heuristic check: verify the batch covers every group + // exactly once by ensuring it spans the full domain and the indices are + // strictly sequential. + if group_indices.len() == total_num_groups + && !group_indices.is_empty() + && group_indices[0] == 0 + && group_indices[total_num_groups - 1] == total_num_groups - 1 + && group_indices.windows(2).all(|pair| pair[1] == pair[0] + 1) + { + let stats = self.update_batch_sequential_dense( + iter, + group_indices, + total_num_groups, + cmp, + )?; + self.record_batch_stats(stats, total_num_groups); + return Ok(()); + } + + let mut cmp = cmp; + match self.workload_mode { + WorkloadMode::SparseOptimized => { + let stats = self.update_batch_sparse_impl( + iter, + group_indices, + total_num_groups, + &mut cmp, + )?; + self.record_batch_stats(stats, total_num_groups); + Ok(()) + } + WorkloadMode::DenseInline => { + if self.dense_inline_committed + && total_num_groups > self.dense_inline_committed_groups + { + self.dense_inline_committed = false; + self.dense_inline_committed_groups = 0; + self.dense_inline_stable_batches = 0; + self.dense_inline_marks_ready = false; + } + + if self.dense_inline_committed { + self.update_batch_dense_inline_committed( + iter, + group_indices, + total_num_groups, + &mut cmp, + ) + } else { + let stats = self.update_batch_dense_inline_impl( + iter, + group_indices, + total_num_groups, + &mut cmp, + )?; + self.record_batch_stats(stats, total_num_groups); + Ok(()) + } + } + WorkloadMode::Simple => { + let stats = self.update_batch_simple_impl( + iter, + group_indices, + total_num_groups, + &mut cmp, + )?; + self.record_batch_stats(stats, total_num_groups); + Ok(()) + } + WorkloadMode::Undecided => { + let stats = if total_num_groups <= DENSE_INLINE_MAX_TOTAL_GROUPS { + self.update_batch_dense_inline_impl( + iter, + group_indices, + total_num_groups, + &mut cmp, + )? + } else { + self.update_batch_sparse_impl( + iter, + group_indices, + total_num_groups, + &mut cmp, + )? 
+ }; + self.record_batch_stats(stats, total_num_groups); + Ok(()) + } + } + } + + fn update_batch_dense_inline_impl<'a, F, I>( + &mut self, + iter: I, + group_indices: &[usize], + total_num_groups: usize, + cmp: &mut F, + ) -> Result + where + F: FnMut(&[u8], &[u8]) -> bool + Send + Sync, + I: IntoIterator>, + { + self.resize_min_max(total_num_groups); + + let mut marks_prepared = false; + + let mut unique_groups = 0_usize; + let mut max_group_index: Option = None; + let mut fast_path = true; + let mut fast_rows = 0_usize; + let mut fast_start = 0_usize; + let mut fast_last = 0_usize; + + let mut last_group_index: Option = None; + let mut processed_any = false; + + for (&group_index, new_val) in group_indices.iter().zip(iter.into_iter()) { + let Some(new_val) = new_val else { + continue; + }; + + processed_any = true; + + if group_index >= self.min_max.len() { + return internal_err!( + "group index {group_index} out of bounds for {} groups", + self.min_max.len() + ); + } + + let is_consecutive_duplicate = last_group_index == Some(group_index); + last_group_index = Some(group_index); + + if fast_path { + if fast_rows == 0 { + fast_start = group_index; + fast_last = group_index; + } else if group_index == fast_last + 1 { + fast_last = group_index; + } else { + if !marks_prepared { + self.prepare_dense_inline_marks(total_num_groups); + marks_prepared = true; + } + fast_path = false; + if fast_rows > 0 { + let fast_unique = + fast_last.saturating_sub(fast_start).saturating_add(1); + unique_groups = fast_unique; + max_group_index = Some(match max_group_index { + Some(current_max) => current_max.max(fast_last), + None => fast_last, + }); + + let epoch = self.dense_inline_epoch; + // iterate over the mutable slice instead of indexing by range + let marks = &mut self.dense_inline_marks; + for mark in marks.iter_mut().take(fast_last + 1).skip(fast_start) + { + *mark = epoch; + } + } + } + + if fast_path { + fast_rows = fast_rows.saturating_add(1); + } + } + + if !fast_path && !is_consecutive_duplicate { + if !marks_prepared { + self.prepare_dense_inline_marks(total_num_groups); + marks_prepared = true; + } + let mark = &mut self.dense_inline_marks[group_index]; + if *mark != self.dense_inline_epoch { + *mark = self.dense_inline_epoch; + unique_groups = unique_groups.saturating_add(1); + max_group_index = Some(match max_group_index { + Some(current_max) => current_max.max(group_index), + None => group_index, + }); + } + } + + let should_replace = match self.min_max[group_index].as_ref() { + Some(existing_val) => cmp(new_val, existing_val.as_ref()), + None => true, + }; + + if should_replace { + self.set_value(group_index, new_val); + } + } + + if fast_path && fast_rows > 0 { + let fast_unique = fast_last.saturating_sub(fast_start).saturating_add(1); + unique_groups = fast_unique; + max_group_index = Some(match max_group_index { + Some(current_max) => current_max.max(fast_last), + None => fast_last, + }); + } + + // Only prepare marks if we've processed at least one batch already. + // This indicates the accumulator is being reused across multiple batches. + // For single-batch scenarios, we avoid the allocation overhead entirely. 
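+        // Note: `processed_batches` is only incremented afterwards in `record_batch_stats`,
+        // so this branch first fires on the second batch the accumulator processes.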
+ if processed_any && self.processed_batches > 0 { + self.dense_inline_marks_ready = true; + } + + Ok(BatchStats { + unique_groups, + max_group_index, + }) + } + + fn prepare_dense_inline_marks(&mut self, total_num_groups: usize) { + if !self.dense_inline_marks_ready { + self.dense_inline_marks_ready = true; + } + + if self.dense_inline_marks.len() < total_num_groups { + self.dense_inline_marks.resize(total_num_groups, 0_u64); + } + + self.dense_inline_epoch = self.dense_inline_epoch.wrapping_add(1); + if self.dense_inline_epoch == 0 { + for mark in &mut self.dense_inline_marks { + *mark = 0; + } + self.dense_inline_epoch = 1; + } + } + + /// Fast path for perfectly sequential dense group indices [0, 1, 2, ..., N-1]. /// - /// `cmp` is the comparison function to use, called like `cmp(new_val, existing_val)` - /// returns true if the `new_val` should replace `existing_val` - fn update_batch<'a, F, I>( + /// This implementation exactly replicates the original pre-optimization algorithm + /// to achieve zero overhead for the common dense case. Each group appears at most + /// once per batch so we can evaluate the winning value in a single pass and update + /// `self.min_max` immediately when the new value beats the current minimum/maximum. + pub(super) fn update_batch_sequential_dense<'a, F, I>( &mut self, iter: I, group_indices: &[usize], total_num_groups: usize, mut cmp: F, - ) -> Result<()> + ) -> Result where F: FnMut(&[u8], &[u8]) -> bool + Send + Sync, I: IntoIterator>, { - self.min_max.resize(total_num_groups, None); - // Minimize value copies by calculating the new min/maxes for each group - // in this batch (either the existing min/max or the new input value) - // and updating the owned values in `self.min_maxes` at most once - let mut locations = vec![MinMaxLocation::ExistingMinMax; total_num_groups]; - - // Figure out the new min value for each group - for (new_val, group_index) in iter.into_iter().zip(group_indices.iter()) { + self.resize_min_max(total_num_groups); + + let mut unique_groups = 0_usize; + let mut max_group_index: Option = None; + + // Figure out the new min/max value for each group. The sequential fast + // path is only selected when `group_indices` is exactly `[0, 1, ..., N-1]` + // for the supplied `total_num_groups`, so each non-null row corresponds + // to a unique group id. This keeps the loop read-mostly: we only write + // into `self.min_max` when a new value actually wins. + for (position, (new_val, group_index)) in + iter.into_iter().zip(group_indices.iter()).enumerate() + { let group_index = *group_index; + debug_assert_eq!( + group_index, position, + "sequential dense path expects strictly sequential group ids" + ); + + // Track the largest group index encountered in this batch. Unlike + // `unique_groups`, this intentionally considers every row (including + // duplicates) because the domain size we derive from + // `max_group_index` only depends on the highest index touched, not on + // how many distinct groups contributed to it. This must happen even + // for null rows to ensure the dense fast path sees the full domain. 
+ max_group_index = Some(match max_group_index { + Some(current_max) => current_max.max(group_index), + None => group_index, + }); + let Some(new_val) = new_val else { continue; // skip nulls }; - let existing_val = match locations[group_index] { - // previous input value was the min/max, so compare it - MinMaxLocation::Input(existing_val) => existing_val, - MinMaxLocation::ExistingMinMax => { - let Some(existing_val) = self.min_max[group_index].as_ref() else { - // no existing min/max, so this is the new min/max - locations[group_index] = MinMaxLocation::Input(new_val); - continue; - }; - existing_val.as_ref() - } + unique_groups = unique_groups.saturating_add(1); + + let should_replace = match self.min_max[group_index].as_ref() { + Some(existing_val) => cmp(new_val, existing_val.as_ref()), + None => true, }; - // Compare the new value to the existing value, replacing if necessary - if cmp(new_val, existing_val) { - locations[group_index] = MinMaxLocation::Input(new_val); + if should_replace { + self.set_value(group_index, new_val); } } + Ok(BatchStats { + unique_groups, + max_group_index, + }) + } + + /// Fast path for DenseInline once the workload has been deemed stable. + /// + /// No statistics or mark tracking is required: simply update the + /// materialised values in place. + fn update_batch_dense_inline_committed<'a, F, I>( + &mut self, + iter: I, + group_indices: &[usize], + total_num_groups: usize, + cmp: &mut F, + ) -> Result<()> + where + F: FnMut(&[u8], &[u8]) -> bool + Send + Sync, + I: IntoIterator>, + { + self.resize_min_max(total_num_groups); + + for (&group_index, new_val) in group_indices.iter().zip(iter.into_iter()) { + let Some(new_val) = new_val else { + continue; + }; + + if group_index >= self.min_max.len() { + return internal_err!( + "group index {group_index} out of bounds for {} groups", + self.min_max.len() + ); + } - // Update self.min_max with any new min/max values we found in the input - for (group_index, location) in locations.iter().enumerate() { - match location { - MinMaxLocation::ExistingMinMax => {} - MinMaxLocation::Input(new_val) => self.set_value(group_index, new_val), + let should_replace = match self.min_max[group_index].as_ref() { + Some(existing_val) => cmp(new_val, existing_val.as_ref()), + None => true, + }; + + if should_replace { + self.set_value(group_index, new_val); } } + Ok(()) } + fn update_batch_simple_impl<'a, F, I>( + &mut self, + iter: I, + group_indices: &[usize], + total_num_groups: usize, + cmp: &mut F, + ) -> Result + where + F: FnMut(&[u8], &[u8]) -> bool + Send + Sync, + I: IntoIterator>, + { + self.resize_min_max(total_num_groups); + + if self.simple_slots.len() < total_num_groups { + self.simple_slots + .resize_with(total_num_groups, SimpleSlot::new); + } + + self.simple_epoch = self.simple_epoch.wrapping_add(1); + if self.simple_epoch == 0 { + for slot in &mut self.simple_slots { + slot.epoch = 0; + slot.location = SimpleLocation::Untouched; + } + self.simple_epoch = 1; + } + + let mut touched_groups = std::mem::take(&mut self.simple_touched_groups); + touched_groups.clear(); + let mut batch_inputs: Vec<&[u8]> = Vec::with_capacity(group_indices.len()); + let mut unique_groups = 0_usize; + let mut max_group_index: Option = None; + + for (&group_index, new_val) in group_indices.iter().zip(iter.into_iter()) { + let Some(new_val) = new_val else { + continue; + }; + + if group_index >= self.simple_slots.len() { + return internal_err!( + "group index {group_index} out of bounds for {} simple slots", + self.simple_slots.len() + 
); + } + + let slot = &mut self.simple_slots[group_index]; + if slot.epoch != self.simple_epoch { + slot.epoch = self.simple_epoch; + slot.location = SimpleLocation::Untouched; + touched_groups.push(group_index); + unique_groups += 1; + max_group_index = Some(match max_group_index { + Some(current_max) => current_max.max(group_index), + None => group_index, + }); + } + + match slot.location { + SimpleLocation::Untouched => match self.min_max[group_index].as_ref() { + Some(existing_val) => { + if cmp(new_val, existing_val.as_ref()) { + let batch_index = batch_inputs.len(); + batch_inputs.push(new_val); + slot.location = SimpleLocation::Batch(batch_index); + } else { + slot.location = SimpleLocation::Existing; + } + } + None => { + let batch_index = batch_inputs.len(); + batch_inputs.push(new_val); + slot.location = SimpleLocation::Batch(batch_index); + } + }, + SimpleLocation::Existing => { + let existing_val = self.min_max[group_index] + .as_ref() + .expect("existing value must be present") + .as_ref(); + if cmp(new_val, existing_val) { + let batch_index = batch_inputs.len(); + batch_inputs.push(new_val); + slot.location = SimpleLocation::Batch(batch_index); + } + } + SimpleLocation::Batch(existing_index) => { + let existing_val = batch_inputs[existing_index]; + if cmp(new_val, existing_val) { + let batch_index = batch_inputs.len(); + batch_inputs.push(new_val); + slot.location = SimpleLocation::Batch(batch_index); + } + } + } + } + + for &group_index in &touched_groups { + if let SimpleLocation::Batch(batch_index) = + self.simple_slots[group_index].location + { + self.set_value(group_index, batch_inputs[batch_index]); + } + } + + touched_groups.clear(); + self.simple_touched_groups = touched_groups; + + Ok(BatchStats { + unique_groups, + max_group_index, + }) + } + + /// Record batch statistics and adaptively select or transition workload mode. + /// + /// This function implements the adaptive mode selection heuristic that + /// improves performance in multi-batch workloads at the cost of some + /// overhead in single-batch scenarios. The overhead comes from tracking + /// `unique_groups` and `max_group_index` statistics needed to evaluate + /// density and choose the optimal execution path. + /// Capture per-batch statistics and feed them into the adaptive mode + /// selection heuristic. + /// + /// * `stats.unique_groups` counts the distinct group ids in **this** batch. + /// It is accumulated into `self.total_groups_seen` so the sparse path can + /// reason about long-lived density trends. + /// * `stats.max_group_index` captures the largest identifier touched in the + /// batch and therefore the effective domain size used for density + /// comparisons. + /// * `total_num_groups` is the logical domain configured by the execution + /// plan. It acts as an upper bound for allocations and is used alongside + /// `unique_groups` to reason about per-batch density. 
+ pub(super) fn record_batch_stats( + &mut self, + stats: BatchStats, + total_num_groups: usize, + ) { + self.processed_batches = self.processed_batches.saturating_add(1); + if stats.unique_groups == 0 { + return; + } + + self.total_groups_seen = + self.total_groups_seen.saturating_add(stats.unique_groups); + if let Some(max_group_index) = stats.max_group_index { + self.lifetime_max_group_index = Some(match self.lifetime_max_group_index { + Some(previous) => previous.max(max_group_index), + None => max_group_index, + }); + } + + match self.workload_mode { + WorkloadMode::Undecided => { + if let Some(max_group_index) = stats.max_group_index { + let domain = max_group_index + 1; + if self.should_use_dense_inline(total_num_groups, stats.unique_groups) + { + if !matches!(self.workload_mode, WorkloadMode::DenseInline) { + self.enter_dense_inline_mode(); + } + self.workload_mode = WorkloadMode::DenseInline; + self.dense_inline_marks_ready = true; + } else if self.should_use_simple( + total_num_groups, + stats.unique_groups, + domain, + ) { + if !matches!(self.workload_mode, WorkloadMode::Simple) { + self.enter_simple_mode(); + } + self.workload_mode = WorkloadMode::Simple; + } else { + if !matches!(self.workload_mode, WorkloadMode::SparseOptimized) { + self.enter_sparse_mode(); + } + self.workload_mode = WorkloadMode::SparseOptimized; + } + } + } + WorkloadMode::DenseInline => { + if self.dense_inline_committed { + return; + } + + if self.should_switch_to_sparse() { + self.enter_sparse_mode(); + self.workload_mode = WorkloadMode::SparseOptimized; + self.dense_inline_stable_batches = 0; + } else if let Some(max_group_index) = stats.max_group_index { + let domain = max_group_index + 1; + if !self + .should_use_dense_inline(total_num_groups, stats.unique_groups) + { + self.dense_inline_stable_batches = 0; + if self.should_use_simple( + total_num_groups, + stats.unique_groups, + domain, + ) { + self.enter_simple_mode(); + self.workload_mode = WorkloadMode::Simple; + } else { + self.enter_sparse_mode(); + self.workload_mode = WorkloadMode::SparseOptimized; + } + } else { + self.dense_inline_stable_batches = + self.dense_inline_stable_batches.saturating_add(1); + if self.dense_inline_stable_batches + >= DENSE_INLINE_STABILITY_THRESHOLD + { + self.dense_inline_committed = true; + self.dense_inline_committed_groups = total_num_groups; + self.dense_inline_marks.clear(); + self.dense_inline_marks_ready = false; + } + } + } + } + WorkloadMode::Simple => { + if self.should_switch_to_sparse() { + self.enter_sparse_mode(); + self.workload_mode = WorkloadMode::SparseOptimized; + } + } + WorkloadMode::SparseOptimized => { + // Remain in sparse mode. We currently do not switch back to the + // simple mode because sparse workloads tend to stay sparse. 
+ } + } + } + + fn should_use_dense_inline( + &self, + total_num_groups: usize, + unique_groups: usize, + ) -> bool { + if total_num_groups == 0 || total_num_groups > DENSE_INLINE_MAX_TOTAL_GROUPS { + return false; + } + + Self::density_at_least( + unique_groups, + total_num_groups, + DENSE_INLINE_MIN_DENSITY_PERCENT, + ) + } + + fn should_use_simple( + &self, + total_num_groups: usize, + unique_groups: usize, + domain: usize, + ) -> bool { + if total_num_groups > SIMPLE_MODE_MAX_TOTAL_GROUPS || domain == 0 { + return false; + } + Self::density_at_least(unique_groups, domain, SIMPLE_MODE_MIN_DENSITY_PERCENT) + } + + fn should_switch_to_sparse(&self) -> bool { + if self.populated_groups <= SPARSE_SWITCH_GROUP_THRESHOLD { + return false; + } + let Some(max_group_index) = self.lifetime_max_group_index else { + return false; + }; + let domain = max_group_index + 1; + if domain == 0 { + return false; + } + + !Self::density_at_least( + self.populated_groups, + domain, + SPARSE_SWITCH_MAX_DENSITY_PERCENT, + ) + } + + /// Returns `true` when the observed population covers at least `percent` + /// percent of the provided domain. + #[inline] + fn density_at_least(observed: usize, domain: usize, percent: usize) -> bool { + if domain == 0 || percent == 0 { + return false; + } + + let observed_scaled = observed.saturating_mul(100); + let required_scaled = domain.saturating_mul(percent); + observed_scaled >= required_scaled + } + + fn enter_simple_mode(&mut self) { + self.scratch_group_ids.clear(); + self.scratch_sparse.clear(); + self.scratch_dense.clear(); + self.scratch_dense_limit = 0; + self.scratch_dense_enabled = false; + self.simple_touched_groups.clear(); + self.dense_inline_stable_batches = 0; + self.dense_inline_committed = false; + self.dense_inline_committed_groups = 0; + self.dense_inline_marks_ready = false; + } + + fn enter_sparse_mode(&mut self) { + // Ensure the dense scratch table starts from a clean slate when we + // enter sparse mode. Subsequent batches will lazily enable and grow the + // dense scratch as required by the existing heuristics. 
+ self.scratch_dense_enabled = false; + self.scratch_dense_limit = 0; + self.scratch_dense.clear(); + self.dense_inline_stable_batches = 0; + self.dense_inline_committed = false; + self.dense_inline_committed_groups = 0; + self.dense_inline_marks_ready = false; + } + + fn enter_dense_inline_mode(&mut self) { + self.enter_simple_mode(); + self.dense_inline_stable_batches = 0; + self.dense_inline_committed = false; + self.dense_inline_committed_groups = 0; + self.dense_inline_marks_ready = false; + } + + /// Updates the min/max values for the given string values + /// + /// `cmp` is the comparison function to use, called like `cmp(new_val, existing_val)` + /// returns true if the `new_val` should replace `existing_val` + fn update_batch_sparse_impl<'a, F, I>( + &mut self, + iter: I, + group_indices: &[usize], + total_num_groups: usize, + cmp: &mut F, + ) -> Result + where + F: FnMut(&[u8], &[u8]) -> bool + Send + Sync, + I: IntoIterator>, + { + let prepared = self.prepare_sparse_batch(total_num_groups); + let mut state = SparseBatchState::new(prepared, group_indices.len()); + + let mut values_iter = iter.into_iter(); + let mut processed = 0usize; + for (&group_index, new_val) in group_indices.iter().zip(&mut values_iter) { + processed += 1; + + let Some(new_val) = new_val else { + continue; + }; + + if group_index >= self.min_max.len() { + return internal_err!( + "group index {group_index} out of bounds for {} groups", + self.min_max.len() + ); + } + + self.process_sparse_value( + group_index, + new_val, + total_num_groups, + &mut state, + cmp, + ); + } + + debug_assert!( + values_iter.next().is_none(), + "value iterator longer than group indices" + ); + + if processed != group_indices.len() { + return internal_err!( + "value iterator shorter than group indices (processed {processed}, expected {})", + group_indices.len() + ); + } + + self.finalize_sparse_batch(state, total_num_groups) + } + + fn prepare_sparse_batch(&mut self, total_num_groups: usize) -> PreparedSparseBatch { + self.resize_min_max(total_num_groups); + + #[cfg(test)] + { + self.dense_sparse_detours = 0; + } + + self.scratch_epoch = self.scratch_epoch.wrapping_add(1); + if self.scratch_epoch == 0 { + for entry in &mut self.scratch_dense { + entry.epoch = 0; + entry.location = ScratchLocation::Existing; + } + self.scratch_epoch = 1; + } + + debug_assert!(self.scratch_sparse.is_empty()); + let scratch_sparse = std::mem::take(&mut self.scratch_sparse); + let scratch_group_ids = std::mem::take(&mut self.scratch_group_ids); + + self.scratch_dense_limit = self.scratch_dense_limit.min(total_num_groups); + let use_dense = self.scratch_dense_enabled && self.scratch_dense_limit > 0; + + PreparedSparseBatch { + scratch_sparse, + scratch_group_ids, + use_dense, + } + } + + fn process_sparse_value<'a, F>( + &mut self, + group_index: usize, + new_val: &'a [u8], + total_num_groups: usize, + state: &mut SparseBatchState<'a>, + cmp: &mut F, + ) where + F: FnMut(&[u8], &[u8]) -> bool + Send + Sync, + { + loop { + match self.apply_sparse_value( + group_index, + new_val, + total_num_groups, + state, + cmp, + ) { + ProcessResult::Processed => break, + ProcessResult::Retry => continue, + } + } + } + + fn apply_sparse_value<'a, F>( + &mut self, + group_index: usize, + new_val: &'a [u8], + total_num_groups: usize, + state: &mut SparseBatchState<'a>, + cmp: &mut F, + ) -> ProcessResult + where + F: FnMut(&[u8], &[u8]) -> bool + Send + Sync, + { + if state.use_dense { + match self.try_process_dense_path( + group_index, + new_val, + total_num_groups, + 
state, + cmp, + ) { + DenseResult::Handled => return ProcessResult::Processed, + DenseResult::Retry => return ProcessResult::Retry, + DenseResult::Fallback => {} + } + } + + self.process_sparse_path(group_index, new_val, total_num_groups, state, cmp) + } + + fn try_process_dense_path<'a, F>( + &mut self, + group_index: usize, + new_val: &'a [u8], + total_num_groups: usize, + state: &mut SparseBatchState<'a>, + cmp: &mut F, + ) -> DenseResult + where + F: FnMut(&[u8], &[u8]) -> bool + Send + Sync, + { + let mut allow_dense = group_index < self.scratch_dense_limit; + + if !allow_dense { + let (potential_unique, potential_max) = + state.potential_first_touch_metrics(group_index); + if let Some(candidate_limit) = self.evaluate_dense_candidate( + potential_unique, + potential_max, + total_num_groups, + ) { + let mut desired_limit = candidate_limit; + if desired_limit < self.scratch_dense_limit + SCRATCH_DENSE_GROWTH_STEP { + desired_limit = (self.scratch_dense_limit + + SCRATCH_DENSE_GROWTH_STEP) + .min(total_num_groups); + } + desired_limit = desired_limit.min(total_num_groups); + if self.expand_dense_limit(desired_limit, state) { + return DenseResult::Retry; + } + allow_dense = group_index < self.scratch_dense_limit; + } + } + + if !allow_dense { + #[cfg(test)] + { + debug_assert!(self.scratch_dense_enabled); + self.dense_sparse_detours += 1; + } + return DenseResult::Fallback; + } + + let mut pending_dense_growth = None; + let mut first_touch = false; + { + let entry = &mut self.scratch_dense[group_index]; + if entry.epoch != self.scratch_epoch { + entry.epoch = self.scratch_epoch; + entry.location = ScratchLocation::Existing; + first_touch = true; + } + + Self::update_scratch_location( + &mut entry.location, + group_index, + new_val, + cmp, + &mut state.batch_inputs, + &self.min_max, + ); + } + + if first_touch { + state.scratch_group_ids.push(group_index); + state.record_first_touch(group_index); + if let Some(max_group_index) = state.batch_max_group_index { + let mut desired_limit = max_group_index + 1; + if desired_limit < self.scratch_dense_limit + SCRATCH_DENSE_GROWTH_STEP { + desired_limit = (self.scratch_dense_limit + + SCRATCH_DENSE_GROWTH_STEP) + .min(total_num_groups); + } + pending_dense_growth = Some(desired_limit.min(total_num_groups)); + } + } + + if let Some(desired_limit) = pending_dense_growth { + self.expand_dense_limit(desired_limit, state); + } + + DenseResult::Handled + } + + fn process_sparse_path<'a, F>( + &mut self, + group_index: usize, + new_val: &'a [u8], + total_num_groups: usize, + state: &mut SparseBatchState<'a>, + cmp: &mut F, + ) -> ProcessResult + where + F: FnMut(&[u8], &[u8]) -> bool + Send + Sync, + { + let mut first_touch = false; + let mut evaluated_dense_candidate = false; + loop { + match state.scratch_sparse.entry(group_index) { + Entry::Occupied(_) => { + break; + } + Entry::Vacant(_) => { + first_touch = true; + + if !evaluated_dense_candidate { + evaluated_dense_candidate = true; + // To avoid holding the VacantEntry guard across an + // immutable call on `state`, re-acquire metrics first + // by snapshotting what we need and then decide. 
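+ // (Holding the `Entry` keeps `state.scratch_sparse` mutably
+ // borrowed, while `potential_first_touch_metrics` borrows `state`
+ // immutably, so calling it with the entry alive would not
+ // borrow-check; the snapshot lets us drop the entry and re-acquire
+ // it further down when we actually insert.)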
+ let (potential_unique, potential_max) = + state.potential_first_touch_metrics(group_index); + if let Some(candidate_limit) = self.evaluate_dense_candidate( + potential_unique, + potential_max, + total_num_groups, + ) { + if !state.dense_activated_this_batch + && self.enable_dense_for_batch( + candidate_limit, + &mut state.scratch_sparse, + &mut state.scratch_group_ids[..], + ) + { + state.dense_activated_this_batch = true; + state.use_dense = true; + return ProcessResult::Retry; + } else if state.dense_activated_this_batch + && self.expand_dense_limit(candidate_limit, state) + { + return ProcessResult::Retry; + } + + // candidate not accepted -> continue the loop and + // re-check the entry so we can insert below. + continue; + } + + // insert into the vacant slot now that we've finished + // the immutable checks + match state.scratch_sparse.entry(group_index) { + Entry::Vacant(vacant) => { + vacant.insert(ScratchLocation::Existing); + break; + } + Entry::Occupied(_) => break, + } + } + + // If we've already evaluated the dense candidate, we still + // need to insert into the vacant slot. Acquire the vacant + // entry fresh and insert. + match state.scratch_sparse.entry(group_index) { + Entry::Vacant(vacant) => { + vacant.insert(ScratchLocation::Existing); + break; + } + Entry::Occupied(_) => break, + } + } + } + } + + if first_touch { + state.scratch_group_ids.push(group_index); + state.record_first_touch(group_index); + if let Some(candidate_limit) = self.evaluate_dense_candidate( + state.batch_unique_groups, + state.batch_max_group_index, + total_num_groups, + ) { + if !state.dense_activated_this_batch + && self.enable_dense_for_batch( + candidate_limit, + &mut state.scratch_sparse, + &mut state.scratch_group_ids[..], + ) + { + state.dense_activated_this_batch = true; + state.use_dense = true; + return ProcessResult::Retry; + } else if state.dense_activated_this_batch + && self.expand_dense_limit(candidate_limit, state) + { + return ProcessResult::Retry; + } + } + } + + let location = state + .scratch_sparse + .entry(group_index) + .or_insert(ScratchLocation::Existing); + Self::update_scratch_location( + location, + group_index, + new_val, + cmp, + &mut state.batch_inputs, + &self.min_max, + ); + ProcessResult::Processed + } + + fn update_scratch_location<'a, F>( + location: &mut ScratchLocation, + group_index: usize, + new_val: &'a [u8], + cmp: &mut F, + batch_inputs: &mut Vec<&'a [u8]>, + min_max: &[Option>], + ) where + F: FnMut(&[u8], &[u8]) -> bool + Send + Sync, + { + match *location { + ScratchLocation::Existing => { + let Some(existing_val) = min_max[group_index].as_ref() else { + let batch_index = batch_inputs.len(); + batch_inputs.push(new_val); + *location = ScratchLocation::Batch(batch_index); + return; + }; + if cmp(new_val, existing_val.as_ref()) { + let batch_index = batch_inputs.len(); + batch_inputs.push(new_val); + *location = ScratchLocation::Batch(batch_index); + } + } + ScratchLocation::Batch(existing_idx) => { + let existing_val = batch_inputs[existing_idx]; + if cmp(new_val, existing_val) { + let batch_index = batch_inputs.len(); + batch_inputs.push(new_val); + *location = ScratchLocation::Batch(batch_index); + } + } + } + } + + fn finalize_sparse_batch<'a>( + &mut self, + state: SparseBatchState<'a>, + total_num_groups: usize, + ) -> Result { + let SparseBatchState { + mut scratch_sparse, + mut scratch_group_ids, + batch_inputs, + batch_unique_groups, + batch_max_group_index, + dense_activated_this_batch: _, + use_dense, + } = state; + + if use_dense { + 
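// The dense scratch proved worthwhile at some point during this
+ // batch, so keep it enabled: the next call to `prepare_sparse_batch`
+ // can then start on the dense path instead of re-activating it
+ // mid-batch.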
self.scratch_dense_enabled = true; + } + + let mut max_group_index = batch_max_group_index; + for group_index in scratch_group_ids.iter().copied() { + match max_group_index { + Some(current_max) if current_max >= group_index => {} + _ => max_group_index = Some(group_index), + } + + if group_index < self.scratch_dense.len() { + let entry = &mut self.scratch_dense[group_index]; + if entry.epoch == self.scratch_epoch { + if let ScratchLocation::Batch(batch_index) = entry.location { + self.set_value(group_index, batch_inputs[batch_index]); + } + continue; + } + } + + if let Some(ScratchLocation::Batch(batch_index)) = + scratch_sparse.remove(&group_index) + { + self.set_value(group_index, batch_inputs[batch_index]); + } + } + + let unique_groups = batch_unique_groups; + scratch_group_ids.clear(); + scratch_sparse.clear(); + self.scratch_sparse = scratch_sparse; + self.scratch_group_ids = scratch_group_ids; + + if let (Some(max_group_index), true) = (max_group_index, unique_groups > 0) { + let candidate_limit = (max_group_index + 1).min(total_num_groups); + if candidate_limit <= unique_groups * SCRATCH_DENSE_ENABLE_MULTIPLIER { + self.scratch_dense_limit = candidate_limit; + } else if !self.scratch_dense_enabled { + self.scratch_dense_limit = 0; + } + } + + self.scratch_dense_limit = self.scratch_dense_limit.min(total_num_groups); + + Ok(BatchStats { + unique_groups, + max_group_index, + }) + } + + fn evaluate_dense_candidate( + &self, + batch_unique_groups: usize, + batch_max_group_index: Option, + total_num_groups: usize, + ) -> Option { + if batch_unique_groups == 0 { + return None; + } + let max_group_index = batch_max_group_index?; + let candidate_limit = (max_group_index + 1).min(total_num_groups); + if candidate_limit == 0 { + return None; + } + if candidate_limit <= batch_unique_groups * SCRATCH_DENSE_ENABLE_MULTIPLIER { + Some(candidate_limit) + } else { + None + } + } + + /// Enable the dense scratch table for the current batch, migrating any + /// existing scratch entries that fall within the dense limit. This method is + /// intentionally invoked at most once per batch to avoid repeatedly + /// scanning `scratch_group_ids`. + fn enable_dense_for_batch( + &mut self, + candidate_limit: usize, + scratch_sparse: &mut HashMap, + scratch_group_ids: &mut [usize], + ) -> bool { + if candidate_limit == 0 { + return false; + } + + let candidate_limit = candidate_limit.min(self.min_max.len()); + if candidate_limit == 0 { + return false; + } + + self.scratch_dense_limit = candidate_limit; + self.scratch_dense_enabled = true; + if self.scratch_dense.len() < self.scratch_dense_limit { + self.scratch_dense + .resize(self.scratch_dense_limit, ScratchEntry::new()); + } + + for &group_index in scratch_group_ids.iter() { + if group_index >= self.scratch_dense_limit { + continue; + } + + let entry = &mut self.scratch_dense[group_index]; + if entry.epoch != self.scratch_epoch { + let location = scratch_sparse + .remove(&group_index) + .unwrap_or(ScratchLocation::Existing); + entry.epoch = self.scratch_epoch; + entry.location = location; + } else if let Some(location) = scratch_sparse.remove(&group_index) { + entry.location = location; + } + } + + #[cfg(test)] + { + self.dense_enable_invocations += 1; + } + + true + } + + /// Increase the dense limit for the current batch without remigrating + /// previously processed groups. Returns `true` if the limit was expanded so + /// the caller can retry handling the current group using the dense path. 
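+ /// For example, growing the limit from 64 to 128 (example sizes) only
+ /// resizes `scratch_dense` and migrates staged sparse candidates whose
+ /// group index now falls below the new limit; groups already tracked
+ /// in the dense table for this batch are left as they are.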
+ fn expand_dense_limit<'a>( + &mut self, + candidate_limit: usize, + state: &mut SparseBatchState<'a>, + ) -> bool { + if candidate_limit <= self.scratch_dense_limit { + return false; + } + + let candidate_limit = candidate_limit.min(self.min_max.len()); + if candidate_limit <= self.scratch_dense_limit { + return false; + } + + let previous_limit = self.scratch_dense_limit; + self.scratch_dense_limit = candidate_limit; + if self.scratch_dense.len() < self.scratch_dense_limit { + self.scratch_dense + .resize(self.scratch_dense_limit, ScratchEntry::new()); + } + + if self.scratch_dense_enabled { + // Preserve staged candidates for groups that move from the sparse map into + // the newly expanded dense range so we do not lose per-batch minima when + // reprocessing the current row. + for &group_index in state.scratch_group_ids.iter() { + if group_index >= self.scratch_dense_limit { + continue; + } + + let entry = &mut self.scratch_dense[group_index]; + if entry.epoch != self.scratch_epoch { + let location = state + .scratch_sparse + .remove(&group_index) + .unwrap_or(ScratchLocation::Existing); + entry.epoch = self.scratch_epoch; + entry.location = location; + } else if let Some(location) = state.scratch_sparse.remove(&group_index) { + entry.location = location; + } + } + + // If we are expanding from a zero limit, enable dense tracking so future + // iterations can reuse the migrated state without reactivation. + if previous_limit == 0 { + self.scratch_dense_enabled = true; + } + } + + true + } + /// Emits the specified min_max values /// /// Returns (data_capacity, min_maxes), updating the current value of total_data_bytes /// /// - `data_capacity`: the total length of all strings and their contents, /// - `min_maxes`: the actual min/max values for each group - fn emit_to(&mut self, emit_to: EmitTo) -> (usize, Vec>>) { + pub(super) fn emit_to(&mut self, emit_to: EmitTo) -> (usize, Vec>>) { match emit_to { EmitTo::All => { - ( - std::mem::take(&mut self.total_data_bytes), // reset total bytes and min_max - std::mem::take(&mut self.min_max), - ) + let total_bytes = std::mem::take(&mut self.total_data_bytes); + let min_max = std::mem::take(&mut self.min_max); + self.reset_after_full_emit(); + (total_bytes, min_max) } EmitTo::First(n) => { let first_min_maxes: Vec<_> = self.min_max.drain(..n).collect(); + let drained_populated = first_min_maxes + .iter() + .filter(|value| value.is_some()) + .count(); let first_data_capacity: usize = first_min_maxes .iter() .map(|opt| opt.as_ref().map(|s| s.len()).unwrap_or(0)) .sum(); - self.total_data_bytes -= first_data_capacity; + self.total_data_bytes = + self.total_data_bytes.saturating_sub(first_data_capacity); + self.populated_groups = + self.populated_groups.saturating_sub(drained_populated); + self.realign_after_partial_emit(n); + if self.min_max.is_empty() { + self.reset_after_full_emit(); + } (first_data_capacity, first_min_maxes) } } } - fn size(&self) -> usize { - self.total_data_bytes + self.min_max.len() * size_of::>>() + fn reset_after_full_emit(&mut self) { + self.total_data_bytes = 0; + self.populated_groups = 0; + self.scratch_group_ids.clear(); + self.scratch_dense.clear(); + self.scratch_sparse.clear(); + self.scratch_epoch = 0; + self.scratch_dense_limit = 0; + self.scratch_dense_enabled = false; + self.workload_mode = WorkloadMode::Undecided; + self.processed_batches = 0; + self.total_groups_seen = 0; + self.lifetime_max_group_index = None; + self.simple_slots.clear(); + self.simple_epoch = 0; + self.simple_touched_groups.clear(); + 
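// Dense-inline bookkeeping is cleared as well so that the next
+ // stream of batches re-runs mode selection from `Undecided`.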
self.dense_inline_marks.clear(); + self.dense_inline_marks_ready = false; + self.dense_inline_epoch = 0; + self.dense_inline_stable_batches = 0; + self.dense_inline_committed = false; + self.dense_inline_committed_groups = 0; + #[cfg(test)] + { + self.dense_enable_invocations = 0; + self.dense_sparse_detours = 0; + } } + + fn realign_after_partial_emit(&mut self, emitted: usize) { + if emitted == 0 { + return; + } + + let remaining = self.min_max.len(); + if remaining == 0 { + return; + } + + self.processed_batches = 0; + self.total_groups_seen = self.populated_groups; + self.lifetime_max_group_index = Some(remaining - 1); + + self.scratch_group_ids.clear(); + self.scratch_sparse.clear(); + self.scratch_epoch = 0; + self.scratch_dense_enabled = false; + self.scratch_dense_limit = 0; + self.scratch_dense.clear(); + + if emitted >= self.simple_slots.len() { + self.simple_slots.clear(); + } else { + self.simple_slots.drain(..emitted); + } + self.simple_slots.truncate(remaining); + for slot in &mut self.simple_slots { + slot.epoch = 0; + slot.location = SimpleLocation::Untouched; + } + self.simple_epoch = 0; + self.simple_touched_groups.clear(); + + if emitted >= self.dense_inline_marks.len() { + self.dense_inline_marks.clear(); + } else { + self.dense_inline_marks.drain(..emitted); + } + self.dense_inline_marks.truncate(remaining); + self.dense_inline_marks_ready = false; + self.dense_inline_epoch = 0; + self.dense_inline_stable_batches = 0; + self.dense_inline_committed = false; + self.dense_inline_committed_groups = 0; + } + + pub(super) fn size(&self) -> usize { + let mut size = size_of::(); + + size = size.saturating_add(self.total_data_bytes); + size = size.saturating_add(vec_allocation_bytes(&self.min_max)); + size = size.saturating_add(vec_allocation_bytes(&self.scratch_group_ids)); + size = size.saturating_add(vec_allocation_bytes(&self.scratch_dense)); + size = size.saturating_add(scratch_sparse_allocation_bytes(&self.scratch_sparse)); + size = size.saturating_add(vec_allocation_bytes(&self.simple_slots)); + size = size.saturating_add(vec_allocation_bytes(&self.simple_touched_groups)); + size = size.saturating_add(vec_allocation_bytes(&self.dense_inline_marks)); + + size + } +} +fn vec_allocation_bytes(vec: &Vec) -> usize { + vec.capacity().saturating_mul(size_of::()) +} +fn scratch_sparse_allocation_bytes(map: &HashMap) -> usize { + // `HashMap` growth strategy and control byte layout are implementation + // details of hashbrown. Rather than duplicating that logic (which can + // change across compiler versions or architectures), approximate the + // allocation using only public APIs. `capacity()` returns the number of + // buckets currently reserved which bounds the total tuple storage and the + // control byte array. Each bucket stores the key/value pair plus an + // implementation defined control byte. We round that control byte up to a + // full `usize` so the estimate remains an upper bound even if hashbrown + // widens its groups. + let capacity = map.capacity(); + let tuple_bytes = + capacity.saturating_mul(size_of::() + size_of::()); + let ctrl_bytes = capacity.saturating_mul(size_of::()); + + // Use a simple capacity-based upper bound to approximate the HashMap + // allocation. The precise control-byte layout and grouping strategy are + // internal implementation details of `hashbrown` and may change across + // versions or architectures. 
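+ // As a rough illustration, a map with capacity 16 holding
+ // `(usize, ScratchLocation)` pairs is charged
+ // 16 * (size_of::<usize>() + size_of::<ScratchLocation>()) bytes of
+ // tuple storage plus 16 * size_of::<usize>() bytes of control area.
+ //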
Rounding the control area up to a full + // `usize` per bucket produces a conservative upper bound without + // depending on internal constants. + tuple_bytes.saturating_add(ctrl_bytes) +} + +struct PreparedSparseBatch { + scratch_sparse: HashMap, + scratch_group_ids: Vec, + use_dense: bool, +} + +struct SparseBatchState<'a> { + scratch_sparse: HashMap, + scratch_group_ids: Vec, + batch_inputs: Vec<&'a [u8]>, + batch_unique_groups: usize, + batch_max_group_index: Option, + dense_activated_this_batch: bool, + use_dense: bool, +} + +impl<'a> SparseBatchState<'a> { + fn new(prepared: PreparedSparseBatch, capacity: usize) -> Self { + Self { + scratch_sparse: prepared.scratch_sparse, + scratch_group_ids: prepared.scratch_group_ids, + batch_inputs: Vec::with_capacity(capacity), + batch_unique_groups: 0, + batch_max_group_index: None, + dense_activated_this_batch: false, + use_dense: prepared.use_dense, + } + } + + fn potential_first_touch_metrics( + &self, + group_index: usize, + ) -> (usize, Option) { + let potential_unique = self.batch_unique_groups + 1; + let potential_max = match self.batch_max_group_index { + Some(current_max) if current_max >= group_index => Some(current_max), + _ => Some(group_index), + }; + (potential_unique, potential_max) + } + + fn record_first_touch(&mut self, group_index: usize) { + self.batch_unique_groups += 1; + match self.batch_max_group_index { + Some(current_max) if current_max >= group_index => {} + _ => self.batch_max_group_index = Some(group_index), + } + } +} + +enum ProcessResult { + Processed, + Retry, +} + +enum DenseResult { + Handled, + Retry, + Fallback, } diff --git a/datafusion/functions-aggregate/src/min_max/min_max_struct.rs b/datafusion/functions-aggregate/src/min_max/min_max_struct.rs index 8038f2f01d90..62d884f032e4 100644 --- a/datafusion/functions-aggregate/src/min_max/min_max_struct.rs +++ b/datafusion/functions-aggregate/src/min_max/min_max_struct.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use std::{cmp::Ordering, sync::Arc}; +use std::{cmp::Ordering, mem::size_of, sync::Arc}; use arrow::{ array::{ @@ -169,14 +169,37 @@ struct MinMaxStructState { /// The total bytes of the string data (for pre-allocating the final array, /// and tracking memory usage) total_data_bytes: usize, + /// Tracks the groups that were updated in the current batch so that we only + /// touch entries that actually changed. This avoids clearing dense scratch + /// structures across batches. + scratch_touched_groups: Vec, + /// Dense scratch entries reused across batches. Each entry stores the epoch + /// of the last batch that touched it together with the location of the + /// candidate value for the group. + scratch_entries: Vec, + /// Epoch identifying the current batch. When the epoch wraps we reset the + /// scratch entries eagerly to maintain correctness. + scratch_epoch: u64, + /// Reusable buffer storing candidate values taken from the current batch. + scratch_batch_inputs: Vec, } -#[derive(Debug, Clone)] -enum MinMaxLocation { - /// the min/max value is stored in the existing `min_max` array - ExistingMinMax, - /// the min/max value is stored in the input array at the given index - Input(StructArray), +#[derive(Debug, Default, Clone)] +struct ScratchEntry { + epoch: u64, + location: ScratchLocation, +} + +#[derive(Debug, Clone, Default)] +enum ScratchLocation { + /// No value from the current batch has been observed for the group yet. 
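+ /// (Within one batch a group moves from `Untouched` to either
+ /// `Existing` or `Batch` on first touch, and from `Existing` to
+ /// `Batch` if a better candidate from the batch is found.)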
+ #[default] + Untouched, + /// The group should keep the previously materialised min/max value. + Existing, + /// The min/max candidate for the group resides in the current batch at the + /// provided index within `scratch_batch_inputs`. + Batch(usize), } /// Implement the MinMaxStructState with a comparison function @@ -191,6 +214,10 @@ impl MinMaxStructState { min_max: vec![], data_type, total_data_bytes: 0, + scratch_touched_groups: vec![], + scratch_entries: vec![], + scratch_epoch: 0, + scratch_batch_inputs: vec![], } } @@ -226,45 +253,85 @@ impl MinMaxStructState { F: FnMut(&StructArray, &StructArray) -> bool + Send + Sync, { self.min_max.resize(total_num_groups, None); - // Minimize value copies by calculating the new min/maxes for each group - // in this batch (either the existing min/max or the new input value) - // and updating the owned values in `self.min_maxes` at most once - let mut locations = vec![MinMaxLocation::ExistingMinMax; total_num_groups]; - // Figure out the new min value for each group + if self.scratch_entries.len() < total_num_groups { + self.scratch_entries + .resize_with(total_num_groups, ScratchEntry::default); + } + + self.scratch_epoch = self.scratch_epoch.wrapping_add(1); + if self.scratch_epoch == 0 { + for entry in &mut self.scratch_entries { + entry.epoch = 0; + entry.location = ScratchLocation::Untouched; + } + self.scratch_epoch = 1; + } + + let mut touched_groups = std::mem::take(&mut self.scratch_touched_groups); + touched_groups.clear(); + let mut batch_inputs = std::mem::take(&mut self.scratch_batch_inputs); + batch_inputs.clear(); + for (index, group_index) in (0..array.len()).zip(group_indices.iter()) { let group_index = *group_index; if array.is_null(index) { continue; } + + if group_index >= total_num_groups { + return internal_err!( + "group index {group_index} out of bounds for {total_num_groups} groups" + ); + } + let new_val = array.slice(index, 1); - let existing_val = match &locations[group_index] { - // previous input value was the min/max, so compare it - MinMaxLocation::Input(existing_val) => existing_val, - MinMaxLocation::ExistingMinMax => { - let Some(existing_val) = self.min_max[group_index].as_ref() else { - // no existing min/max, so this is the new min/max - locations[group_index] = MinMaxLocation::Input(new_val); + let entry = &mut self.scratch_entries[group_index]; + if entry.epoch != self.scratch_epoch { + entry.epoch = self.scratch_epoch; + entry.location = ScratchLocation::Untouched; + touched_groups.push(group_index); + } + + let existing_val = match &entry.location { + ScratchLocation::Untouched => { + if let Some(existing_val) = self.min_max[group_index].as_ref() { + entry.location = ScratchLocation::Existing; + existing_val + } else { + let batch_index = batch_inputs.len(); + batch_inputs.push(new_val); + entry.location = ScratchLocation::Batch(batch_index); continue; - }; - existing_val + } } + ScratchLocation::Existing => self.min_max[group_index] + .as_ref() + .expect("existing value must be present"), + ScratchLocation::Batch(existing_index) => &batch_inputs[*existing_index], }; - // Compare the new value to the existing value, replacing if necessary if cmp(&new_val, existing_val) { - locations[group_index] = MinMaxLocation::Input(new_val); + let batch_index = batch_inputs.len(); + batch_inputs.push(new_val); + entry.location = ScratchLocation::Batch(batch_index); } } - // Update self.min_max with any new min/max values we found in the input - for (group_index, location) in locations.iter().enumerate() { - match 
location { - MinMaxLocation::ExistingMinMax => {} - MinMaxLocation::Input(new_val) => self.set_value(group_index, new_val), + for &group_index in &touched_groups { + if let ScratchLocation::Batch(batch_index) = + &self.scratch_entries[group_index].location + { + let value = &batch_inputs[*batch_index]; + self.set_value(group_index, value); } } + + batch_inputs.clear(); + self.scratch_batch_inputs = batch_inputs; + touched_groups.clear(); + self.scratch_touched_groups = touched_groups; Ok(()) } @@ -295,7 +362,11 @@ impl MinMaxStructState { } fn size(&self) -> usize { - self.total_data_bytes + self.min_max.len() * size_of::>() + self.total_data_bytes + + self.min_max.len() * size_of::>() + + self.scratch_entries.capacity() * size_of::() + + self.scratch_touched_groups.capacity() * size_of::() + + self.scratch_batch_inputs.capacity() * size_of::() } } @@ -541,4 +612,46 @@ mod tests { assert_eq!(int_array.value(0), 4); assert_eq!(str_array.value(0), "d"); } + + #[test] + fn test_min_max_sparse_multi_batch() { + let batch_len = 128; + let total_groups = 1024; + let group_indices: Vec = (0..batch_len).map(|i| i * 8).collect(); + + let batch_one = create_test_struct_array( + (0..batch_len).map(|i| Some(1000_i32 - i as i32)).collect(), + vec![Some("batch_one"); batch_len], + ); + + let mut accumulator = + MinMaxStructAccumulator::new_min(batch_one.data_type().clone()); + let values_one = vec![Arc::new(batch_one) as ArrayRef]; + + accumulator + .update_batch(&values_one, &group_indices, None, total_groups) + .unwrap(); + + let batch_two = create_test_struct_array( + (0..batch_len).map(|i| Some(-(i as i32))).collect(), + vec![Some("batch_two"); batch_len], + ); + let values_two = vec![Arc::new(batch_two) as ArrayRef]; + + accumulator + .update_batch(&values_two, &group_indices, None, total_groups) + .unwrap(); + + let result = accumulator.evaluate(EmitTo::All).unwrap(); + let result = result.as_struct(); + + let int_array = result.column(0).as_primitive::(); + + for (i, group_index) in group_indices.iter().copied().enumerate() { + assert!(result.is_valid(group_index)); + assert_eq!(int_array.value(group_index), -(i as i32)); + } + + assert!(result.is_null(total_groups - 1)); + } } diff --git a/datafusion/functions-aggregate/src/min_max.rs b/datafusion/functions-aggregate/src/min_max/mod.rs similarity index 97% rename from datafusion/functions-aggregate/src/min_max.rs rename to datafusion/functions-aggregate/src/min_max/mod.rs index 1a46afefffb3..0099a336f99a 100644 --- a/datafusion/functions-aggregate/src/min_max.rs +++ b/datafusion/functions-aggregate/src/min_max/mod.rs @@ -21,39 +21,36 @@ mod min_max_bytes; mod min_max_struct; -use arrow::array::ArrayRef; -use arrow::datatypes::{ - DataType, Decimal128Type, Decimal256Type, Decimal32Type, Decimal64Type, - DurationMicrosecondType, DurationMillisecondType, DurationNanosecondType, - DurationSecondType, Float16Type, Float32Type, Float64Type, Int16Type, Int32Type, - Int64Type, Int8Type, UInt16Type, UInt32Type, UInt64Type, UInt8Type, +#[cfg(test)] +mod tests; + +use self::{ + min_max_bytes::MinMaxBytesAccumulator, min_max_struct::MinMaxStructAccumulator, }; -use datafusion_common::stats::Precision; -use datafusion_common::{exec_err, internal_err, ColumnStatistics, Result}; -use datafusion_functions_aggregate_common::aggregate::groups_accumulator::prim_op::PrimitiveGroupsAccumulator; -use datafusion_physical_expr::expressions; -use std::cmp::Ordering; -use std::fmt::Debug; - -use arrow::datatypes::i256; -use arrow::datatypes::{ - Date32Type, Date64Type, 
Time32MillisecondType, Time32SecondType, - Time64MicrosecondType, Time64NanosecondType, TimeUnit, TimestampMicrosecondType, - TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, +use arrow::{ + array::ArrayRef, + datatypes::{ + i256, DataType, Date32Type, Date64Type, Decimal128Type, Decimal256Type, + Decimal32Type, Decimal64Type, DurationMicrosecondType, DurationMillisecondType, + DurationNanosecondType, DurationSecondType, Float16Type, Float32Type, + Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, Time32MillisecondType, + Time32SecondType, Time64MicrosecondType, Time64NanosecondType, TimeUnit, + TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType, + TimestampSecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type, + }, +}; +use datafusion_common::{ + exec_err, internal_err, stats::Precision, ColumnStatistics, Result, ScalarValue, }; - -use crate::min_max::min_max_bytes::MinMaxBytesAccumulator; -use crate::min_max::min_max_struct::MinMaxStructAccumulator; -use datafusion_common::ScalarValue; use datafusion_expr::{ function::AccumulatorArgs, Accumulator, AggregateUDFImpl, Documentation, - SetMonotonicity, Signature, Volatility, + GroupsAccumulator, SetMonotonicity, Signature, StatisticsArgs, Volatility, }; -use datafusion_expr::{GroupsAccumulator, StatisticsArgs}; +use datafusion_functions_aggregate_common::aggregate::groups_accumulator::prim_op::PrimitiveGroupsAccumulator; use datafusion_macros::user_doc; +use datafusion_physical_expr::expressions; use half::f16; -use std::mem::size_of_val; -use std::ops::Deref; +use std::{cmp::Ordering, fmt::Debug, mem::size_of_val, ops::Deref}; fn get_min_max_result_type(input_types: &[DataType]) -> Result> { // make sure that the input types only has one element. @@ -1008,7 +1005,7 @@ pub use datafusion_functions_aggregate_common::min_max::{ }; #[cfg(test)] -mod tests { +mod inline_tests { use super::*; use arrow::{ array::{ diff --git a/datafusion/functions-aggregate/src/min_max/tests.rs b/datafusion/functions-aggregate/src/min_max/tests.rs new file mode 100644 index 000000000000..aa4c4597ed7d --- /dev/null +++ b/datafusion/functions-aggregate/src/min_max/tests.rs @@ -0,0 +1,1525 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
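+
+//! Unit tests for the adaptive MinMax "bytes" accumulator. The tests
+//! drive the accumulator through the DenseInline, Simple, and
+//! SparseOptimized paths, exercise emit/reset behaviour, and cross-check
+//! the results against straightforward reference implementations.
+//!
+//! Most tests follow the same basic shape (illustrative sketch only, not
+//! one of the actual tests below):
+//!
+//! ```ignore
+//! let mut state = MinMaxBytesState::new(DataType::Utf8);
+//! let groups = [0_usize, 1, 2];
+//! let values = [Some(b"b".as_slice()), Some(b"a".as_slice()), None];
+//! // apply one batch of values, keeping the smaller value per group
+//! state
+//!     .update_batch(values.iter().copied(), &groups, 3, |a, b| a < b)
+//!     .expect("update batch");
+//! assert_eq!(state.min_max[0].as_deref(), Some(b"b".as_slice()));
+//! ```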
+ +use super::min_max_bytes::{ + BatchStats, MinMaxBytesState, ScratchEntry, ScratchLocation, SimpleSlot, + WorkloadMode, DENSE_INLINE_MAX_TOTAL_GROUPS, DENSE_INLINE_STABILITY_THRESHOLD, + SPARSE_SWITCH_GROUP_THRESHOLD, +}; +use arrow::datatypes::DataType; +use datafusion_expr::EmitTo; +use rand::{rngs::StdRng, Rng, SeedableRng}; + +#[allow(dead_code)] +#[derive(Debug)] +enum Operation { + Expand { + new_total: usize, + }, + Update { + total_groups: usize, + groups: Vec, + values: Vec>>, + }, + Emit { + emit_count: usize, + }, +} + +fn random_ascii_bytes(rng: &mut StdRng, len: usize) -> Vec { + (0..len) + .map(|_| { + let offset = rng.random_range(0..26_u8); + b'a' + offset + }) + .collect() +} + +fn random_binary_bytes(rng: &mut StdRng, len: usize) -> Vec { + (0..len).map(|_| rng.random_range(0..=u8::MAX)).collect() +} + +#[test] +fn min_updates_across_batches_dense_inline_variants() { + fn run_scenario(data_type: DataType) { + let mut state = MinMaxBytesState::new(data_type.clone()); + let total_groups = 4_usize; + let group_indices = [0_usize, 1, 2, 3, 0]; + let first_values = ["m0", "n1", "o2", "p3", "z9"]; + let second_values = ["a0", "n1", "o2", "p3", "z9"]; + + let first_batch: Vec> = first_values + .iter() + .map(|value| value.as_bytes().to_vec()) + .collect(); + state + .update_batch( + first_batch.iter().map(|value| Some(value.as_slice())), + &group_indices, + total_groups, + |a, b| a < b, + ) + .expect("first batch"); + + assert!( + matches!(state.workload_mode, WorkloadMode::DenseInline), + "expected DenseInline for {data_type:?}, found {:?}", + state.workload_mode + ); + assert_eq!( + state.min_max[0].as_deref(), + Some(first_values[0].as_bytes()), + "initial minimum should match first batch for {data_type:?}" + ); + + let second_batch: Vec> = second_values + .iter() + .map(|value| value.as_bytes().to_vec()) + .collect(); + state + .update_batch( + second_batch.iter().map(|value| Some(value.as_slice())), + &group_indices, + total_groups, + |a, b| a < b, + ) + .expect("second batch"); + + assert_eq!( + state.min_max[0].as_deref(), + Some(second_values[0].as_bytes()), + "second batch should lower the minimum for {data_type:?}" + ); + } + + for data_type in [DataType::Utf8, DataType::Binary, DataType::BinaryView] { + run_scenario(data_type); + } +} + +#[test] +fn randomized_min_matches_reference() { + let mut rng = StdRng::seed_from_u64(0xDAB5_C0DE); + + for data_type in [DataType::Utf8, DataType::Binary, DataType::BinaryView] { + for trial in 0..256 { + let max_total_groups = rng.random_range(1..=48_usize); + let mut current_total = rng.random_range(1..=max_total_groups); + let mut state = MinMaxBytesState::new(data_type.clone()); + let mut expected: Vec>> = vec![None; current_total]; + let batches = rng.random_range(1..=8_usize); + let mut history = Vec::new(); + + for _ in 0..batches { + if current_total == 0 { + current_total = rng.random_range(1..=max_total_groups); + expected.resize(current_total, None); + history.push(Operation::Expand { + new_total: current_total, + }); + } else if rng.random_bool(0.3) && current_total < max_total_groups { + let new_total = + rng.random_range((current_total + 1)..=max_total_groups); + expected.resize(new_total, None); + current_total = new_total; + history.push(Operation::Expand { + new_total: current_total, + }); + } + + let batch_len = rng.random_range(1..=48_usize); + let mut group_indices = Vec::with_capacity(batch_len); + let mut values: Vec>> = Vec::with_capacity(batch_len); + + for _ in 0..batch_len { + let group_index = 
rng.random_range(0..current_total); + group_indices.push(group_index); + + if rng.random_bool(0.1) { + values.push(None); + } else { + let len = rng.random_range(0..=12_usize); + let bytes = match data_type { + DataType::Utf8 => random_ascii_bytes(&mut rng, len), + DataType::Binary | DataType::BinaryView => { + random_binary_bytes(&mut rng, len) + } + other => unreachable!( + "randomized_min_matches_reference unexpected data type {other:?}" + ), + }; + values.push(Some(bytes)); + } + } + + let iter = values + .iter() + .map(|value| value.as_ref().map(|bytes| bytes.as_slice())); + history.push(Operation::Update { + total_groups: current_total, + groups: group_indices.clone(), + values: values.clone(), + }); + + state + .update_batch(iter, &group_indices, current_total, |a, b| a < b) + .expect("randomized batch"); + + for (group_index, value) in group_indices.into_iter().zip(values) { + if let Some(bytes) = value { + let entry = &mut expected[group_index]; + let should_replace = entry + .as_ref() + .map(|existing| bytes.as_slice() < existing.as_slice()) + .unwrap_or(true); + if should_replace { + *entry = Some(bytes); + } + } + } + + if rng.random_bool(0.2) && !state.min_max.is_empty() { + let emit_count = rng.random_range(1..=state.min_max.len()); + let _ = state.emit_to(EmitTo::First(emit_count)); + expected.drain(..emit_count); + current_total = expected.len(); + history.push(Operation::Emit { emit_count }); + } + } + + assert_eq!(state.min_max.len(), expected.len()); + + for (group_index, expected_bytes) in expected.iter().enumerate() { + let actual = state.min_max[group_index].as_deref(); + let expected = expected_bytes.as_deref(); + assert_eq!( + actual, + expected, + "randomized min mismatch for {data_type:?} in group {group_index} (trial {trial}) history: {history:?}" + ); + } + } + } +} + +#[test] +fn reproduces_randomized_failure_case() { + fn apply_update( + state: &mut MinMaxBytesState, + expected: &mut Vec>>, + total: usize, + groups: Vec, + values: Vec>>, + ) { + if expected.len() < total { + expected.resize(total, None); + } + + let iter = values + .iter() + .map(|value| value.as_ref().map(|bytes| bytes.as_slice())); + + state + .update_batch(iter, &groups, total, |a, b| a < b) + .expect("structured update"); + + for (group_index, value) in groups.into_iter().zip(values) { + if let Some(bytes) = value { + let entry = &mut expected[group_index]; + let should_replace = entry + .as_ref() + .map(|existing| bytes.as_slice() < existing.as_slice()) + .unwrap_or(true); + if should_replace { + *entry = Some(bytes); + } + } + } + } + + let mut state = MinMaxBytesState::new(DataType::Utf8); + let mut expected: Vec>> = Vec::new(); + + { + let groups = vec![23, 28]; + let values = vec![ + Some(vec![121, 103, 113, 122, 115, 111, 104, 101, 100]), + Some(vec![121, 112, 107, 97]), + ]; + apply_update(&mut state, &mut expected, 45, groups, values); + } + assert_eq!(state.emit_to(EmitTo::First(11)).1.len(), 11); + expected.drain(..11); + + { + let groups = vec![ + 33, 17, 31, 0, 27, 3, 12, 6, 3, 27, 20, 28, 2, 9, 0, 1, 17, 33, 25, 28, 20, + 2, 29, 10, 32, 28, 32, 26, 2, 27, 22, 27, 14, 32, 30, 23, 13, 19, 26, 14, 26, + 32, 4, 32, 14, 21, + ]; + let values = vec![ + Some(vec![118, 114, 97, 97]), + Some(vec![108]), + Some(vec![114, 118, 106, 99, 122, 103, 122]), + Some(vec![ + 98, 112, 103, 114, 99, 100, 111, 113, 114, 100, 121, 115, + ]), + Some(vec![114, 105, 114, 113, 110, 122]), + Some(vec![105, 117]), + Some(vec![111, 119, 106, 99, 98, 100, 102, 100, 99, 102]), + Some(vec![116, 118, 98, 
121]), + Some(vec![114, 119, 117, 107, 118, 115]), + Some(vec![110, 113, 103, 114, 120, 109, 108, 117]), + Some(vec![105, 121, 97, 111, 99, 101, 118, 122, 121]), + Some(vec![115, 121, 111, 121, 120, 97, 109, 109, 104, 105, 108]), + Some(vec![117, 101]), + Some(vec![112, 107, 113, 105]), + None, + Some(vec![99, 117, 114, 103, 118, 107, 107]), + Some(vec![]), + Some(vec![]), + Some(vec![113, 98, 104, 119, 101]), + Some(vec![122, 114]), + Some(vec![119, 98]), + Some(vec![101, 99, 111, 116, 112, 116, 113, 101, 113]), + Some(vec![114, 109, 101, 107, 117, 111, 106]), + None, + Some(vec![121, 111, 118, 106, 116, 120, 108, 119, 118]), + Some(vec![]), + None, + Some(vec![108]), + Some(vec![ + 121, 102, 105, 97, 118, 117, 120, 97, 109, 118, 97, 122, + ]), + Some(vec![98, 102, 118, 108]), + Some(vec![117, 106, 116, 103, 122]), + Some(vec![104, 103, 117, 107, 118]), + Some(vec![109, 99, 112, 112, 106, 109]), + Some(vec![117, 100, 116, 117, 120, 116, 100, 111, 119, 120]), + Some(vec![109, 104, 99, 98]), + Some(vec![107]), + Some(vec![114, 107, 110, 112, 100, 98]), + Some(vec![122, 110, 103, 104]), + Some(vec![103, 113, 122, 104, 107, 117, 113, 122, 106]), + Some(vec![ + 122, 114, 116, 101, 106, 102, 118, 106, 114, 104, 122, 105, + ]), + Some(vec![98, 106, 107, 115, 115, 118, 122]), + Some(vec![ + 114, 122, 107, 115, 108, 105, 99, 122, 106, 110, 122, 103, + ]), + Some(vec![119, 106, 120, 104, 115, 118, 108, 113, 120, 122, 121]), + Some(vec![113, 104, 113, 101, 98, 122, 97, 100, 106]), + Some(vec![105]), + Some(vec![]), + ]; + apply_update(&mut state, &mut expected, 34, groups, values); + } + + { + let groups = vec![ + 38, 22, 20, 37, 0, 33, 9, 9, 8, 21, 34, 32, 8, 20, 8, 1, 25, 27, 17, 3, 20, + 32, 34, 36, 8, 29, 2, 39, 38, 20, 38, 16, 11, 13, 15, 22, 30, 15, 13, + ]; + let values = vec![ + Some(vec![104, 107, 105, 101, 99, 118]), + Some(vec![100, 110, 114]), + Some(vec![120, 107, 119, 111, 118]), + Some(vec![121, 120, 109, 109, 118, 97, 119, 122, 110, 115]), + Some(vec![111, 106]), + Some(vec![98, 113, 114, 116]), + Some(vec![114, 113, 105, 113, 122, 110, 105, 97, 100]), + Some(vec![97, 116, 107, 102, 97, 107]), + Some(vec![ + 102, 103, 105, 115, 121, 119, 103, 107, 118, 100, 101, 99, + ]), + Some(vec![]), + Some(vec![99, 102, 110, 109, 103, 109, 120]), + Some(vec![104]), + Some(vec![ + 107, 101, 101, 115, 115, 97, 115, 114, 101, 113, 121, 97, + ]), + Some(vec![114]), + Some(vec![116, 118, 113, 106, 109, 120, 100, 121, 99]), + Some(vec![114, 100, 110, 111, 100, 110, 98]), + Some(vec![114, 105, 111, 104, 111, 100, 98, 114, 99, 113]), + Some(vec![122, 100, 97, 119, 121, 101, 117, 104, 110, 113]), + Some(vec![116, 109, 114, 110, 103, 121, 108, 114]), + Some(vec![ + 106, 122, 102, 120, 105, 103, 122, 109, 118, 113, 100, 118, + ]), + None, + Some(vec![114, 112, 97, 102, 113, 114, 107, 104]), + None, + Some(vec![116, 102]), + Some(vec![100, 116, 103, 104, 97, 114, 117]), + Some(vec![117, 119, 107, 104, 106, 99, 120, 103]), + Some(vec![104]), + Some(vec![]), + Some(vec![120, 115, 122, 119, 97, 102, 110, 100, 118, 117, 97]), + Some(vec![ + 98, 112, 121, 102, 118, 101, 100, 110, 108, 118, 108, 100, + ]), + Some(vec![117, 114, 115, 111, 122, 98, 98, 115, 112, 100]), + Some(vec![106, 99, 113, 116, 103, 100, 110, 117, 102, 122, 104]), + Some(vec![ + 102, 101, 121, 97, 121, 99, 98, 104, 103, 100, 112, 113, + ]), + Some(vec![114, 107, 100, 101]), + Some(vec![98, 115, 112, 100, 106, 119, 103, 104, 111]), + Some(vec![]), + Some(vec![121, 116, 112, 121, 114, 110, 104, 119]), + Some(vec![99, 104, 101, 109, 115, 101, 
105]), + Some(vec![97, 104]), + ]; + apply_update(&mut state, &mut expected, 40, groups, values); + } + + assert_eq!( + state.min_max[38].as_deref(), + expected[38].as_deref(), + "state should hold expected minimum before re-expansion" + ); + + { + let groups = vec![ + 33, 24, 30, 5, 24, 13, 0, 8, 24, 40, 27, 25, 14, 8, 36, 23, 28, 22, 14, 20, + 23, 10, 28, 22, 31, 35, 13, 11, 10, 36, 39, 4, 40, 5, 13, 1, 20, 17, 0, 5, 3, + 24, 19, 38, + ]; + let values = vec![ + Some(vec![106, 98, 105, 119, 115, 110, 116, 119, 111, 104, 118]), + Some(vec![]), + Some(vec![ + 108, 115, 97, 110, 112, 105, 102, 100, 117, 114, 110, 116, + ]), + None, + Some(vec![111, 114, 110]), + Some(vec![107]), + Some(vec![111, 106, 121, 114, 113, 105]), + Some(vec![100, 109, 119, 122, 111, 105, 116, 104]), + Some(vec![98, 103]), + Some(vec![118, 99, 118, 118, 115, 116, 104, 110, 114, 115, 115]), + Some(vec![102, 107]), + Some(vec![105, 107, 119, 115, 98, 110, 110]), + Some(vec![120, 121, 114, 121, 102, 120, 117, 109, 122]), + Some(vec![104, 101, 115, 104, 103, 106]), + Some(vec![108, 97, 99, 111]), + Some(vec![98, 115, 102, 98, 101, 109, 120, 118, 112, 104, 102]), + Some(vec![]), + Some(vec![122, 116, 111, 107, 107]), + Some(vec![97, 118, 104, 111, 122, 100, 99, 106, 101, 107, 104]), + Some(vec![105, 119, 114, 99, 122]), + Some(vec![106, 122, 117, 116, 111, 104, 109, 105, 111, 121, 122]), + Some(vec![ + 107, 106, 111, 109, 107, 97, 105, 104, 117, 98, 105, 114, + ]), + Some(vec![115, 116, 120, 102, 109, 112, 122, 102, 102, 120, 110]), + Some(vec![114, 105, 109]), + Some(vec![117, 97, 121, 109, 120, 109, 122, 101, 112, 104]), + Some(vec![103, 111, 99]), + Some(vec![120, 120, 115, 101, 101, 109, 100, 122]), + Some(vec![115, 107, 121, 122, 121, 108, 118]), + Some(vec![107, 109, 120, 102, 121, 109, 118]), + Some(vec![98, 104, 122, 100, 97, 111, 116]), + Some(vec![121, 120]), + Some(vec![118, 110, 99, 109, 122, 103, 98, 100, 111]), + Some(vec![107, 113, 108, 97, 110, 114, 105, 122, 112, 99]), + Some(vec![105, 104, 99, 117, 108, 107, 115, 97]), + Some(vec![108, 114, 109, 106, 103, 99, 100, 99]), + Some(vec![ + 106, 112, 114, 112, 101, 117, 108, 106, 112, 116, 107, 109, + ]), + Some(vec![]), + Some(vec![102, 109, 102]), + Some(vec![111, 122, 115, 102, 98, 101, 105, 105, 109]), + Some(vec![105, 104, 101, 117, 100, 110, 103, 99, 113]), + Some(vec![111, 100, 103]), + Some(vec![113, 112, 111, 111, 107, 111, 103]), + Some(vec![111]), + Some(vec![ + 108, 122, 116, 107, 108, 112, 108, 110, 114, 116, 120, 98, + ]), + ]; + apply_update(&mut state, &mut expected, 41, groups, values); + } + + { + let groups = vec![7, 35, 27, 39, 2, 16, 19, 40, 24, 10, 32, 27]; + let values = vec![ + Some(vec![111, 98, 115, 115, 107, 121, 101, 119]), + Some(vec![]), + None, + Some(vec![98]), + Some(vec![110, 112, 103, 98, 118, 104, 103, 119, 120]), + Some(vec![104, 101, 115, 100, 102, 102, 113, 111]), + Some(vec![97]), + Some(vec![111, 116, 106, 110, 117, 121, 122, 104, 113, 110]), + Some(vec![122, 103, 111, 99, 103, 112, 108, 100, 117, 105, 100]), + Some(vec![108]), + Some(vec![100, 111, 114, 98, 98, 112, 99, 115, 120, 120]), + Some(vec![104]), + ]; + apply_update(&mut state, &mut expected, 41, groups, values); + } + + { + let groups = vec![4, 10, 30, 6, 5, 14, 31, 20, 2, 31, 35]; + let values = vec![ + None, + Some(vec![115, 109, 111, 112]), + Some(vec![112, 113, 108]), + Some(vec![113, 116]), + Some(vec![112, 106]), + Some(vec![104]), + Some(vec![106, 115, 122, 113, 107, 111, 101, 112, 108, 122]), + Some(vec![114, 116, 107, 106, 102, 118, 97, 114, 119, 
116]), + Some(vec![99, 106]), + Some(vec![107, 98, 100, 109, 115, 114, 114, 104, 103]), + Some(vec![98, 111, 122, 110, 117, 103, 102, 110, 115, 114, 105]), + ]; + apply_update(&mut state, &mut expected, 41, groups, values); + } + + let actual = state.min_max[38].clone(); + let expected_bytes = expected[38].clone(); + assert_eq!(actual, expected_bytes); +} + +#[test] +fn min_updates_across_batches_simple_variants() { + fn run_scenario(data_type: DataType) { + let mut state = MinMaxBytesState::new(data_type.clone()); + let total_groups = 10_usize; + let first_groups = [0_usize, 9, 0, 9]; + let second_groups = first_groups; + let first_values = ["m0", "t9", "n0", "u9"]; + let second_values = ["a0", "t9", "n0", "u9"]; + + let first_batch: Vec> = first_values + .iter() + .map(|value| value.as_bytes().to_vec()) + .collect(); + state + .update_batch( + first_batch.iter().map(|value| Some(value.as_slice())), + &first_groups, + total_groups, + |a, b| a < b, + ) + .expect("first batch"); + + assert!( + matches!(state.workload_mode, WorkloadMode::Simple), + "expected Simple for {data_type:?}, found {:?}", + state.workload_mode + ); + assert_eq!( + state.min_max[0].as_deref(), + Some(first_values[0].as_bytes()), + "initial minimum should match first batch for {data_type:?}" + ); + + let second_batch: Vec> = second_values + .iter() + .map(|value| value.as_bytes().to_vec()) + .collect(); + state + .update_batch( + second_batch.iter().map(|value| Some(value.as_slice())), + &second_groups, + total_groups, + |a, b| a < b, + ) + .expect("second batch"); + + assert_eq!( + state.min_max[0].as_deref(), + Some(second_values[0].as_bytes()), + "second batch should lower the minimum for {data_type:?}" + ); + } + + for data_type in [DataType::Utf8, DataType::Binary, DataType::BinaryView] { + run_scenario(data_type); + } +} + +#[test] +fn min_updates_across_batches_sparse_variants() { + fn run_scenario(data_type: DataType) { + let mut state = MinMaxBytesState::new(data_type.clone()); + let total_groups = 1_024_usize; + let group_indices = [0_usize, 512, 0, 512]; + let first_values = ["m0", "t9", "n0", "u9"]; + let second_values = ["a0", "t9", "n0", "u9"]; + + let first_batch: Vec> = first_values + .iter() + .map(|value| value.as_bytes().to_vec()) + .collect(); + state + .update_batch( + first_batch.iter().map(|value| Some(value.as_slice())), + &group_indices, + total_groups, + |a, b| a < b, + ) + .expect("first batch"); + + assert!( + matches!(state.workload_mode, WorkloadMode::SparseOptimized), + "expected SparseOptimized for {data_type:?}, found {:?}", + state.workload_mode + ); + assert_eq!( + state.min_max[0].as_deref(), + Some(first_values[0].as_bytes()), + "initial minimum should match first batch for {data_type:?}" + ); + + let second_batch: Vec> = second_values + .iter() + .map(|value| value.as_bytes().to_vec()) + .collect(); + state + .update_batch( + second_batch.iter().map(|value| Some(value.as_slice())), + &group_indices, + total_groups, + |a, b| a < b, + ) + .expect("second batch"); + + assert_eq!( + state.min_max[0].as_deref(), + Some(second_values[0].as_bytes()), + "second batch should lower the minimum for {data_type:?}" + ); + } + + for data_type in [DataType::Utf8, DataType::Binary, DataType::BinaryView] { + run_scenario(data_type); + } +} + +#[test] +fn min_updates_after_dense_inline_commit() { + fn run_scenario(data_type: DataType) { + let mut state = MinMaxBytesState::new(data_type.clone()); + let total_groups = 8_usize; + let group_indices = [0_usize, 1, 2, 3, 4, 5, 6, 7]; + let initial_values 
= ["m0", "n1", "o2", "p3", "q4", "r5", "s6", "t7"]; + let initial_batch: Vec> = initial_values + .iter() + .map(|value| value.as_bytes().to_vec()) + .collect(); + + // Drive the accumulator into DenseInline mode and allow it to commit. + for _ in 0..=DENSE_INLINE_STABILITY_THRESHOLD { + state + .update_batch( + initial_batch.iter().map(|value| Some(value.as_slice())), + &group_indices, + total_groups, + |a, b| a < b, + ) + .expect("stable dense batch"); + } + + assert!( + matches!(state.workload_mode, WorkloadMode::DenseInline), + "expected DenseInline for {data_type:?}, found {:?}", + state.workload_mode + ); + assert!(state.dense_inline_committed); + assert_eq!( + state.min_max[0].as_deref(), + Some(initial_values[0].as_bytes()), + "initial committed minimum should match the seeded batch for {data_type:?}" + ); + + let updated_values = ["a0", "n1", "o2", "p3", "q4", "r5", "s6", "t7"]; + let updated_batch: Vec> = updated_values + .iter() + .map(|value| value.as_bytes().to_vec()) + .collect(); + + state + .update_batch( + updated_batch.iter().map(|value| Some(value.as_slice())), + &group_indices, + total_groups, + |a, b| a < b, + ) + .expect("dense inline committed batch"); + + assert!(state.dense_inline_committed); + assert_eq!( + state.min_max[0].as_deref(), + Some(updated_values[0].as_bytes()), + "committed dense inline path should accept the new minimum for {data_type:?}" + ); + } + + for data_type in [DataType::Utf8, DataType::Binary, DataType::BinaryView] { + run_scenario(data_type); + } +} + +#[test] +fn min_updates_after_dense_inline_reconsideration() { + fn run_scenario(data_type: DataType) { + let mut state = MinMaxBytesState::new(data_type.clone()); + let seed_groups: Vec = (0..8).collect(); + let seed_values: Vec> = seed_groups + .iter() + .map(|group| format!("seed_{group}").into_bytes()) + .collect(); + + // Establish DenseInline mode with a committed state. + for _ in 0..=DENSE_INLINE_STABILITY_THRESHOLD { + state + .update_batch( + seed_values.iter().map(|value| Some(value.as_slice())), + &seed_groups, + seed_groups.len(), + |a, b| a < b, + ) + .expect("seed dense batch"); + } + + assert!(state.dense_inline_committed); + + // Expand the domain substantially and provide a new minimum for group 0. 
+ let expanded_total = 32_usize; + let expanded_groups: Vec = (0..expanded_total).collect(); + let mut expanded_values: Vec> = expanded_groups + .iter() + .map(|group| format!("expanded_{group}").into_bytes()) + .collect(); + expanded_values[0] = b"a0".to_vec(); + + state + .update_batch( + expanded_values.iter().map(|value| Some(value.as_slice())), + &expanded_groups, + expanded_total, + |a, b| a < b, + ) + .expect("expanded dense batch"); + + assert!(matches!(state.workload_mode, WorkloadMode::DenseInline)); + assert_eq!( + state.min_max[0].as_deref(), + Some(b"a0".as_slice()), + "reconsidered dense inline path should adopt the new minimum for {data_type:?}" + ); + } + + for data_type in [DataType::Utf8, DataType::Binary, DataType::BinaryView] { + run_scenario(data_type); + } +} + +#[test] +fn randomized_minimum_matches_baseline_for_byte_types() { + struct Lcg(u64); + + impl Lcg { + fn new(seed: u64) -> Self { + Self(seed) + } + + fn next(&mut self) -> u64 { + self.0 = self.0.wrapping_mul(6364136223846793005).wrapping_add(1); + self.0 + } + } + + type Batch = (Vec, Vec>>); + + fn generate_batches( + rng: &mut Lcg, + total_groups: usize, + batches: usize, + ) -> Vec { + (0..batches) + .map(|_| { + let rows = (rng.next() % 16 + 1) as usize; + let mut groups = Vec::with_capacity(rows); + let mut values = Vec::with_capacity(rows); + + for _ in 0..rows { + let group = (rng.next() as usize) % total_groups; + groups.push(group); + + let is_null = rng.next().is_multiple_of(5); + if is_null { + values.push(None); + continue; + } + + let len = (rng.next() % 5) as usize; + let mut value = Vec::with_capacity(len); + for _ in 0..len { + value.push((rng.next() & 0xFF) as u8); + } + values.push(Some(value)); + } + + (groups, values) + }) + .collect() + } + + fn run_scenario(data_type: DataType) { + let mut rng = Lcg::new(0x5EED5EED); + let total_groups = 128_usize; + + for case in 0..512 { + let mut state = MinMaxBytesState::new(data_type.clone()); + let mut baseline: Vec>> = vec![None; total_groups]; + let batches = (rng.next() % 6 + 1) as usize; + let payloads = generate_batches(&mut rng, total_groups, batches); + + for (batch_index, (groups, values)) in payloads.into_iter().enumerate() { + let iter = values.iter().map(|value| value.as_deref()); + state + .update_batch(iter, &groups, total_groups, |a, b| a < b) + .expect("update batch"); + + for (group, value) in groups.iter().zip(values.iter()) { + if let Some(candidate) = value { + match &mut baseline[*group] { + Some(existing) => { + if candidate < existing { + *existing = candidate.clone(); + } + } + slot @ None => { + *slot = Some(candidate.clone()); + } + } + } + } + + for (group_index, expected) in baseline.iter().enumerate() { + assert_eq!(state.min_max[group_index].as_deref(), expected.as_deref(), + "case {case}, batch {batch_index}, group {group_index}, type {data_type:?}"); + } + } + } + } + + for data_type in [DataType::Utf8, DataType::Binary, DataType::BinaryView] { + run_scenario(data_type); + } +} + +#[test] +fn dense_batches_use_dense_inline_mode() { + let mut state = MinMaxBytesState::new(DataType::Utf8); + let total_groups = 32_usize; + // Use sequential + extra pattern to avoid our fast path detection + // but still exercise DenseInline mode's internal logic + // Pattern: [0, 1, 2, ..., 30, 31, 0] - sequential plus one duplicate + let mut groups: Vec = (0..total_groups).collect(); + groups.push(0); // Add one duplicate to break our fast path check + let mut raw_values: Vec> = (0..total_groups) + .map(|idx| 
format!("value_{idx:02}").into_bytes()) + .collect(); + raw_values.push(b"value_00".to_vec()); // Corresponding value for duplicate + + state + .update_batch( + raw_values.iter().map(|value| Some(value.as_slice())), + &groups, + total_groups, + |a, b| a < b, + ) + .expect("update batch"); + + assert!(matches!(state.workload_mode, WorkloadMode::DenseInline)); + assert!(!state.scratch_dense_enabled); + assert_eq!(state.scratch_dense_limit, 0); + assert!(state.scratch_sparse.is_empty()); + // Marks may be allocated or not depending on when fast path breaks + assert!(state.dense_inline_marks_ready); + assert_eq!(state.populated_groups, total_groups); + + // Verify values are correct + for i in 0..total_groups { + let expected = format!("value_{i:02}"); + assert_eq!(state.min_max[i].as_deref(), Some(expected.as_bytes())); + } +} + +#[test] +fn dense_inline_commits_after_stable_batches() { + let mut state = MinMaxBytesState::new(DataType::Utf8); + // Use non-sequential indices to avoid fast path + let group_indices = vec![0_usize, 2, 1]; + let values = ["a", "b", "c"]; + + for batch in 0..5 { + let iter = values.iter().map(|value| Some(value.as_bytes())); + state + .update_batch(iter, &group_indices, 3, |a, b| a < b) + .expect("update batch"); + + if batch < DENSE_INLINE_STABILITY_THRESHOLD { + assert!(!state.dense_inline_committed); + } else { + assert!(state.dense_inline_committed); + assert!(state.dense_inline_marks.is_empty()); + } + } +} + +#[test] +fn dense_inline_reconsiders_after_commit_when_domain_grows() { + let mut state = MinMaxBytesState::new(DataType::Utf8); + // Use a pattern with one extra element to avoid the sequential fast path + let group_indices = vec![0_usize, 1, 2, 0]; + let values: Vec<&[u8]> = + vec![b"a".as_ref(), b"b".as_ref(), b"c".as_ref(), b"z".as_ref()]; + + for _ in 0..=DENSE_INLINE_STABILITY_THRESHOLD { + let iter = values.iter().copied().map(Some); + state + .update_batch(iter, &group_indices, 3, |a, b| a < b) + .expect("stable dense batch"); + } + + assert!(state.dense_inline_committed); + assert_eq!(state.dense_inline_committed_groups, 3); + + // Expand with one more group (breaking sequential pattern) + let expanded_groups = vec![0_usize, 1, 2, 3, 0]; + let expanded_values = vec![ + Some(b"a".as_ref()), + Some(b"b".as_ref()), + Some(b"c".as_ref()), + Some(b"z".as_ref()), + Some(b"zz".as_ref()), + ]; + + state + .update_batch(expanded_values, &expanded_groups, 4, |a, b| a < b) + .expect("dense batch with new group"); + + assert!(matches!(state.workload_mode, WorkloadMode::DenseInline)); + assert!(!state.dense_inline_committed); + assert_eq!(state.dense_inline_committed_groups, 0); + assert_eq!(state.lifetime_max_group_index, Some(3)); +} + +#[test] +fn dense_inline_defers_marks_first_batch() { + let mut state = MinMaxBytesState::new(DataType::Utf8); + // Use a pattern with one extra element to avoid the sequential fast path + // but maintain sequential core to avoid breaking DenseInline's internal fast path + let groups = vec![0_usize, 1, 2, 0]; // Sequential + one duplicate + let values = ["a", "b", "c", "z"]; // Last value won't replace first + + state + .update_batch( + values.iter().map(|value| Some(value.as_bytes())), + &groups, + 3, // total_num_groups=3, not 4 + |a, b| a < b, + ) + .expect("first batch"); + + // After first batch, marks_ready is set but marks may or may not be allocated + // depending on when the fast path broke + assert!(state.dense_inline_marks_ready); + + state + .update_batch( + values.iter().map(|value| Some(value.as_bytes())), + 
&groups, + 3, + |a, b| a < b, + ) + .expect("second batch"); + + assert!(state.dense_inline_marks_ready); + // Marks should be sized to total_num_groups, not the input array length + assert!(state.dense_inline_marks.len() >= 3); +} + +#[test] +fn sparse_batch_switches_mode_after_first_update() { + let mut state = MinMaxBytesState::new(DataType::Utf8); + let groups = vec![10_usize, 20_usize]; + let values = [Some("b".as_bytes()), Some("a".as_bytes())]; + + state + .update_batch(values.iter().copied(), &groups, 1_000_000, |a, b| a < b) + .expect("first batch"); + + assert!(matches!(state.workload_mode, WorkloadMode::SparseOptimized)); + assert_eq!(state.min_max[10].as_deref(), Some("b".as_bytes())); + assert_eq!(state.min_max[20].as_deref(), Some("a".as_bytes())); + + let groups_second = vec![20_usize]; + let values_second = [Some("c".as_bytes())]; + + state + .update_batch( + values_second.iter().copied(), + &groups_second, + 1_000_000, + |a, b| a > b, + ) + .expect("second batch"); + + assert!(matches!(state.workload_mode, WorkloadMode::SparseOptimized)); + assert!(state.scratch_sparse.capacity() >= groups_second.len()); + assert_eq!(state.scratch_dense_limit, 0); + assert_eq!(state.min_max[20].as_deref(), Some("c".as_bytes())); +} + +#[test] +fn sparse_mode_updates_values_from_start() { + let mut state = MinMaxBytesState::new(DataType::Utf8); + state.workload_mode = WorkloadMode::SparseOptimized; + + let groups = vec![1_000_000_usize, 2_000_000_usize]; + let values = [Some("left".as_bytes()), Some("right".as_bytes())]; + + state + .update_batch(values.iter().copied(), &groups, 2_000_001, |a, b| a < b) + .expect("sparse update"); + + assert!(matches!(state.workload_mode, WorkloadMode::SparseOptimized)); + assert_eq!(state.scratch_dense.len(), 0); + assert_eq!(state.scratch_dense_limit, 0); + assert!(state.scratch_sparse.capacity() >= groups.len()); + assert_eq!(state.min_max[1_000_000].as_deref(), Some("left".as_bytes())); + assert_eq!( + state.min_max[2_000_000].as_deref(), + Some("right".as_bytes()) + ); +} + +#[test] +fn sparse_mode_reenables_dense_before_use() { + let mut state = MinMaxBytesState::new(DataType::Utf8); + state.workload_mode = WorkloadMode::SparseOptimized; + + let total_groups = 64_usize; + state.resize_min_max(total_groups); + state.set_value(0, b"mango"); + state.set_value(5, b"zebra"); + + state.scratch_dense_limit = 6; + state.scratch_dense_enabled = false; + state.scratch_dense.clear(); + + assert!(state.total_data_bytes > 0); + assert_eq!(state.scratch_dense.len(), 0); + + let groups = vec![0_usize, 5_usize]; + let values = [b"apple".as_slice(), b"aardvark".as_slice()]; + + state + .update_batch( + values.iter().copied().map(Some), + &groups, + total_groups, + |a, b| a < b, + ) + .expect("sparse update without dense scratch"); + + assert!(state.scratch_dense_enabled); + assert!(state.scratch_dense.len() >= state.scratch_dense_limit); + assert_eq!(state.scratch_dense_limit, 6); + assert_eq!(state.min_max[0].as_deref(), Some(b"apple".as_slice())); + assert_eq!(state.min_max[5].as_deref(), Some(b"aardvark".as_slice())); +} + +#[test] +fn simple_mode_switches_to_sparse_on_low_density() { + let mut state = MinMaxBytesState::new(DataType::Utf8); + + state.record_batch_stats( + BatchStats { + unique_groups: 32, + max_group_index: Some(31), + }, + DENSE_INLINE_MAX_TOTAL_GROUPS, + ); + assert!(matches!(state.workload_mode, WorkloadMode::Simple)); + + state.populated_groups = SPARSE_SWITCH_GROUP_THRESHOLD + 1; + state.lifetime_max_group_index = 
Some(SPARSE_SWITCH_GROUP_THRESHOLD * 200); + + state.record_batch_stats( + BatchStats { + unique_groups: 1, + max_group_index: Some(SPARSE_SWITCH_GROUP_THRESHOLD * 200), + }, + SPARSE_SWITCH_GROUP_THRESHOLD * 200 + 1, + ); + + assert!(matches!(state.workload_mode, WorkloadMode::SparseOptimized)); +} + +#[test] +fn emit_to_all_resets_populated_groups() { + let mut state = MinMaxBytesState::new(DataType::Utf8); + state.resize_min_max(3); + + state.set_value(0, b"alpha"); + state.set_value(1, b"beta"); + + state.workload_mode = WorkloadMode::SparseOptimized; + state.processed_batches = 3; + state.total_groups_seen = 5; + state.lifetime_max_group_index = Some(7); + state.scratch_dense_enabled = true; + state.scratch_dense_limit = 128; + state.scratch_epoch = 42; + state.scratch_group_ids.push(1); + state.scratch_dense.push(ScratchEntry { + epoch: 1, + location: ScratchLocation::Existing, + }); + state.scratch_sparse.insert(0, ScratchLocation::Existing); + state.simple_epoch = 9; + state.simple_slots.resize_with(3, SimpleSlot::new); + state.simple_touched_groups.push(2); + state.dense_inline_marks_ready = true; + state.dense_inline_marks.push(99); + state.dense_inline_epoch = 17; + state.dense_inline_stable_batches = 11; + state.dense_inline_committed = true; + state.dense_inline_committed_groups = 3; + state.dense_enable_invocations = 13; + state.dense_sparse_detours = 3; + + assert_eq!(state.populated_groups, 2); + + let (_capacity, values) = state.emit_to(EmitTo::All); + assert_eq!(values.len(), 3); + assert_eq!(values.iter().filter(|value| value.is_some()).count(), 2); + assert_eq!(state.populated_groups, 0); + assert!(state.min_max.is_empty()); + assert_eq!(state.total_data_bytes, 0); + assert!(matches!(state.workload_mode, WorkloadMode::Undecided)); + assert_eq!(state.processed_batches, 0); + assert_eq!(state.total_groups_seen, 0); + assert_eq!(state.lifetime_max_group_index, None); + assert!(!state.scratch_dense_enabled); + assert_eq!(state.scratch_dense_limit, 0); + assert_eq!(state.scratch_epoch, 0); + assert!(state.scratch_group_ids.is_empty()); + assert!(state.scratch_dense.is_empty()); + assert!(state.scratch_sparse.is_empty()); + assert_eq!(state.simple_epoch, 0); + assert!(state.simple_slots.is_empty()); + assert!(state.simple_touched_groups.is_empty()); + assert!(!state.dense_inline_marks_ready); + assert!(state.dense_inline_marks.is_empty()); + assert_eq!(state.dense_inline_epoch, 0); + assert_eq!(state.dense_inline_stable_batches, 0); + assert!(!state.dense_inline_committed); + assert_eq!(state.dense_inline_committed_groups, 0); + assert_eq!(state.dense_enable_invocations, 0); + assert_eq!(state.dense_sparse_detours, 0); +} + +#[test] +fn emit_to_first_updates_populated_groups() { + let mut state = MinMaxBytesState::new(DataType::Utf8); + state.resize_min_max(4); + + state.set_value(0, b"left"); + state.set_value(1, b"middle"); + state.set_value(3, b"right"); + + assert_eq!(state.populated_groups, 3); + + let (_capacity, values) = state.emit_to(EmitTo::First(2)); + assert_eq!(values.len(), 2); + assert_eq!(state.populated_groups, 1); + assert_eq!(state.min_max.len(), 2); + + // Remaining groups should retain their data (original index 3) + assert_eq!(state.min_max[1].as_deref(), Some(b"right".as_slice())); +} + +#[test] +fn min_updates_after_emit_first_realigns_indices() { + let mut state = MinMaxBytesState::new(DataType::Utf8); + let initial_groups: Vec<usize> = (0..4).collect(); + let initial_values = ["m0", "n1", "o2", "p3"]; + let initial_batch: Vec<Vec<u8>> = initial_values + .iter() +
.map(|value| value.as_bytes().to_vec()) + .collect(); + + state + .update_batch( + initial_batch.iter().map(|value| Some(value.as_slice())), + &initial_groups, + initial_groups.len(), + |a, b| a < b, + ) + .expect("seed batch"); + + state.workload_mode = WorkloadMode::SparseOptimized; + state.scratch_dense_enabled = true; + state.scratch_dense_limit = initial_groups.len(); + state.scratch_dense = vec![ScratchEntry::new(); initial_groups.len()]; + state.scratch_group_ids = initial_groups.clone(); + state.scratch_epoch = 42; + state + .simple_slots + .resize_with(initial_groups.len(), SimpleSlot::new); + state.simple_epoch = 7; + state.simple_touched_groups = initial_groups.clone(); + state.dense_inline_marks = vec![99; initial_groups.len()]; + state.dense_inline_marks_ready = true; + state.dense_inline_epoch = 9; + state.dense_inline_stable_batches = 5; + state.dense_inline_committed = true; + state.dense_inline_committed_groups = initial_groups.len(); + state.total_groups_seen = 16; + state.lifetime_max_group_index = Some(initial_groups.len() - 1); + + let (_capacity, emitted) = state.emit_to(EmitTo::First(2)); + assert_eq!(emitted.len(), 2); + assert_eq!(state.min_max.len(), 2); + assert_eq!( + state.min_max[0].as_deref(), + Some(initial_values[2].as_bytes()) + ); + assert_eq!(state.populated_groups, 2); + assert_eq!(state.total_groups_seen, state.populated_groups); + assert_eq!(state.lifetime_max_group_index, Some(1)); + assert!(!state.scratch_dense_enabled); + assert_eq!(state.scratch_dense_limit, 0); + assert!(state.scratch_dense.is_empty()); + assert!(state.scratch_group_ids.is_empty()); + assert!(state.scratch_sparse.is_empty()); + assert_eq!(state.scratch_epoch, 0); + assert_eq!(state.simple_slots.len(), state.min_max.len()); + assert_eq!(state.simple_epoch, 0); + assert!(state.simple_touched_groups.is_empty()); + assert_eq!(state.dense_inline_marks.len(), state.min_max.len()); + assert!(!state.dense_inline_marks_ready); + assert_eq!(state.dense_inline_epoch, 0); + assert_eq!(state.dense_inline_stable_batches, 0); + assert!(!state.dense_inline_committed); + assert_eq!(state.dense_inline_committed_groups, 0); + assert_eq!(state.processed_batches, 0); + + let update_groups = [0_usize]; + let updated_value = b"a0".to_vec(); + state + .update_batch( + std::iter::once(Some(updated_value.as_slice())), + &update_groups, + state.min_max.len(), + |a, b| a < b, + ) + .expect("update after emit"); + + assert_eq!(state.min_max[0].as_deref(), Some(updated_value.as_slice())); +} + +#[test] +fn emit_to_first_resets_state_when_everything_is_drained() { + let mut state = MinMaxBytesState::new(DataType::Utf8); + state.resize_min_max(2); + state.set_value(0, b"left"); + state.set_value(1, b"right"); + + state.workload_mode = WorkloadMode::DenseInline; + state.processed_batches = 10; + state.total_groups_seen = 12; + state.scratch_dense_enabled = true; + state.dense_inline_committed = true; + state.dense_inline_committed_groups = 2; + state.simple_epoch = 5; + state.simple_slots.resize_with(2, SimpleSlot::new); + + let (_capacity, values) = state.emit_to(EmitTo::First(2)); + assert_eq!(values.len(), 2); + assert!(values.iter().all(|value| value.is_some())); + assert!(state.min_max.is_empty()); + assert_eq!(state.total_data_bytes, 0); + assert!(matches!(state.workload_mode, WorkloadMode::Undecided)); + assert_eq!(state.processed_batches, 0); + assert_eq!(state.total_groups_seen, 0); + assert!(!state.scratch_dense_enabled); + assert!(!state.dense_inline_committed); + 
assert_eq!(state.dense_inline_committed_groups, 0); + assert_eq!(state.simple_epoch, 0); + assert!(state.simple_slots.is_empty()); +} + +#[test] +fn resize_min_max_reclaims_truncated_entries() { + let mut state = MinMaxBytesState::new(DataType::Utf8); + state.resize_min_max(4); + state.set_value(0, b"a"); + state.set_value(1, b"bc"); + state.set_value(2, b"def"); + state.set_value(3, b"ghij"); + + assert_eq!(state.populated_groups, 4); + assert_eq!(state.total_data_bytes, 10); + + state.resize_min_max(2); + assert_eq!(state.min_max.len(), 2); + assert_eq!(state.total_data_bytes, 3); + assert_eq!(state.populated_groups, 2); + assert_eq!(state.min_max[0].as_deref(), Some(b"a".as_slice())); + assert_eq!(state.min_max[1].as_deref(), Some(b"bc".as_slice())); + + state.resize_min_max(0); + assert_eq!(state.min_max.len(), 0); + assert_eq!(state.total_data_bytes, 0); + assert_eq!(state.populated_groups, 0); +} + +#[test] +fn sequential_dense_counts_non_null_groups_without_spurious_updates() { + let total_groups = 6_usize; + let existing_values: Vec<Vec<u8>> = (0..total_groups) + .map(|group| format!("seed_{group:02}").into_bytes()) + .collect(); + let group_indices: Vec<usize> = (0..total_groups).collect(); + + let owned_replacements: Vec<Option<Vec<u8>>> = vec![ + Some(b"aaa".to_vec()), // smaller -> should replace + Some(b"zzz".to_vec()), // larger -> should not replace + None, + Some(b"seed_03".to_vec()), // equal -> should not replace + None, + Some(b"aaa".to_vec()), // smaller -> should replace + ]; + + { + let mut state = MinMaxBytesState::new(DataType::Utf8); + state.resize_min_max(total_groups); + for (group, value) in existing_values.iter().enumerate() { + state.set_value(group, value); + } + + let stats = state + .update_batch_sequential_dense( + owned_replacements.iter().map(|value| value.as_deref()), + &group_indices, + total_groups, + |a, b| a < b, + ) + .expect("sequential dense update"); + + // Only four groups supplied non-null values in the batch. + assert_eq!(stats.unique_groups, 4); + assert_eq!(stats.max_group_index, Some(5)); + + // Groups 0 and 5 should have been updated with the smaller values. + assert_eq!(state.min_max[0].as_deref(), Some(b"aaa".as_slice())); + assert_eq!(state.min_max[5].as_deref(), Some(b"aaa".as_slice())); + + // Groups with larger/equal values must retain their existing minima. + assert_eq!(state.min_max[1].as_deref(), Some(b"seed_01".as_slice())); + assert_eq!(state.min_max[3].as_deref(), Some(b"seed_03".as_slice())); + + // Null groups are left untouched.
+ assert_eq!(state.min_max[2].as_deref(), Some(b"seed_02".as_slice())); + assert_eq!(state.min_max[4].as_deref(), Some(b"seed_04".as_slice())); + } + + let owned_replacements_with_null_tail: Vec<Option<Vec<u8>>> = vec![ + Some(b"aaa".to_vec()), // smaller -> should replace + Some(b"zzz".to_vec()), // larger -> should not replace + None, + Some(b"seed_03".to_vec()), // equal -> should not replace + None, + None, // regression: highest group index is null in the batch + ]; + + let mut state = MinMaxBytesState::new(DataType::Utf8); + state.resize_min_max(total_groups); + for (group, value) in existing_values.iter().enumerate() { + state.set_value(group, value); + } + + let stats = state + .update_batch_sequential_dense( + owned_replacements_with_null_tail + .iter() + .map(|value| value.as_deref()), + &group_indices, + total_groups, + |a, b| a < b, + ) + .expect("sequential dense update"); + + // Only three groups supplied non-null values in the batch, but the maximum + // group index should still reflect the last slot in the batch even when + // that entry is null. + assert_eq!(stats.unique_groups, 3); + assert_eq!(stats.max_group_index, Some(5)); + + // Only the first group should have been updated with the smaller value. + assert_eq!(state.min_max[0].as_deref(), Some(b"aaa".as_slice())); + + // All other groups, including the null tail, must retain their original minima. + assert_eq!(state.min_max[1].as_deref(), Some(b"seed_01".as_slice())); + assert_eq!(state.min_max[2].as_deref(), Some(b"seed_02".as_slice())); + assert_eq!(state.min_max[3].as_deref(), Some(b"seed_03".as_slice())); + assert_eq!(state.min_max[4].as_deref(), Some(b"seed_04".as_slice())); + assert_eq!(state.min_max[5].as_deref(), Some(b"seed_05".as_slice())); +} + +#[test] +fn sequential_dense_reuses_allocation_across_batches() { + let mut state = MinMaxBytesState::new(DataType::Utf8); + let total_groups = 512_usize; + let group_indices: Vec<usize> = (0..total_groups).collect(); + + let make_batch = |prefix: u8| -> Vec<Option<Vec<u8>>> { + (0..total_groups) + .map(|group| { + Some(format!("{ch}{ch}_{group:05}", ch = char::from(prefix)).into_bytes()) + }) + .collect() + }; + + // Seed the accumulator with a batch of lexicographically large values. + let initial = make_batch(b'z'); + let stats = state + .update_batch_sequential_dense( + initial.iter().map(|value| value.as_deref()), + &group_indices, + total_groups, + |a, b| a < b, + ) + .expect("initial sequential dense update"); + assert_eq!(stats.unique_groups, total_groups); + + let baseline_size = state.size(); + + // Process several more batches where each value is strictly smaller than the + // previous one. All replacements keep the payload length constant so any + // increase in size would indicate a new allocation.
+ for step in 1..=5 { + let prefix = b'z' - step as u8; + let batch = make_batch(prefix); + state + .update_batch_sequential_dense( + batch.iter().map(|value| value.as_deref()), + &group_indices, + total_groups, + |a, b| a < b, + ) + .expect("sequential dense update"); + + assert_eq!(state.size(), baseline_size); + } +} + +#[test] +fn sequential_dense_batches_skip_dense_inline_marks_allocation() { + let mut state = MinMaxBytesState::new(DataType::Utf8); + let total_groups = 2_048_usize; + let batch_size = 1_536_usize; // 75% density keeps DenseInline preferred + let group_indices: Vec<usize> = (0..batch_size).collect(); + + let make_batch = |step: usize| -> Vec<Vec<u8>> { + group_indices + .iter() + .map(|group| format!("{step:02}_{group:05}").into_bytes()) + .collect() + }; + + // First batch should drive the accumulator into DenseInline mode without + // touching the marks table because the internal fast path stays active. + let first_batch = make_batch(0); + state + .update_batch( + first_batch.iter().map(|value| Some(value.as_slice())), + &group_indices, + total_groups, + |a, b| a < b, + ) + .expect("first sequential dense batch"); + + assert!(matches!(state.workload_mode, WorkloadMode::DenseInline)); + assert!(state.dense_inline_marks_ready); + assert!(state.dense_inline_marks.is_empty()); + let initial_epoch = state.dense_inline_epoch; + + // Subsequent sequential batches should continue using the fast path + // without allocating or clearing the marks table. + for step in 1..=2 { + let batch = make_batch(step); + state + .update_batch( + batch.iter().map(|value| Some(value.as_slice())), + &group_indices, + total_groups, + |a, b| a < b, + ) + .unwrap_or_else(|err| panic!("sequential dense batch {step} failed: {err}")); + + assert!(state.dense_inline_marks.is_empty()); + assert_eq!(state.dense_inline_epoch, initial_epoch); + } +} + +#[test] +fn update_batch_duplicate_batches_match_expected_unique_counts() { + let mut state = MinMaxBytesState::new(DataType::Utf8); + let total_groups = 8_usize; + let repeats_per_group = 4_usize; + + let group_indices: Vec<usize> = (0..total_groups) + .flat_map(|group| std::iter::repeat_n(group, repeats_per_group)) + .collect(); + let values: Vec<Vec<u8>> = group_indices + .iter() + .map(|group| format!("value_{group:02}").into_bytes()) + .collect(); + + for batch in 0..3 { + let before = state.total_groups_seen; + state + .update_batch( + values.iter().map(|value| Some(value.as_slice())), + &group_indices, + total_groups, + |a, b| a < b, + ) + .expect("update batch"); + + assert_eq!( + state.total_groups_seen, + before + total_groups, + "batch {batch} should add exactly {total_groups} unique groups", + ); + } +}
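Editor's note: the sketch below is an illustration, not part of the patch. It shows how the same `update_batch` entry point exercised by these tests can cover the MAX direction simply by inverting the comparator, as the second batch in `sparse_batch_switches_mode_after_first_update` already does with `|a, b| a > b`. The identifiers (`MinMaxBytesState`, `update_batch`, `min_max`) mirror the internal test API above; the specific test name and assertions are assumptions.

#[test]
fn max_updates_with_inverted_comparator_sketch() {
    // Hypothetical example: same accumulator, comparator flipped to `>` so the
    // largest value per group wins instead of the smallest.
    let mut state = MinMaxBytesState::new(DataType::Utf8);
    let groups = vec![0_usize, 1, 0];
    let values = [
        Some(b"apple".as_slice()),
        Some(b"pear".as_slice()),
        Some(b"zebra".as_slice()), // larger value for group 0 should win
    ];

    state
        .update_batch(values.iter().copied(), &groups, 2, |a, b| a > b)
        .expect("max update");

    // `min_max` stores whichever extremum the supplied comparator selects.
    assert_eq!(state.min_max[0].as_deref(), Some(b"zebra".as_slice()));
    assert_eq!(state.min_max[1].as_deref(), Some(b"pear".as_slice()));
}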