Skip to content

Commit c6f1f74

Browse files
trueleonitisht
and authored
feat: use custom table provider to reuse statistics. (#566)
This PR adds a new table provider implementation that directly creates a physical plan from manifest file entries. File-level statistics are aggregated into table-level statistics, which are available for count optimisation. Note: aggregate statistics only take effect for time ranges that lie exactly on minute boundaries. Otherwise they are treated as partial filters, the plan changes, and statistics are no longer used. --------- Co-authored-by: Nitish Tiwari <[email protected]>
1 parent 40cddb5 commit c6f1f74

File tree

6 files changed

+417
-218
lines changed

6 files changed

+417
-218
lines changed

server/src/catalog.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ pub mod snapshot;
3636
pub use manifest::create_from_parquet_file;
3737

3838
/// Read-only view over a stream's manifest list.
pub trait Snapshot {
    /// Returns the manifest items whose time range satisfies every one of the
    /// given partial time filters. Borrows the filters since callers typically
    /// reuse the same predicate slice for several lookups.
    fn manifests(&self, time_predicates: &[PartialTimeFilter]) -> Vec<ManifestItem>;
}
4141

4242
pub trait ManifestFile {

server/src/catalog/snapshot.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ impl Default for Snapshot {
3838
}
3939

4040
impl super::Snapshot for Snapshot {
41-
fn manifests(&self, time_predicates: Vec<PartialTimeFilter>) -> Vec<ManifestItem> {
41+
fn manifests(&self, time_predicates: &[PartialTimeFilter]) -> Vec<ManifestItem> {
4242
let mut manifests = self.manifest_list.clone();
4343
for predicate in time_predicates {
4444
match predicate {

server/src/query.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818

1919
mod filter_optimizer;
20+
mod listing_table_builder;
2021
mod stream_schema_provider;
2122
mod table_provider;
2223

@@ -34,7 +35,6 @@ use datafusion::prelude::*;
3435
use itertools::Itertools;
3536
use once_cell::sync::Lazy;
3637
use std::collections::HashMap;
37-
use std::ops::Add;
3838
use std::path::{Path, PathBuf};
3939
use std::sync::Arc;
4040
use sysinfo::{System, SystemExt};
@@ -172,12 +172,10 @@ impl Query {
172172
PartialTimeFilter::Low(std::ops::Bound::Included(self.start.naive_utc())).binary_expr(
173173
Expr::Column(Column::from_name(event::DEFAULT_TIMESTAMP_KEY)),
174174
);
175-
let end_time_filter = PartialTimeFilter::High(std::ops::Bound::Excluded(
176-
self.end.add(chrono::Duration::milliseconds(1)).naive_utc(),
177-
))
178-
.binary_expr(Expr::Column(Column::from_name(
179-
event::DEFAULT_TIMESTAMP_KEY,
180-
)));
175+
let end_time_filter =
176+
PartialTimeFilter::High(std::ops::Bound::Excluded(self.end.naive_utc())).binary_expr(
177+
Expr::Column(Column::from_name(event::DEFAULT_TIMESTAMP_KEY)),
178+
);
181179

182180
// see https://github.com/apache/arrow-datafusion/pull/8400
183181
// this can be eliminated in later version of datafusion but with slight caveat
Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
/*
2+
* Parseable Server (C) 2022 - 2023 Parseable, Inc.
3+
*
4+
* This program is free software: you can redistribute it and/or modify
5+
* it under the terms of the GNU Affero General Public License as
6+
* published by the Free Software Foundation, either version 3 of the
7+
* License, or (at your option) any later version.
8+
*
9+
* This program is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU Affero General Public License for more details.
13+
*
14+
* You should have received a copy of the GNU Affero General Public License
15+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
16+
*
17+
*/
18+
19+
use std::{collections::HashMap, ops::Bound, pin::Pin, sync::Arc};
20+
21+
use arrow_schema::Schema;
22+
use datafusion::{
23+
datasource::{
24+
file_format::parquet::ParquetFormat,
25+
listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl},
26+
},
27+
error::DataFusionError,
28+
logical_expr::col,
29+
};
30+
use futures_util::{future, stream::FuturesUnordered, Future, TryStreamExt};
31+
use itertools::Itertools;
32+
use object_store::{ObjectMeta, ObjectStore};
33+
34+
use crate::{
35+
event::DEFAULT_TIMESTAMP_KEY,
36+
storage::{ObjectStorage, OBJECT_STORE_DATA_GRANULARITY},
37+
utils::TimePeriod,
38+
};
39+
40+
use super::PartialTimeFilter;
41+
42+
// Listing Table Builder for querying old data
//
// Accumulates, for a single stream, the parquet file locations discovered by
// listing object storage; `build` then turns that listing into a DataFusion
// `ListingTable`.
#[derive(Debug, Default)]
pub struct ListingTableBuilder {
    // Name of the stream whose files are being listed.
    stream: String,
    // Object-store locations of the stream's parquet files; populated by
    // `populate_via_listing` (sorted newest-first there).
    listing: Vec<String>,
}
48+
49+
impl ListingTableBuilder {
50+
pub fn new(stream: String) -> Self {
51+
Self {
52+
stream,
53+
..Self::default()
54+
}
55+
}
56+
57+
pub async fn populate_via_listing(
58+
self,
59+
storage: Arc<dyn ObjectStorage + Send>,
60+
client: Arc<dyn ObjectStore>,
61+
time_filters: &[PartialTimeFilter],
62+
) -> Result<Self, DataFusionError> {
63+
let start_time = time_filters
64+
.iter()
65+
.filter_map(|x| match x {
66+
PartialTimeFilter::Low(Bound::Excluded(x)) => Some(x),
67+
PartialTimeFilter::Low(Bound::Included(x)) => Some(x),
68+
_ => None,
69+
})
70+
.min()
71+
.cloned();
72+
73+
let end_time = time_filters
74+
.iter()
75+
.filter_map(|x| match x {
76+
PartialTimeFilter::High(Bound::Excluded(x)) => Some(x),
77+
PartialTimeFilter::High(Bound::Included(x)) => Some(x),
78+
_ => None,
79+
})
80+
.max()
81+
.cloned();
82+
83+
let Some((start_time, end_time)) = start_time.zip(end_time) else {
84+
return Err(DataFusionError::NotImplemented(
85+
"The time predicate is not supported because of possibly querying older data."
86+
.to_string(),
87+
));
88+
};
89+
90+
let prefixes = TimePeriod::new(
91+
start_time.and_utc(),
92+
end_time.and_utc(),
93+
OBJECT_STORE_DATA_GRANULARITY,
94+
)
95+
.generate_prefixes();
96+
97+
let prefixes = prefixes
98+
.into_iter()
99+
.map(|entry| {
100+
let path =
101+
relative_path::RelativePathBuf::from(format!("{}/{}", &self.stream, entry));
102+
storage.absolute_url(path.as_relative_path()).to_string()
103+
})
104+
.collect_vec();
105+
106+
let mut minute_resolve: HashMap<String, Vec<String>> = HashMap::new();
107+
let mut all_resolve = Vec::new();
108+
109+
for prefix in prefixes {
110+
let components = prefix.split_terminator('/');
111+
if components.last().is_some_and(|x| x.starts_with("minute")) {
112+
let hour_prefix = &prefix[0..prefix.rfind("minute").expect("minute exists")];
113+
minute_resolve
114+
.entry(hour_prefix.to_owned())
115+
.and_modify(|list| list.push(prefix))
116+
.or_default();
117+
} else {
118+
all_resolve.push(prefix)
119+
}
120+
}
121+
122+
type ResolveFuture = Pin<
123+
Box<dyn Future<Output = Result<Vec<ObjectMeta>, object_store::Error>> + Send + 'static>,
124+
>;
125+
126+
let tasks: FuturesUnordered<ResolveFuture> = FuturesUnordered::new();
127+
128+
for (listing_prefix, prefix) in minute_resolve {
129+
let client = Arc::clone(&client);
130+
tasks.push(Box::pin(async move {
131+
let mut list = client
132+
.list(Some(&object_store::path::Path::from(listing_prefix)))
133+
.await?
134+
.try_collect::<Vec<_>>()
135+
.await?;
136+
137+
list.retain(|object| {
138+
prefix.iter().any(|prefix| {
139+
object
140+
.location
141+
.prefix_matches(&object_store::path::Path::from(prefix.as_ref()))
142+
})
143+
});
144+
145+
Ok(list)
146+
}));
147+
}
148+
149+
for prefix in all_resolve {
150+
let client = Arc::clone(&client);
151+
tasks.push(Box::pin(async move {
152+
client
153+
.list(Some(&object_store::path::Path::from(prefix)))
154+
.await?
155+
.try_collect::<Vec<_>>()
156+
.await
157+
.map_err(Into::into)
158+
}));
159+
}
160+
161+
let res: Vec<Vec<String>> = tasks
162+
.and_then(|res| {
163+
future::ok(
164+
res.into_iter()
165+
.map(|res| res.location.to_string())
166+
.collect_vec(),
167+
)
168+
})
169+
.try_collect()
170+
.await?;
171+
172+
let mut res = res.into_iter().flatten().collect_vec();
173+
res.sort();
174+
res.reverse();
175+
176+
Ok(Self {
177+
stream: self.stream,
178+
listing: res,
179+
})
180+
}
181+
182+
pub fn build(
183+
self,
184+
schema: Arc<Schema>,
185+
map: impl Fn(Vec<String>) -> Vec<ListingTableUrl>,
186+
) -> Result<Option<Arc<ListingTable>>, DataFusionError> {
187+
if self.listing.is_empty() {
188+
return Ok(None);
189+
}
190+
191+
let file_format = ParquetFormat::default().with_enable_pruning(Some(true));
192+
let file_sort_order = vec![vec![col(DEFAULT_TIMESTAMP_KEY).sort(true, false)]];
193+
let listing_options = ListingOptions::new(Arc::new(file_format))
194+
.with_file_extension(".parquet")
195+
.with_file_sort_order(file_sort_order)
196+
.with_collect_stat(true)
197+
.with_target_partitions(1);
198+
199+
let config = ListingTableConfig::new_with_multi_paths(map(self.listing))
200+
.with_listing_options(listing_options)
201+
.with_schema(schema);
202+
203+
let listing_table = Arc::new(ListingTable::try_new(config)?);
204+
Ok(Some(listing_table))
205+
}
206+
}

0 commit comments

Comments
 (0)