Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions daft/daft/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -1353,6 +1353,7 @@ class PyExpr:
def approx_percentiles(self, percentiles: float | list[float]) -> PyExpr: ...
def mean(self) -> PyExpr: ...
def stddev(self) -> PyExpr: ...
def var(self, ddof: int = 1) -> PyExpr: ...
def min(self) -> PyExpr: ...
def max(self) -> PyExpr: ...
def bool_and(self) -> PyExpr: ...
Expand Down
15 changes: 15 additions & 0 deletions daft/expressions/expressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -987,6 +987,21 @@ def stddev(self) -> Expression:

return stddev(self)

def var(self, ddof: int = 1) -> Expression:
"""Calculates the variance of the values in the expression.

Args:
ddof: Delta degrees of freedom. The divisor used in calculations
is N - ddof, where N is the number of non-null elements.
Defaults to 1 (sample variance).

Tip: See Also
[`daft.functions.var`](https://docs.daft.ai/en/stable/api/functions/var/)
"""
from daft.functions import var
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inline import violates custom rule 430ffc3f: imports should be at the top of the file. While this pattern is used consistently across the codebase (e.g., stddev() method), it still conflicts with the stated guideline.

Suggested change
from daft.functions import var
return var(self, ddof)

Move the import to the top of the file with other imports.

Context Used: Rule from dashboard - Import statements should be placed at the top of the file rather than inline within functions or met... (source)

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

Prompt To Fix With AI
This is a comment left during a code review.
Path: daft/expressions/expressions.py
Line: 1001:1001

Comment:
Inline import violates custom rule 430ffc3f: imports should be at the top of the file. While this pattern is used consistently across the codebase (e.g., `stddev()` method), it still conflicts with the stated guideline.

```suggestion
        return var(self, ddof)
```

Move the import to the top of the file with other imports.

