Skip to content

Commit f289529

Browse files
authored
ensure statistics are type compliant with schema (#571)
This PR adds ensure all the types in table statistics are compatible with table schema types.
1 parent c6f1f74 commit f289529

File tree

2 files changed

+53
-51
lines changed

2 files changed

+53
-51
lines changed

server/src/catalog/column.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
use std::cmp::{max, min};
2020

21+
use arrow_schema::DataType;
22+
use datafusion::scalar::ScalarValue;
2123
use parquet::file::statistics::Statistics;
2224

2325
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
@@ -85,6 +87,40 @@ impl TypedStatistics {
8587
_ => panic!("Cannot update wrong types"),
8688
}
8789
}
90+
91+
pub fn min_max_as_scalar(self, datatype: &DataType) -> Option<(ScalarValue, ScalarValue)> {
92+
let (min, max) = match (self, datatype) {
93+
(TypedStatistics::Bool(stats), DataType::Boolean) => (
94+
ScalarValue::Boolean(Some(stats.min)),
95+
ScalarValue::Boolean(Some(stats.max)),
96+
),
97+
(TypedStatistics::Int(stats), DataType::Int32) => (
98+
ScalarValue::Int32(Some(stats.min as i32)),
99+
ScalarValue::Int32(Some(stats.max as i32)),
100+
),
101+
(TypedStatistics::Int(stats), DataType::Int64) => (
102+
ScalarValue::Int64(Some(stats.min)),
103+
ScalarValue::Int64(Some(stats.max)),
104+
),
105+
(TypedStatistics::Float(stats), DataType::Float32) => (
106+
ScalarValue::Float32(Some(stats.min as f32)),
107+
ScalarValue::Float32(Some(stats.max as f32)),
108+
),
109+
(TypedStatistics::Float(stats), DataType::Float64) => (
110+
ScalarValue::Float64(Some(stats.min)),
111+
ScalarValue::Float64(Some(stats.max)),
112+
),
113+
(TypedStatistics::String(stats), DataType::Utf8) => (
114+
ScalarValue::Utf8(Some(stats.min)),
115+
ScalarValue::Utf8(Some(stats.max)),
116+
),
117+
_ => {
118+
return None;
119+
}
120+
};
121+
122+
Some((min, max))
123+
}
88124
}
89125

90126
/// Column statistics are used to track statistics for a column in a given file.

server/src/query/stream_schema_provider.rs

Lines changed: 17 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
use std::{any::Any, collections::HashMap, ops::Bound, sync::Arc};
2020

21-
use arrow_schema::{DataType, Schema, SchemaRef, SortOptions};
21+
use arrow_schema::{Schema, SchemaRef, SortOptions};
2222
use bytes::Bytes;
2323
use chrono::{NaiveDateTime, Timelike, Utc};
2424
use datafusion::{
@@ -236,57 +236,23 @@ fn partitioned_files(
236236
count += num_rows;
237237
}
238238

239-
let mut statistics = vec![];
240-
241-
for field in table_schema.fields() {
242-
let Some(stats) = column_statistics
243-
.get(field.name())
244-
.and_then(|stats| stats.as_ref())
245-
else {
246-
statistics.push(datafusion::common::ColumnStatistics::default());
247-
break;
248-
};
249-
250-
let datatype = field.data_type();
251-
252-
let (min, max) = match (stats, datatype) {
253-
(TypedStatistics::Bool(stats), DataType::Boolean) => (
254-
ScalarValue::Boolean(Some(stats.min)),
255-
ScalarValue::Boolean(Some(stats.max)),
256-
),
257-
(TypedStatistics::Int(stats), DataType::Int32) => (
258-
ScalarValue::Int32(Some(stats.min as i32)),
259-
ScalarValue::Int32(Some(stats.max as i32)),
260-
),
261-
(TypedStatistics::Int(stats), DataType::Int64) => (
262-
ScalarValue::Int64(Some(stats.min)),
263-
ScalarValue::Int64(Some(stats.max)),
264-
),
265-
(TypedStatistics::Float(stats), DataType::Float32) => (
266-
ScalarValue::Float32(Some(stats.min as f32)),
267-
ScalarValue::Float32(Some(stats.max as f32)),
268-
),
269-
(TypedStatistics::Float(stats), DataType::Float64) => (
270-
ScalarValue::Float64(Some(stats.min)),
271-
ScalarValue::Float64(Some(stats.max)),
272-
),
273-
(TypedStatistics::String(stats), DataType::Utf8) => (
274-
ScalarValue::Utf8(Some(stats.min.clone())),
275-
ScalarValue::Utf8(Some(stats.max.clone())),
276-
),
277-
_ => {
278-
statistics.push(datafusion::common::ColumnStatistics::default());
279-
break;
280-
}
281-
};
282-
283-
statistics.push(datafusion::common::ColumnStatistics {
284-
null_count: None,
285-
max_value: Some(max),
286-
min_value: Some(min),
287-
distinct_count: None,
239+
let statistics = table_schema
240+
.fields()
241+
.iter()
242+
.map(|field| {
243+
column_statistics
244+
.get(field.name())
245+
.and_then(|stats| stats.as_ref())
246+
.and_then(|stats| stats.clone().min_max_as_scalar(field.data_type()))
247+
.map(|(min, max)| datafusion::common::ColumnStatistics {
248+
null_count: None,
249+
max_value: Some(max),
250+
min_value: Some(min),
251+
distinct_count: None,
252+
})
253+
.unwrap_or_default()
288254
})
289-
}
255+
.collect();
290256

291257
let statistics = datafusion::common::Statistics {
292258
num_rows: Some(count as usize),

0 commit comments

Comments
 (0)