Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
557 changes: 324 additions & 233 deletions Cargo.lock

Large diffs are not rendered by default.

14 changes: 7 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ ahash = "0.8"
anyhow = { version = "1.0", default-features = false }
argh = "0.1.12"
array-init = "2.1"
arrow = { version = "55.2", default-features = false }
arrow = { version = "56.1", default-features = false }
async-stream = "0.3"
axum = "0.8.4"
backtrace = "0.3"
Expand Down Expand Up @@ -219,10 +219,10 @@ convert_case = "0.6"
criterion = "0.5"
cros-codecs = "0.0.6"
crossbeam = "0.8"
datafusion = { version = "47", default-features = false, features = [
datafusion = { version = "50", default-features = false, features = [
"nested_expressions",
] }
datafusion-ffi = "47"
datafusion-ffi = "50"
directories = "5"
document-features = "0.2.8"
econtext = "0.2" # Prints error contexts on crashes
Expand Down Expand Up @@ -275,15 +275,15 @@ nohash-hasher = "0.2"
notify = { version = "6.1.1", features = ["macos_kqueue"] }
num-derive = "0.4"
num-traits = "0.2"
numpy = "0.24"
numpy = "0.25"
objc2-app-kit = "0.3"
opentelemetry = { version = "0.30", features = ["metrics"] }
opentelemetry-appender-tracing = "0.30"
opentelemetry-otlp = "0.30"
opentelemetry_sdk = { version = "0.30", features = ["rt-tokio"] }
ordered-float = "4.3.0"
parking_lot = "0.12.3"
parquet = { version = "55.1", default-features = false }
parquet = { version = "56.1", default-features = false }
paste = "1.0"
pathdiff = "0.2"
percent-encoding = "2.3"
Expand All @@ -301,8 +301,8 @@ prost-types = "0.13.3"
prost-reflect = "0.15.3"
puffin = "0.19.1"
puffin_http = "0.16"
pyo3 = "0.24.1"
pyo3-build-config = "0.24.1"
pyo3 = "0.25.1"
pyo3-build-config = "0.25.1"
quote = "1.0"
rand = { version = "0.8", default-features = false, features = ["small_rng"] }
rand_distr = { version = "0.4", default-features = false }
Expand Down
4 changes: 2 additions & 2 deletions crates/store/re_datafusion/src/dataframe_query_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,8 @@ impl DataframeQueryTableProvider {
}
Expr::BinaryExpr(binary) => {
if binary.op == Operator::NotEq
&& let (Expr::Column(col), Expr::Literal(sv))
| (Expr::Literal(sv), Expr::Column(col)) =
&& let (Expr::Column(col), Expr::Literal(sv, _))
| (Expr::Literal(sv, _), Expr::Column(col)) =
(binary.left.as_ref(), binary.right.as_ref())
&& sv.is_null()
{
Expand Down
7 changes: 5 additions & 2 deletions crates/store/re_datafusion/src/dataframe_query_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,13 +233,16 @@ impl PartitionStreamExec {
SortOptions::new(false, true),
));
}
vec![LexOrdering::new(physical_ordering)]
vec![
LexOrdering::new(physical_ordering)
.expect("LexOrdering should return Some since input is not empty"),
]
} else {
vec![]
};

let eq_properties =
EquivalenceProperties::new_with_orderings(Arc::clone(&projected_schema), &orderings);
EquivalenceProperties::new_with_orderings(Arc::clone(&projected_schema), orderings);

let partition_in_output_schema = projection.map(|p| p.contains(&0)).unwrap_or(false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,10 +232,13 @@ impl PartitionStreamExec {
));
}

let orderings = vec![LexOrdering::new(physical_ordering)];
let orderings = vec![
LexOrdering::new(physical_ordering)
.expect("LexOrdering should return Some when non-empty vec is passed"),
];

let eq_properties =
EquivalenceProperties::new_with_orderings(Arc::clone(&projected_schema), &orderings);
EquivalenceProperties::new_with_orderings(Arc::clone(&projected_schema), orderings);

let partition_in_output_schema = projection.map(|p| p.contains(&0)).unwrap_or(false);

Expand Down
2 changes: 2 additions & 0 deletions crates/store/re_types_core/src/reflection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,8 @@ pub fn generic_placeholder_for_datatype(
))
}