**Context Used:** Rule from `dashboard` - Import statements should be placed at the top of the file rather than inline within functions or met... ([source](https://app.greptile.com/review/custom-context?memory=430ffc3f-245c-4a7f-8039-aba31c0ed558))

<sub>Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!</sub>

How can I resolve this? If you propose a fix, please make it concise.


return var(self, ddof)

def min(self) -> Expression:
"""Calculates the minimum value in the expression.

Expand Down
2 changes: 2 additions & 0 deletions daft/functions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
mean,
avg,
stddev,
var,
min,
max,
bool_and,
Expand Down Expand Up @@ -442,6 +443,7 @@
"upload",
"upper",
"value_counts",
"var",
"video_file",
"video_keyframes",
"video_metadata",
Expand Down
15 changes: 15 additions & 0 deletions daft/functions/agg.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,21 @@ def stddev(expr: Expression) -> Expression:
return Expression._from_pyexpr(expr._expr.stddev())


def var(expr: Expression, ddof: int = 1) -> Expression:
"""Calculates the variance of the values in the expression.

Args:
expr: The input expression to calculate variance for.
ddof: Delta degrees of freedom. The divisor used in calculations
is N - ddof, where N is the number of non-null elements.
Defaults to 1 (sample variance).

Returns:
Expression representing the variance.
"""
return Expression._from_pyexpr(expr._expr.var(ddof))


def min(expr: Expression) -> Expression:
"""Calculates the minimum of the values in the expression."""
return Expression._from_pyexpr(expr._expr.min())
Expand Down
7 changes: 7 additions & 0 deletions src/daft-core/src/array/ops/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ mod time;
pub mod trigonometry;
mod truncate;
mod utf8;
mod variance;

use std::hash::BuildHasher;

Expand Down Expand Up @@ -222,6 +223,12 @@ pub trait DaftStddevAggable {
fn grouped_stddev(&self, groups: &GroupIndices) -> Self::Output;
}

pub trait DaftVarianceAggable {
type Output;
fn var(&self, ddof: usize) -> Self::Output;
fn grouped_var(&self, groups: &GroupIndices, ddof: usize) -> Self::Output;
}

pub trait DaftCompareAggable {
type Output;
fn min(&self) -> Self::Output;
Expand Down
34 changes: 34 additions & 0 deletions src/daft-core/src/array/ops/variance.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use common_error::DaftResult;
use daft_arrow::array::PrimitiveArray;

use crate::{
array::{
DataArray,
ops::{DaftVarianceAggable, GroupIndices},
},
datatypes::Float64Type,
utils::stats,
};

impl DaftVarianceAggable for DataArray<Float64Type> {
type Output = DaftResult<Self>;

fn var(&self, ddof: usize) -> Self::Output {
let stats = stats::calculate_stats(self)?;
let values = self.into_iter().flatten().copied();
let variance = stats::calculate_variance(stats, values, ddof);
let field = self.field.clone();
let data = PrimitiveArray::<f64>::from([variance]).boxed();
Self::new(field, data)
}

fn grouped_var(&self, groups: &GroupIndices, ddof: usize) -> Self::Output {
let grouped_variances_iter = stats::grouped_stats(self, groups)?.map(|(stats, group)| {
let values = group.iter().filter_map(|&index| self.get(index as _));
stats::calculate_variance(stats, values, ddof)
});
let field = self.field.clone();
let data = PrimitiveArray::<f64>::from_iter(grouped_variances_iter).boxed();
Self::new(field, data)
}
}
13 changes: 13 additions & 0 deletions src/daft-core/src/datatypes/agg_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,19 @@ pub fn try_stddev_aggregation_supertype(dtype: &DataType) -> DaftResult<DataType
}
}

/// Get the data type that the variance of a column of the given data type should be casted to.
pub fn try_variance_aggregation_supertype(dtype: &DataType) -> DaftResult<DataType> {
match dtype {
d if d.is_numeric() => Ok(DataType::Float64),
DataType::Decimal128(..) => Ok(DataType::Float64),
DataType::Null => Ok(DataType::Float64),
_ => Err(DaftError::TypeError(format!(
"Variance is not supported for: {}",
dtype
))),
}
}

/// Get the data type that the skew of a column of the given data type should be casted to.
pub fn try_skew_aggregation_supertype(dtype: &DataType) -> DaftResult<DataType> {
match dtype {
Expand Down
2 changes: 1 addition & 1 deletion src/daft-core/src/datatypes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::ops::{Add, Div, Mul, Rem, Sub};

pub use agg_ops::{
try_mean_aggregation_supertype, try_product_supertype, try_skew_aggregation_supertype,
try_stddev_aggregation_supertype, try_sum_supertype,
try_stddev_aggregation_supertype, try_sum_supertype, try_variance_aggregation_supertype,
};
use daft_arrow::{
compute::comparison::Simd8,
Expand Down
23 changes: 22 additions & 1 deletion src/daft-core/src/series/ops/agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
ops::{
DaftApproxSketchAggable, DaftCountAggable, DaftHllMergeAggable, DaftMeanAggable,
DaftProductAggable, DaftSetAggable, DaftSkewAggable as _, DaftStddevAggable,
DaftSumAggable, GroupIndices,
DaftSumAggable, DaftVarianceAggable, GroupIndices,
},
},
count_mode::CountMode,
Expand Down Expand Up @@ -260,6 +260,27 @@ impl Series {
}
}

pub fn var(&self, groups: Option<&GroupIndices>, ddof: usize) -> DaftResult<Self> {
let target_type = try_variance_aggregation_supertype(self.data_type())?;
match target_type {
DataType::Float64 => {
let casted = self.cast(&DataType::Float64)?;
let casted = casted.f64()?;
let series = groups
.map_or_else(
|| casted.var(ddof),
|groups| casted.grouped_var(groups, ddof),
)?
.into_series();
Ok(series)
}
_ => Err(DaftError::not_implemented(format!(
"Variance not implemented for {target_type}, source type: {}",
self.data_type()
))),
}
}

pub fn min(&self, groups: Option<&GroupIndices>) -> DaftResult<Self> {
self.inner.min(groups)
}
Expand Down
15 changes: 15 additions & 0 deletions src/daft-core/src/utils/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,21 @@ pub fn calculate_stddev(stats: Stats, values: impl Iterator<Item = f64>) -> Opti
})
}

pub fn calculate_variance(
stats: Stats,
values: impl Iterator<Item = f64>,
ddof: usize,
) -> Option<f64> {
stats.mean.and_then(|mean| {
let n = stats.count as usize;
if n <= ddof {
return None; // Not enough data points for the requested ddof
}
let sum_of_squares = values.map(|value| (value - mean).powi(2)).sum::<f64>();
Some(sum_of_squares / (n - ddof) as f64)
})
}

pub fn calculate_skew(stats: Stats, values: impl Iterator<Item = f64>) -> Option<f64> {
let count = stats.count;
stats.mean.map(|mean| {
Expand Down
1 change: 1 addition & 0 deletions src/daft-dsl/src/expr/agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub fn extract_agg_expr(expr: &ExprRef) -> DaftResult<AggExpr> {
}
AggExpr::Mean(e) => AggExpr::Mean(Expr::Alias(e, name.clone()).into()),
AggExpr::Stddev(e) => AggExpr::Stddev(Expr::Alias(e, name.clone()).into()),
AggExpr::Var(e, ddof) => AggExpr::Var(Expr::Alias(e, name.clone()).into(), ddof),
AggExpr::Min(e) => AggExpr::Min(Expr::Alias(e, name.clone()).into()),
AggExpr::Max(e) => AggExpr::Max(Expr::Alias(e, name.clone()).into()),
AggExpr::BoolAnd(e) => AggExpr::BoolAnd(Expr::Alias(e, name.clone()).into()),
Expand Down
23 changes: 23 additions & 0 deletions src/daft-dsl/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use daft_core::{
datatypes::{
InferDataType, try_mean_aggregation_supertype, try_product_supertype,
try_skew_aggregation_supertype, try_stddev_aggregation_supertype, try_sum_supertype,
try_variance_aggregation_supertype,
},
join::JoinSide,
lit::Literal,
Expand Down Expand Up @@ -418,6 +419,9 @@ pub enum AggExpr {
#[display("stddev({_0})")]
Stddev(ExprRef),

#[display("var({_0}, ddof={_1})")]
Var(ExprRef, usize),

#[display("min({_0})")]
Min(ExprRef),

Expand Down Expand Up @@ -538,6 +542,7 @@ impl AggExpr {
Self::MergeSketch(_, _) => "Merge Sketch",
Self::Mean(_) => "Mean",
Self::Stddev(_) => "Stddev",
Self::Var(_, _) => "Var",
Self::Min(_) => "Min",
Self::Max(_) => "Max",
Self::BoolAnd(_) => "Bool And",
Expand All @@ -563,6 +568,7 @@ impl AggExpr {
| Self::MergeSketch(expr, _)
| Self::Mean(expr)
| Self::Stddev(expr)
| Self::Var(expr, _)
| Self::Min(expr)
| Self::Max(expr)
| Self::BoolAnd(expr)
Expand Down Expand Up @@ -629,6 +635,10 @@ impl AggExpr {
let child_id = expr.semantic_id(schema);
FieldID::new(format!("{child_id}.local_stddev()"))
}
Self::Var(expr, ddof) => {
let child_id = expr.semantic_id(schema);
FieldID::new(format!("{child_id}.local_var(ddof={ddof})"))
}
Self::Min(expr) => {
let child_id = expr.semantic_id(schema);
FieldID::new(format!("{child_id}.local_min()"))
Expand Down Expand Up @@ -683,6 +693,7 @@ impl AggExpr {
| Self::MergeSketch(expr, _)
| Self::Mean(expr)
| Self::Stddev(expr)
| Self::Var(expr, _)
| Self::Min(expr)
| Self::Max(expr)
| Self::BoolAnd(expr)
Expand Down Expand Up @@ -710,6 +721,7 @@ impl AggExpr {
Self::Product(_) => Self::Product(first_child()),
Self::Mean(_) => Self::Mean(first_child()),
Self::Stddev(_) => Self::Stddev(first_child()),
&Self::Var(_, ddof) => Self::Var(first_child(), ddof),
Self::Min(_) => Self::Min(first_child()),
Self::Max(_) => Self::Max(first_child()),
Self::BoolAnd(_) => Self::BoolAnd(first_child()),
Expand Down Expand Up @@ -838,6 +850,13 @@ impl AggExpr {
try_stddev_aggregation_supertype(&field.dtype)?,
))
}
Self::Var(expr, _) => {
let field = expr.to_field(schema)?;
Ok(Field::new(
field.name.as_str(),
try_variance_aggregation_supertype(&field.dtype)?,
))
}

Self::Min(expr) | Self::Max(expr) | Self::AnyValue(expr, _) => {
let field = expr.to_field(schema)?;
Expand Down Expand Up @@ -1129,6 +1148,10 @@ impl Expr {
Self::Agg(AggExpr::Stddev(self)).into()
}

pub fn var(self: ExprRef, ddof: usize) -> ExprRef {
Self::Agg(AggExpr::Var(self, ddof)).into()
}

pub fn min(self: ExprRef) -> ExprRef {
Self::Agg(AggExpr::Min(self)).into()
}
Expand Down
4 changes: 4 additions & 0 deletions src/daft-dsl/src/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,10 @@ impl PyExpr {
Ok(self.expr.clone().stddev().into())
}

pub fn var(&self, ddof: usize) -> PyResult<Self> {
Ok(self.expr.clone().var(ddof).into())
}

pub fn min(&self) -> PyResult<Self> {
Ok(self.expr.clone().min().into())
}
Expand Down
36 changes: 35 additions & 1 deletion src/daft-local-plan/src/agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use daft_dsl::{
AggExpr, ApproxPercentileParams, ExprRef, SketchType, bound_col,
expr::bound_expr::{BoundAggExpr, BoundExpr},
functions::agg::merge_mean,
lit,
lit, null_lit,
};
use daft_functions::numeric::sqrt;
use daft_functions_list::{count_distinct, distinct};
Expand Down Expand Up @@ -190,6 +190,40 @@ pub fn populate_aggregation_stages_bound_with_schema(

final_stage(result);
}
AggExpr::Var(expr, ddof) => {
// The variance calculation we're performing here is:
// var(X, ddof) = (E(X^2) - E(X)^2) * n / (n - ddof)
// where X is the sub_expr.
//
// First stage, we compute `sum(X^2)`, `sum(X)` and `count(X)`.
// Second stage, we get global versions: `global_sqsum`, `global_sum`, `global_count`.
// In the final projection, we compute:
// ((global_sqsum / global_count) - (global_sum / global_count) ^ 2) * global_count / (global_count - ddof)

let expr = expr.clone().cast(&DataType::Float64);

let sum_col = first_stage!(AggExpr::Sum(expr.clone()));
let sq_sum_col = first_stage!(AggExpr::Sum(expr.clone().mul(expr.clone())));
let count_col = first_stage!(AggExpr::Count(expr, CountMode::Valid));

let global_sum_col = second_stage!(AggExpr::Sum(sum_col));
let global_sq_sum_col = second_stage!(AggExpr::Sum(sq_sum_col));
let global_count_col = second_stage!(AggExpr::Sum(count_col));

// Population variance = (sqsum/n - (sum/n)^2)
let n = global_count_col.clone().cast(&DataType::Float64);
let sq_mean = global_sq_sum_col.div(n.clone());
let mean = global_sum_col.clone().div(n.clone());
let mean_sq = mean.clone().mul(mean);
let pop_var = sq_mean.sub(mean_sq);

// Adjust for ddof: sample_var = pop_var * n / (n - ddof)
let ddof_expr = lit(*ddof as f64);
let adjusted = pop_var.mul(n.clone()).div(n.clone().sub(ddof_expr.clone()));
let result = n.clone().lt_eq(ddof_expr).if_else(null_lit(), adjusted);

final_stage(result);
}
AggExpr::Min(expr) => {
let min_col = first_stage!(AggExpr::Min(expr.clone()));
let global_min_col = second_stage!(AggExpr::Min(min_col));
Expand Down
6 changes: 6 additions & 0 deletions src/daft-logical-plan/src/ops/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,12 @@ fn replace_column_with_semantic_id_aggexpr(
replace_column_with_semantic_id(child.clone(), subexprs_to_replace, schema)
.map_yes_no(AggExpr::Stddev, |_| e)
}
AggExpr::Var(ref child, ddof) => {
replace_column_with_semantic_id(child.clone(), subexprs_to_replace, schema).map_yes_no(
|transformed_child| AggExpr::Var(transformed_child, ddof),
|_| e,
)
}
AggExpr::Min(ref child) => {
replace_column_with_semantic_id(child.clone(), subexprs_to_replace, schema)
.map_yes_no(AggExpr::Min, |_| e)
Expand Down
3 changes: 3 additions & 0 deletions src/daft-recordbatch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,9 @@ impl RecordBatch {
AggExpr::Stddev(expr) => self
.eval_expression(&BoundExpr::new_unchecked(expr.clone()))?
.stddev(groups),
AggExpr::Var(expr, ddof) => self
.eval_expression(&BoundExpr::new_unchecked(expr.clone()))?
.var(groups, *ddof),
AggExpr::Min(expr) => self
.eval_expression(&BoundExpr::new_unchecked(expr.clone()))?
.min(groups),
Expand Down
Loading
Loading