Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 86 additions & 58 deletions src/cmd/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1670,56 +1670,53 @@ impl Args {
let needs_memory_aware_chunking =
which_stats.needs_memory_aware_chunking() && max_chunk_memory_mb.is_some();

let (chunking_mode_info, chunk_size) =
if max_chunk_memory_mb.is_some() || needs_memory_aware_chunking {
// Sample records for memory estimation
let sample_records = util::sample_records(&self.rconfig(), 1000);

// Calculate memory-aware chunk size
let chunk_size = calculate_memory_aware_chunk_size(
idx_count,
njobs,
max_chunk_memory_mb,
&which_stats,
sample_records.as_deref(),
);
// Estimate average record size from samples if available
let avg_record_size = if let Some(samples) = sample_records {
calculate_avg_record_size(&samples, &which_stats)
} else {
1024
};

let estimated_memory_mb =
estimate_chunk_memory(chunk_size, avg_record_size, &which_stats, headers.len())
/ (1024 * 1024);
let (chunking_mode_info, chunk_size) = if needs_memory_aware_chunking {
// Sample records for memory estimation
let sample_records = util::sample_records(&self.rconfig(), 1000);

// Calculate memory-aware chunk size
let chunk_size = calculate_memory_aware_chunk_size(
idx_count,
njobs,
max_chunk_memory_mb,
&which_stats,
sample_records.as_deref(),
);
// Estimate average record size from samples if available
let avg_record_size = if let Some(samples) = sample_records {
calculate_avg_record_size(&samples, &which_stats)
} else {
1024
};

let chunking_mode = if let Some(limit_mb) = max_chunk_memory_mb {
if limit_mb == 0 {
"dynamic (auto)"
} else {
"fixed limit"
}
} else {
"dynamic (auto)"
};
let estimated_memory_mb =
estimate_chunk_memory(chunk_size, avg_record_size, &which_stats, headers.len())
/ (1024 * 1024);

(
format!(
"Memory-aware chunking ({chunking_mode}): chunk_size={chunk_size}, \
estimated_memory_mb={estimated_memory_mb:.2}"
),
chunk_size,
)
// Safety: max_chunk_memory_mb is guaranteed Some(...) here since
// needs_memory_aware_chunking requires max_chunk_memory_mb.is_some()
let chunking_mode = if max_chunk_memory_mb.unwrap_or(0) == 0 {
"dynamic (auto)"
} else {
// CPU-based chunking
let chunk_size = util::chunk_size(idx_count as usize, njobs);
(
format!("CPU-based chunking: chunk_size={chunk_size}"),
chunk_size,
)
"fixed limit"
};

(
format!(
"Memory-aware chunking ({chunking_mode}): chunk_size={chunk_size}, \
estimated_memory_mb={estimated_memory_mb:.2}"
),
chunk_size,
)
} else {
// CPU-based chunking
let chunk_size = util::chunk_size(idx_count as usize, njobs);
(
format!("CPU-based chunking: chunk_size={chunk_size}"),
chunk_size,
)
};

let nchunks = util::num_of_chunks(idx_count as usize, chunk_size);
log::info!("({chunking_mode_info}) nchunks={nchunks}");

Expand Down Expand Up @@ -2026,9 +2023,9 @@ impl Args {
fn which_stats(&self) -> WhichStats {
WhichStats {
include_nulls: self.flag_nulls,
sum: !self.flag_typesonly || self.flag_infer_boolean,
sum: !self.flag_typesonly,
range: !self.flag_typesonly || self.flag_infer_boolean,
dist: !self.flag_typesonly || self.flag_infer_boolean,
dist: !self.flag_typesonly,
cardinality: self.flag_everything || self.flag_cardinality,
median: !self.flag_everything && self.flag_median && !self.flag_quartiles,
mad: self.flag_everything || self.flag_mad,
Expand Down Expand Up @@ -2307,6 +2304,8 @@ fn calculate_memory_aware_chunk_size(
},
Some(0) => {
// Dynamic sizing: sample records to estimate average size
// Note: caller already gates on needs_memory_aware_chunking, so we always
// use dynamic sizing here. The None arm has its own guard for direct callers.
util::calculate_dynamic_chunk_size(idx_count, njobs, sample_records, |record| {
estimate_record_memory(record, which_stats)
})
Expand Down Expand Up @@ -3177,6 +3176,23 @@ impl Stats {
/// # Safety
///
/// * Uses unsafe code for performance-critical operations
/// Feeds one sample value into the modes/cardinality tracking state.
/// Exactly one tracker is active at a time: `weighted_modes` when weights
/// are in use, otherwise the plain (unweighted) `modes` tracker.
#[inline(always)]
fn update_modes(&mut self, sample: &[u8], weight: f64) {
    match self.weighted_modes {
        Some(ref mut wm) => {
            // Weighted path: sum the weights seen for each distinct value.
            // Probe with get_mut before inserting so the common repeated-key
            // case never pays for the sample.to_vec() heap allocation.
            match wm.get_mut(sample) {
                Some(total) => *total += weight,
                None => {
                    wm.insert(sample.to_vec(), weight);
                },
            }
        },
        None => {
            // Unweighted path: hand the raw bytes to the Unsorted tracker,
            // if modes tracking is enabled at all.
            if let Some(tracker) = self.modes.as_mut() {
                tracker.add_bytes(sample);
            }
        },
    }
}

/// * Assumes valid UTF-8 input for string operations
/// * Bounds checking is avoided where safe
#[allow(clippy::inline_always)]
Expand Down Expand Up @@ -3204,6 +3220,28 @@ impl Stats {

let t = self.typ;

// typesonly + infer_boolean: only need minmax + cardinality for boolean inference
if self.which.typesonly {
// safety: MinMax is enabled because range=true is set for infer_boolean.
// The preceding `if self.which.typesonly && !infer_boolean { return; }` guard
// ensures we only reach here when infer_boolean is true.
debug_assert!(
self.minmax.is_some(),
"minmax must be enabled for typesonly+infer_boolean"
);
unsafe {
self.minmax
.as_mut()
.unwrap_unchecked()
.add_with_parsed(t, sample, float_val, int_val);
}
self.update_modes(sample, weight);
if sample_type == TNull {
self.nullcount += 1;
}
return;
}

// Update total weight for weighted statistics
// Skip entirely when weights aren't active (the common case)
if self.which.use_weights && weight > 0.0 {
Expand Down Expand Up @@ -3235,17 +3273,7 @@ impl Stats {
// Modes/cardinality less common but still frequent
// These are mutually exclusive: weighted_modes is used when weights are active,
// otherwise the unweighted modes (Unsorted) tracker is used.
if let Some(ref mut wm) = self.weighted_modes {
// Weighted modes: accumulate weights per value
// Use get_mut first to avoid heap-allocating sample.to_vec() when key already exists
if let Some(val) = wm.get_mut(sample) {
*val += weight;
} else {
wm.insert(sample.to_vec(), weight);
}
} else if let Some(v) = self.modes.as_mut() {
v.add_bytes(sample);
}
self.update_modes(sample, weight);

if t == TString {
// safety: online_len is always enabled when t == TString
Expand Down
8 changes: 3 additions & 5 deletions src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1194,11 +1194,9 @@ where
let cpu_based_chunks = num_of_chunks(idx_count as usize, cpu_based_chunk_size);

// Prefer CPU-based chunking if:
// 1. It creates more chunks (better parallelization), OR
// 2. Memory allows for CPU-based chunks (memory_based_chunk_size >= cpu_based_chunk_size)
// and CPU-based creates at least as many chunks as CPUs
if (cpu_based_chunks > memory_based_chunks)
|| (memory_based_chunk_size >= cpu_based_chunk_size && cpu_based_chunks >= njobs)
// 1. Memory-based chunk size is no larger than CPU-based (degenerate/low memory), OR
// 2. It creates more chunks (better parallelization)
if memory_based_chunk_size <= cpu_based_chunk_size || cpu_based_chunks > memory_based_chunks
{
cpu_based_chunk_size
} else {
Expand Down