Skip to content

Commit 2b9604f

Browse files
committed
Initial commit
1 parent 72ac4cb commit 2b9604f

File tree

15 files changed

+1686
-510
lines changed

15 files changed

+1686
-510
lines changed

Cargo.lock

Lines changed: 1487 additions & 459 deletions
Large diffs are not rendered by default.

Cargo.toml

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ ahash = "0.8"
190190
anyhow = { version = "1.0", default-features = false }
191191
argh = "0.1.12"
192192
array-init = "2.1"
193-
arrow = { version = "55.2", default-features = false }
193+
arrow = { version = "56.1", default-features = false }
194194
async-stream = "0.3"
195195
axum = "0.8.4"
196196
backtrace = "0.3"
@@ -219,10 +219,10 @@ convert_case = "0.6"
219219
criterion = "0.5"
220220
cros-codecs = "0.0.6"
221221
crossbeam = "0.8"
222-
datafusion = { version = "47", default-features = false, features = [
222+
datafusion = { version = "50", default-features = false, features = [
223223
"nested_expressions",
224224
] }
225-
datafusion-ffi = "47"
225+
datafusion-ffi = "50" # Must track the `datafusion` version above: the FFI wrappers are built against that exact major
226226
directories = "5"
227227
document-features = "0.2.8"
228228
econtext = "0.2" # Prints error contexts on crashes
@@ -275,15 +275,15 @@ nohash-hasher = "0.2"
275275
notify = { version = "6.1.1", features = ["macos_kqueue"] }
276276
num-derive = "0.4"
277277
num-traits = "0.2"
278-
numpy = "0.24"
278+
numpy = "0.25"
279279
objc2-app-kit = "0.3"
280280
opentelemetry = { version = "0.30", features = ["metrics"] }
281281
opentelemetry-appender-tracing = "0.30"
282282
opentelemetry-otlp = "0.30"
283283
opentelemetry_sdk = { version = "0.30", features = ["rt-tokio"] }
284284
ordered-float = "4.3.0"
285285
parking_lot = "0.12.3"
286-
parquet = { version = "55.1", default-features = false }
286+
parquet = { version = "56.1", default-features = false }
287287
paste = "1.0"
288288
pathdiff = "0.2"
289289
percent-encoding = "2.3"
@@ -301,8 +301,8 @@ prost-types = "0.13.3"
301301
prost-reflect = "0.15.3"
302302
puffin = "0.19.1"
303303
puffin_http = "0.16"
304-
pyo3 = "0.24.1"
305-
pyo3-build-config = "0.24.1"
304+
pyo3 = "0.25.1"
305+
pyo3-build-config = "0.25.1"
306306
quote = "1.0"
307307
rand = { version = "0.8", default-features = false, features = ["small_rng"] }
308308
rand_distr = { version = "0.4", default-features = false }

crates/store/re_datafusion/src/dataframe_query_common.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -223,8 +223,8 @@ impl DataframeQueryTableProvider {
223223
}
224224
Expr::BinaryExpr(binary) => {
225225
if binary.op == Operator::NotEq
226-
&& let (Expr::Column(col), Expr::Literal(sv))
227-
| (Expr::Literal(sv), Expr::Column(col)) =
226+
&& let (Expr::Column(col), Expr::Literal(sv, _))
227+
| (Expr::Literal(sv, _), Expr::Column(col)) =
228228
(binary.left.as_ref(), binary.right.as_ref())
229229
&& sv.is_null()
230230
{

crates/store/re_datafusion/src/dataframe_query_provider.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -233,13 +233,16 @@ impl PartitionStreamExec {
233233
SortOptions::new(false, true),
234234
));
235235
}
236-
vec![LexOrdering::new(physical_ordering)]
236+
vec![
237+
LexOrdering::new(physical_ordering)
238+
.expect("LexOrdering should return Some since input is not empty"),
239+
]
237240
} else {
238241
vec![]
239242
};
240243

241244
let eq_properties =
242-
EquivalenceProperties::new_with_orderings(Arc::clone(&projected_schema), &orderings);
245+
EquivalenceProperties::new_with_orderings(Arc::clone(&projected_schema), orderings);
243246

244247
let partition_in_output_schema = projection.map(|p| p.contains(&0)).unwrap_or(false);
245248

crates/store/re_datafusion/src/dataframe_query_provider_wasm.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -232,10 +232,13 @@ impl PartitionStreamExec {
232232
));
233233
}
234234

235-
let orderings = vec![LexOrdering::new(physical_ordering)];
235+
let orderings = vec![
236+
LexOrdering::new(physical_ordering)
237+
.expect("LexOrdering should return Some when non-empty vec is passed"),
238+
];
236239