DataType::Decimal32(_, _) => Arc::new(array::Decimal32Array::from_iter([0])),
DataType::Decimal64(_, _) => Arc::new(array::Decimal64Array::from_iter([0])),
DataType::Decimal128(_, _) => Arc::new(array::Decimal128Array::from_iter([0])),

DataType::Decimal256(_, _) => Arc::new(array::Decimal256Array::from_iter([
Expand Down
2 changes: 2 additions & 0 deletions crates/utils/re_arrow_util/src/format_data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ impl std::fmt::Display for DisplayDatatype<'_> {
DataType::Dictionary(key, value) => {
return write!(f, "Dictionary{{{}: {}}}", Self(key), Self(value));
}
DataType::Decimal32(_, _) => "Decimal32",
DataType::Decimal64(_, _) => "Decimal64",
DataType::Decimal128(_, _) => "Decimal128",
DataType::Decimal256(_, _) => "Decimal256",
DataType::BinaryView => "BinaryView",
Expand Down
2 changes: 2 additions & 0 deletions crates/utils/re_byte_size/src/arrow_sizes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ impl SizeBytes for DataType {
| Self::Utf8
| Self::LargeUtf8
| Self::BinaryView
| Self::Decimal32(_, _)
| Self::Decimal64(_, _)
| Self::Decimal128(_, _)
| Self::Decimal256(_, _)
| Self::FixedSizeBinary(_)
Expand Down
2 changes: 1 addition & 1 deletion crates/viewer/re_arrow_ui/src/show_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ fn make_ui<'a>(
| DataType::Utf8View | DataType::BinaryView
| DataType::Date32 | DataType::Date64 | DataType::Time32(_) | DataType::Time64(_)
| DataType::Timestamp(_, _) | DataType::Duration(_) | DataType::Interval(_)
| DataType::Decimal128(_, _) | DataType::Decimal256(_, _)
| DataType::Decimal32(_,_) | DataType::Decimal64(_,_) | DataType::Decimal128(_, _) | DataType::Decimal256(_, _)
=> {
show_arrow_builtin(array, options)
}
Expand Down
2 changes: 2 additions & 0 deletions crates/viewer/re_dataframe_ui/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ egui_dnd.workspace = true
egui_table.workspace = true
itertools.workspace = true
nohash-hasher.workspace = true
num-traits.workspace = true
ordered-float.workspace = true
parking_lot.workspace = true
serde.workspace = true
thiserror.workspace = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use re_ui::syntax_highlighting::SyntaxHighlightedBuilder;
use super::{FilterError, FilterUiAction, Nullability};

/// A filter for a boolean column.
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum BooleanFilter {
/// Filter for strictly non-nullable columns.
///
Expand Down
35 changes: 14 additions & 21 deletions crates/viewer/re_dataframe_ui/src/filters/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ use std::sync::Arc;

use arrow::array::{ArrayRef, BooleanArray, ListArray, as_list_array};
use arrow::datatypes::{DataType, Field};
use datafusion::common::{DFSchema, Result as DataFusionResult, exec_err};
use datafusion::common::{DFSchema, ExprSchema as _, Result as DataFusionResult, exec_err};
use datafusion::logical_expr::{
ArrayFunctionArgument, ArrayFunctionSignature, ColumnarValue, ScalarFunctionArgs, ScalarUDF,
ScalarUDFImpl, Signature, TypeSignature, Volatility,
};
use datafusion::prelude::{Column, Expr, array_to_string, col, lit, lower};
use ordered_float::OrderedFloat;

use super::BooleanFilter;

Expand Down Expand Up @@ -92,7 +93,7 @@ pub enum FilterError {
}

/// A filter applied to a table.
#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Filter {
pub column_name: String,
pub operation: FilterOperation,
Expand All @@ -119,7 +120,7 @@ impl Filter {
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ComparisonOperator {
Eq,
Ne,
Expand Down Expand Up @@ -172,7 +173,7 @@ impl ComparisonOperator {
}

/// The kind of filter operation
#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum FilterOperation {
/// Compare an integer value to a constant.
///
Expand All @@ -187,7 +188,7 @@ pub enum FilterOperation {
/// For columns of lists of floats, only the first value is considered.
FloatCompares {
operator: ComparisonOperator,
value: Option<f64>,
value: Option<OrderedFloat<f64>>,
},

//TODO(ab): parameterise that over multiple string ops, e.g. "contains", "starts with", etc.
Expand Down Expand Up @@ -280,7 +281,10 @@ impl FilterOperation {
}
};

Ok(contains_patch(lower(operand), lower(lit(query_string))))
Ok(datafusion::prelude::contains(
lower(operand),
lower(lit(query_string)),
))
}

Self::Boolean(boolean_filter) => boolean_filter.as_filter_expression(column, field),
Expand All @@ -290,7 +294,7 @@ impl FilterOperation {

/// Custom UDF for evaluating filter operations.
//TODO(ab): consider splitting the vectorized filtering part from the `any`/`all` aggregation.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct FilterOperationUdf {
op: FilterOperation,
signature: Signature,
Expand Down Expand Up @@ -375,8 +379,8 @@ impl FilterOperationUdf {
let Some(value) = value else {
return Some(true);
};

x.map(|x| operator.apply(x, *value as _))
use num_traits::AsPrimitive as _;
x.map(|x| operator.apply(x, value.as_()))
})
.collect();

Expand Down Expand Up @@ -458,7 +462,7 @@ impl ScalarUDFImpl for FilterOperationUdf {
}
}

fn invoke_with_args(&self, args: ScalarFunctionArgs<'_>) -> DataFusionResult<ColumnarValue> {
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DataFusionResult<ColumnarValue> {
let ColumnarValue::Array(input_array) = &args.args[0] else {
return exec_err!("FilterOperation expected array inputs, not scalar values");
};
Expand Down Expand Up @@ -490,14 +494,3 @@ impl ScalarUDFImpl for FilterOperationUdf {
}
}
}

// TODO(ab): this is a workaround for https://github.com/apache/datafusion/pull/16046. Next time we
// update datafusion, this should break compilation. Remove this function and replace
// `contains_patch` by `datafusion::prelude::contains` in the method above.
fn contains_patch(arg1: Expr, arg2: Expr) -> Expr {
// make sure we break compilation when we update datafusion
#[cfg(debug_assertions)]
let _ = datafusion::prelude::contains();

datafusion::functions::string::contains().call(<[_]>::into_vec(Box::new([arg1, arg2])))
}
6 changes: 4 additions & 2 deletions crates/viewer/re_dataframe_ui/src/filters/filter_ui.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ impl SyntaxHighlighting for FilterOperation {

Self::FloatCompares { value, operator: _ } => {
if let Some(value) = value {
builder.append_primitive(&re_format::format_f64(*value));
builder.append_primitive(&re_format::format_f64(value.into_inner()));
} else {
builder.append_primitive("…");
}
Expand Down Expand Up @@ -461,6 +461,8 @@ impl FilterOperation {

#[cfg(test)]
mod tests {
use ordered_float::OrderedFloat;

use super::*;
use crate::filters::BooleanFilter;

Expand Down Expand Up @@ -493,7 +495,7 @@ mod tests {
(
FilterOperation::FloatCompares {
operator: ComparisonOperator::Ge,
value: Some(10.5),
value: Some(OrderedFloat(10.5)),
},
"float_compares",
),
Expand Down
2 changes: 1 addition & 1 deletion crates/viewer/re_dataframe_ui/src/table_blueprint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ pub struct EntryLinksSpec {
}

/// The "blueprint" for a table, a.k.a the specification of how it should look.
#[derive(Debug, Clone, PartialEq, Default)]
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct TableBlueprint {
pub sort_by: Option<SortBy>,
pub partition_links: Option<PartitionLinksSpec>,
Expand Down
11 changes: 6 additions & 5 deletions crates/viewer/re_dataframe_ui/tests/filter_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use arrow::record_batch::RecordBatch;
use datafusion::catalog::MemTable;
use datafusion::prelude::{DataFrame, SessionContext};

use ordered_float::OrderedFloat;
use re_dataframe_ui::{BooleanFilter, ComparisonOperator, Filter, FilterOperation, Nullability};
use re_viewer_context::external::tokio;

Expand Down Expand Up @@ -414,7 +415,7 @@ async fn test_float_compares() {
filter_snapshot!(
FilterOperation::FloatCompares {
operator: *op,
value: Some(3.0),
value: Some(OrderedFloat(3.0)),
},
floats.clone(),
format!("{}_3.0", op.as_ascii())
Expand All @@ -423,7 +424,7 @@ async fn test_float_compares() {
filter_snapshot!(
FilterOperation::FloatCompares {
operator: *op,
value: Some(4.0),
value: Some(OrderedFloat(4.0)),
},
floats_nulls.clone(),
format!("nulls_{}_4", op.as_ascii())
Expand All @@ -439,7 +440,7 @@ async fn test_float_all_types() {
filter_snapshot!(
FilterOperation::FloatCompares {
operator: ComparisonOperator::Eq,
value: Some(3.0),
value: Some(OrderedFloat(3.0)),
},
TestColumn::primitive::<$ty>(vec![1.0, 2.0, 3.0, 4.0, 5.0]),
format!("{:?}", $ty {})
Expand Down Expand Up @@ -468,7 +469,7 @@ async fn test_float_lists() {
filter_snapshot!(
FilterOperation::FloatCompares {
operator: *op,
value: Some(2.0)
value: Some(OrderedFloat(2.0))
},
float_lists.clone(),
format!("{}_2.0", op.as_ascii())
Expand All @@ -477,7 +478,7 @@ async fn test_float_lists() {
filter_snapshot!(
FilterOperation::FloatCompares {
operator: *op,
value: Some(2.0)
value: Some(OrderedFloat(2.0))
},
float_lists_nulls.clone(),
format!("nulls_{}_2.0", op.as_ascii())
Expand Down
7 changes: 3 additions & 4 deletions deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ all-features = true
[advisories]
version = 2
ignore = [
"RUSTSEC-2024-0384", # Waiting for https://github.com/console-rs/indicatif/pull/666
"RUSTSEC-2024-0436", # https://rustsec.org/advisories/RUSTSEC-2024-0436 - paste is unmaintained - https://github.com/dtolnay/paste
"RUSTSEC-2024-0014", # https://rustsec.org/advisories/RUSTSEC-2024-0014 - generational-arena is unmaintained
]
Expand All @@ -54,6 +53,7 @@ deny = [
skip = [
{ name = "base64" }, # Too popular
{ name = "block2" }, # Old version via rfd
{ name = "bzip2" }, # Remove after https://github.com/apache/datafusion/pull/17509 closes
{ name = "cargo-platform" }, # Older version used by ply-rs. It's build-time only!
{ name = "cargo_metadata" }, # Older version used by ply-rs. It's small, and it's build-time only!
{ name = "core-foundation" }, # Currently, e.g. `webbrowser` and `winit` use different versions.
Expand All @@ -63,7 +63,7 @@ skip = [
{ name = "objc2-foundation" }, # `accesskit_macos` uses a different version than `arboard`
{ name = "objc2" }, # `accesskit_macos` uses a different version than `arboard`
{ name = "ordered-float" }, # Old version being used by parquet, but super small!
{ name = "pollster" }, # rfd is still on 0.3
{ name = "petgraph" }, # Remove after https://github.com/tokio-rs/prost/pull/1268 resolves
{ name = "pulldown-cmark" }, # Build-dependency via `ply-rs` (!). TODO(emilk): use a better crate for .ply parsing
{ name = "redox_syscall" }, # Plenty of versions in the wild
{ name = "rustc-hash" }, # numpy with compatible pyo3 requires different version than wgpu
Expand Down Expand Up @@ -95,8 +95,8 @@ allow = [
"OFL-1.1", # https://spdx.org/licenses/OFL-1.1.html
"Ubuntu-font-1.0", # https://ubuntu.com/legal/font-licence
"Unicode-3.0", # https://www.unicode.org/license.txt
"Unicode-DFS-2016", # https://spdx.org/licenses/Unicode-DFS-2016.html
"Zlib", # https://tldrlegal.com/license/zlib-libpng-license-(zlib)
"bzip2-1.0.6", # https://github.com/trifectatechfoundation/libbzip2-rs/blob/v0.2.2/COPYING
]
exceptions = []

Expand All @@ -110,7 +110,6 @@ name = "ring"
expression = "MIT AND ISC AND OpenSSL"
license-files = [{ path = "LICENSE", hash = 0xbd0eed23 }]


[sources]
unknown-registry = "deny"
unknown-git = "deny"
Expand Down
Loading
Loading