237240
let eq_properties =
238-
EquivalenceProperties::new_with_orderings(Arc::clone(&projected_schema), &orderings);
241+
EquivalenceProperties::new_with_orderings(Arc::clone(&projected_schema), orderings);
239242

240243
let partition_in_output_schema = projection.map(|p| p.contains(&0)).unwrap_or(false);
241244

crates/store/re_types_core/src/reflection.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,8 @@ pub fn generic_placeholder_for_datatype(
224224
))
225225
}
226226

227+
DataType::Decimal32(_, _) => Arc::new(array::Decimal32Array::from_iter([0])),
228+
DataType::Decimal64(_, _) => Arc::new(array::Decimal64Array::from_iter([0])),
227229
DataType::Decimal128(_, _) => Arc::new(array::Decimal128Array::from_iter([0])),
228230

229231
DataType::Decimal256(_, _) => Arc::new(array::Decimal256Array::from_iter([

crates/utils/re_arrow_util/src/format_data_type.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,8 @@ impl std::fmt::Display for DisplayDatatype<'_> {
110110
DataType::Dictionary(key, value) => {
111111
return write!(f, "Dictionary{{{}: {}}}", Self(key), Self(value));
112112
}
113+
DataType::Decimal32(_, _) => "Decimal32",
114+
DataType::Decimal64(_, _) => "Decimal64",
113115
DataType::Decimal128(_, _) => "Decimal128",
114116
DataType::Decimal256(_, _) => "Decimal256",
115117
DataType::BinaryView => "BinaryView",

crates/utils/re_byte_size/src/arrow_sizes.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,8 @@ impl SizeBytes for DataType {
106106
| Self::Utf8
107107
| Self::LargeUtf8
108108
| Self::BinaryView
109+
| Self::Decimal32(_, _)
110+
| Self::Decimal64(_, _)
109111
| Self::Decimal128(_, _)
110112
| Self::Decimal256(_, _)
111113
| Self::FixedSizeBinary(_)

crates/viewer/re_arrow_ui/src/show_index.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ fn make_ui<'a>(
160160
| DataType::Utf8View | DataType::BinaryView
161161
| DataType::Date32 | DataType::Date64 | DataType::Time32(_) | DataType::Time64(_)
162162
| DataType::Timestamp(_, _) | DataType::Duration(_) | DataType::Interval(_)
163-
| DataType::Decimal128(_, _) | DataType::Decimal256(_, _)
163+
| DataType::Decimal32(_,_) | DataType::Decimal64(_,_) | DataType::Decimal128(_, _) | DataType::Decimal256(_, _)
164164
=> {
165165
show_arrow_builtin(array, options)
166166
}
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
use arrow::datatypes::{DataType, Field};
2+
use datafusion::common::{DFSchema, ExprSchema as _};
3+
use datafusion::prelude::{Column, Expr, array_has, array_to_string, col, contains, lit, lower};
4+
5+
/// Errors returned when converting a [`Filter`] or [`FilterOperation`] into an [`Expr`].
#[derive(Debug, Clone, thiserror::Error)]
6+
pub enum FilterError {
7+
    /// The filter's column could not be resolved against the provided schema.
    #[error("column {0} was not found")]
8+
    ColumnNotFound(Column),
9+
10+
    /// The operation cannot be applied to the datatype of the targeted field.
    ///
    /// Carries the offending operation and a boxed copy of the field.
    #[error("invalid filter operation {0:?} for field {1}")]
11+
    InvalidFilterOperation(FilterOperation, Box<Field>),
12+
}
13+
14+
/// A filter applied to a table.
///
/// Pairs the name of a column with the [`FilterOperation`] to evaluate against it.
15+
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)]
16+
pub struct Filter {
17+
    /// Name of the column this filter targets.
    pub column_name: String,
18+
    /// Operation evaluated against that column.
    pub operation: FilterOperation,
19+
}
20+
21+
impl Filter {
22+
    /// Create a filter applying `operation` to the column named `column_name`.
    pub fn new(column_name: impl Into<String>, operation: FilterOperation) -> Self {
23+
        Self {
24+
            column_name: column_name.into(),
25+
            operation,
26+
        }
27+
    }
28+
29+
    /// Convert to an [`Expr`].
30+
    ///
31+
    /// The expression is used for filtering and should thus evaluate to a boolean.
    ///
    /// # Errors
    ///
    /// Returns [`FilterError::ColumnNotFound`] if `schema` has no field for
    /// [`Self::column_name`]. Otherwise forwards whatever
    /// [`FilterOperation::as_filter_expression`] returns for the resolved field.
32+
    pub fn as_filter_expression(&self, schema: &DFSchema) -> Result<Expr, FilterError> {
33+
        let column = Column::from(self.column_name.clone());
34+
        // Any lookup failure is reported as "column not found"; the underlying error is dropped.
        let Ok(field) = schema.field_from_column(&column) else {
35+
            return Err(FilterError::ColumnNotFound(column));
36+
        };
37+
38+
        self.operation.as_filter_expression(&column, field)
39+
    }
40+
}
41+
42+
/// The kind of filter operation
43+
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)]
44+
pub enum FilterOperation {
45+
    //TODO(ab): parameterise that over multiple string ops, e.g. "contains", "starts with", etc.
46+
    /// Keep rows whose string content contains the given query string.
    ///
    /// Both sides are lowercased before comparison. An empty query string yields an
    /// always-true expression.
    StringContains(String),
47+
48+
    /// Keep rows whose boolean content equals the given value.
    BooleanEquals(bool),
49+
}
50+
51+
impl FilterOperation {
52+
    /// Return the default filter operation for a column of the given datatype.
    ///
    /// - `Utf8`/`Utf8View`, and `List`/`ListView` of those: [`Self::StringContains`] with an
    ///   empty query string.
    /// - `Boolean`, and `List`/`ListView` of `Boolean`: [`Self::BooleanEquals`] with `true`.
    /// - Any other datatype: `None`.
    pub fn default_for_datatype(data_type: &DataType) -> Option<Self> {
53+
        match data_type {
54+
            DataType::Utf8 | DataType::Utf8View => Some(Self::StringContains(String::new())),
55+
            DataType::List(field) | DataType::ListView(field)
56+
                if field.data_type() == &DataType::Utf8
57+
                    || field.data_type() == &DataType::Utf8View =>
58+
            {
59+
                Some(Self::StringContains(String::new()))
60+
            }
61+
62+
            DataType::Boolean => Some(Self::BooleanEquals(true)),
63+
            DataType::List(fields) | DataType::ListView(fields)
64+
                if fields.data_type() == &DataType::Boolean =>
65+
            {
66+
                Some(Self::BooleanEquals(true))
67+
            }
68+
69+
            _ => None,
70+
        }
71+
    }
72+
73+
    /// Convert to an [`Expr`].
74+
    ///
75+
    /// The expression is used for filtering and should thus evaluate to a boolean.
    ///
    /// `column` is the column the expression reads from; `field` is the schema field
    /// whose datatype selects how the operation is expressed.
    ///
    /// # Errors
    ///
    /// Returns [`FilterError::InvalidFilterOperation`] when the datatype of `field` is not
    /// one of those handled for the operation (see the match arms below).
76+
    pub fn as_filter_expression(
77+
        &self,
78+
        column: &Column,
79+
        field: &Field,
80+
    ) -> Result<Expr, FilterError> {
81+
        match self {
82+
            Self::StringContains(query_string) => {
83+
                // An empty query matches everything. This check runs before the datatype
                // match, so it returns `Ok` even for fields with an unsupported datatype.
                if query_string.is_empty() {
84+
                    return Ok(lit(true));
85+
                }
86+
87+
                let operand = match field.data_type() {
88+
                    DataType::Utf8 | DataType::Utf8View => col(column.clone()),
89+
90+
                    DataType::List(field) | DataType::ListView(field)
91+
                        if field.data_type() == &DataType::Utf8
92+
                            || field.data_type() == &DataType::Utf8View =>
93+
                    {
94+
                        // for List[Utf8], we concatenate all the instances into a single logical
95+
                        // string
96+
                        array_to_string(col(column.clone()), lit(" "))
97+
                    }
98+
99+
                    // `field` here is the outer parameter again: the list-arm binding above
                    // only shadows it inside that arm.
                    _ => {
100+
                        return Err(FilterError::InvalidFilterOperation(
101+
                            self.clone(),
102+
                            field.clone().into(),
103+
                        ));
104+
                    }
105+
                };
106+
107+
                // Lowercase both operand and query before the `contains` check.
                Ok(contains(lower(operand), lower(lit(query_string))))
108+
            }
109+
110+
            Self::BooleanEquals(value) => match field.data_type() {
111+
                DataType::Boolean => Ok(col(column.clone()).eq(lit(*value))),
112+
113+
                DataType::List(field) | DataType::ListView(field)
114+
                    if field.data_type() == &DataType::Boolean =>
115+
                {
116+
                    // all instances must be equal to the filter value
                    // (expressed as: the list does not contain the opposite value)
                    // NOTE(review): presumably an empty list therefore passes either filter value;
                    // confirm against `array_has` semantics.
117+
                    Ok(!array_has(col(column.clone()), lit(!*value)))
118+
                }
119+
120+
                _ => Err(FilterError::InvalidFilterOperation(
121+
                    self.clone(),
122+
                    field.clone().into(),
123+
                )),
124+
            },
125+
        }
126+
    }
127+
}

0 commit comments

Comments
 (0)