Commit c02fb0f

Refactor query (#207)
Fixes #87
1 parent a5fd8b6 commit c02fb0f

File tree

4 files changed: +238 −205 lines

server/src/query.rs

Lines changed: 14 additions & 164 deletions
@@ -16,28 +16,15 @@
  *
  */
 
-use arrow_schema::SchemaRef;
-use async_trait::async_trait;
+mod table_provider;
+
 use chrono::TimeZone;
 use chrono::{DateTime, Utc};
 use datafusion::arrow::datatypes::Schema;
-use datafusion::arrow::ipc::reader::StreamReader;
 use datafusion::arrow::record_batch::RecordBatch;
-use datafusion::datasource::file_format::parquet::ParquetFormat;
-use datafusion::datasource::listing::ListingOptions;
-use datafusion::datasource::listing::ListingTable;
-use datafusion::datasource::listing::ListingTableConfig;
-use datafusion::datasource::listing::ListingTableUrl;
-use datafusion::datasource::{MemTable, TableProvider};
-use datafusion::error::DataFusionError;
-use datafusion::execution::context::SessionState;
-use datafusion::logical_expr::TableType;
-use datafusion::physical_plan::union::UnionExec;
-use datafusion::physical_plan::ExecutionPlan;
+use datafusion::datasource::TableProvider;
 use datafusion::prelude::*;
 use serde_json::Value;
-use std::any::Any;
-use std::fs::File;
 use std::path::Path;
 use std::path::PathBuf;
 use std::sync::Arc;
@@ -50,6 +37,7 @@ use crate::utils::TimePeriod;
 use crate::validator;
 
 use self::error::{ExecuteError, ParseError};
+use table_provider::QueryTableProvider;
 
 type Key = &'static str;
 fn get_value(value: &Value, key: Key) -> Result<&str, Key> {
@@ -110,41 +98,24 @@ impl Query {
 
         let parquet_files: Vec<PathBuf> = possible_parquet_files.chain(parquet_files).collect();
 
-        let mut results = vec![];
-
-        if !(arrow_files.is_empty() && parquet_files.is_empty()) {
-            self.execute_on_cache(
-                arrow_files,
-                parquet_files,
-                self.schema.clone(),
-                &mut results,
-            )
-            .await?;
-        }
-
-        storage.query(self, &mut results).await?;
-        Ok(results)
-    }
-
-    async fn execute_on_cache(
-        &self,
-        arrow_files: Vec<PathBuf>,
-        parquet_files: Vec<PathBuf>,
-        schema: Arc<Schema>,
-        results: &mut Vec<RecordBatch>,
-    ) -> Result<(), ExecuteError> {
-        let ctx = SessionContext::new();
-        let table = Arc::new(QueryTableProvider::new(arrow_files, parquet_files, schema));
+        let ctx =
+            SessionContext::with_config_rt(SessionConfig::default(), storage.query_runtime_env());
+        let table = Arc::new(QueryTableProvider::new(
+            arrow_files,
+            parquet_files,
+            storage.query_table(self)?,
+            Arc::clone(&self.schema),
+        ));
         ctx.register_table(
             &*self.stream_name,
             Arc::clone(&table) as Arc<dyn TableProvider>,
         )
         .map_err(ObjectStorageError::DataFusionError)?;
         // execute the query and collect results
         let df = ctx.sql(self.query.as_str()).await?;
-        results.extend(df.collect().await?);
+        let results = df.collect().await?;
         table.remove_preserve();
-        Ok(())
+        Ok(results)
     }
 }
 
@@ -166,127 +137,6 @@ fn time_from_path(path: &Path) -> DateTime<Utc> {
         .expect("valid prefix is parsed")
 }
 
-#[derive(Debug)]
-struct QueryTableProvider {
-    arrow_files: Vec<PathBuf>,
-    parquet_files: Vec<PathBuf>,
-    schema: Arc<Schema>,
-}
-
-impl QueryTableProvider {
-    fn new(arrow_files: Vec<PathBuf>, parquet_files: Vec<PathBuf>, schema: Arc<Schema>) -> Self {
-        // By the time this query executes the arrow files could be converted to parquet files
-        // we want to preserve these files as well in case
-
-        let mut parquet_cached = crate::storage::CACHED_FILES.lock().expect("no poisoning");
-        for file in &parquet_files {
-            parquet_cached.upsert(file)
-        }
-
-        Self {
-            arrow_files,
-            parquet_files,
-            schema,
-        }
-    }
-
-    pub fn remove_preserve(&self) {
-        let mut parquet_cached = crate::storage::CACHED_FILES.lock().expect("no poisoning");
-        for file in &self.parquet_files {
-            parquet_cached.remove(file)
-        }
-    }
-
-    pub async fn create_physical_plan(
-        &self,
-        ctx: &SessionState,
-        projection: &Option<Vec<usize>>,
-        filters: &[Expr],
-        limit: Option<usize>,
-    ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
-        let mut mem_records: Vec<Vec<RecordBatch>> = Vec::new();
-        let mut parquet_files = self.parquet_files.clone();
-        for file in &self.arrow_files {
-            let Ok(arrow_file) = File::open(file) else { continue; };
-            let reader = StreamReader::try_new(arrow_file, None)?;
-            let records = reader
-                .filter_map(|record| match record {
-                    Ok(record) => Some(record),
-                    Err(e) => {
-                        log::warn!("warning from arrow stream {:?}", e);
-                        None
-                    }
-                })
-                .collect();
-            mem_records.push(records);
-
-            let mut file = file.clone();
-            file.set_extension("parquet");
-
-            parquet_files.retain(|p| p != &file)
-        }
-
-        let memtable = MemTable::try_new(Arc::clone(&self.schema), mem_records)?;
-        let memexec = memtable.scan(ctx, projection, filters, limit).await?;
-
-        if parquet_files.is_empty() {
-            Ok(memexec)
-        } else {
-            let listing_options = ListingOptions {
-                file_extension: ".parquet".to_owned(),
-                format: Arc::new(ParquetFormat::default().with_enable_pruning(true)),
-                table_partition_cols: vec![],
-                collect_stat: true,
-                target_partitions: 1,
-            };
-
-            let paths = parquet_files
-                .clone()
-                .into_iter()
-                .map(|path| {
-                    ListingTableUrl::parse(path.to_str().expect("path should is valid unicode"))
-                        .expect("path is valid for filesystem listing")
-                })
-                .collect();
-
-            let config = ListingTableConfig::new_with_multi_paths(paths)
-                .with_listing_options(listing_options)
-                .with_schema(Arc::clone(&self.schema));
-
-            let listtable = ListingTable::try_new(config).unwrap();
-            let listexec = listtable.scan(ctx, projection, filters, limit).await?;
-
-            Ok(Arc::new(UnionExec::new(vec![memexec, listexec])))
-        }
-    }
-}
-
-#[async_trait]
-impl TableProvider for QueryTableProvider {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn schema(&self) -> SchemaRef {
-        Arc::clone(&self.schema)
-    }
-
-    fn table_type(&self) -> TableType {
-        TableType::Base
-    }
-
-    async fn scan(
-        &self,
-        ctx: &SessionState,
-        projection: &Option<Vec<usize>>,
-        filters: &[Expr],
-        limit: Option<usize>,
-    ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
-        self.create_physical_plan(ctx, projection, filters, limit)
-            .await
-    }
-}
-
 pub mod error {
     use datafusion::error::DataFusionError;
 

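With this refactor, Query::execute plans everything in one DataFusion session: staged arrow files, locally cached parquet, and object storage all sit behind a single registered table instead of two separate query passes appending into a shared results vector. The register-then-SQL pattern it relies on is ordinary DataFusion usage; below is a self-contained sketch of that pattern with a plain MemTable standing in for QueryTableProvider (the "demo" table name and the tokio/datafusion scaffolding are illustrative assumptions, not part of this commit):

// Sketch: register a TableProvider, run SQL against it, collect record batches.
// QueryTableProvider slots into register_table exactly like MemTable does here.
use std::sync::Arc;

use datafusion::arrow::array::Int64Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("value", DataType::Int64, false)]));
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int64Array::from(vec![1, 2, 3]))],
    )?;

    let ctx = SessionContext::new();
    ctx.register_table("demo", Arc::new(MemTable::try_new(schema, vec![vec![batch]])?))?;

    // execute the query and collect results, mirroring Query::execute above
    let results = ctx.sql("SELECT COUNT(*) FROM demo").await?.collect().await?;
    println!("{:?}", results);
    Ok(())
}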
server/src/query/table_provider.rs

Lines changed: 179 additions & 0 deletions
@@ -0,0 +1,179 @@
+/*
+ * Parseable Server (C) 2022 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+use arrow_schema::Schema;
+use arrow_schema::SchemaRef;
+use async_trait::async_trait;
+use datafusion::arrow::ipc::reader::StreamReader;
+use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::datasource::file_format::parquet::ParquetFormat;
+use datafusion::datasource::listing::{
+    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
+};
+use datafusion::datasource::{MemTable, TableProvider};
+use datafusion::error::DataFusionError;
+use datafusion::execution::context::SessionState;
+use datafusion::logical_expr::TableType;
+use datafusion::physical_plan::union::UnionExec;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::Expr;
+use std::any::Any;
+use std::fs::File;
+use std::path::PathBuf;
+use std::sync::Arc;
+
+pub struct QueryTableProvider {
+    arrow_files: Vec<PathBuf>,
+    parquet_files: Vec<PathBuf>,
+    storage: ListingTable,
+    schema: Arc<Schema>,
+}
+
+impl QueryTableProvider {
+    pub fn new(
+        arrow_files: Vec<PathBuf>,
+        parquet_files: Vec<PathBuf>,
+        storage: ListingTable,
+        schema: Arc<Schema>,
+    ) -> Self {
+        // By the time this query executes, the arrow files could have been
+        // converted to parquet files; we want to preserve those files as well.
+
+        let mut parquet_cached = crate::storage::CACHED_FILES.lock().expect("no poisoning");
+        for file in &parquet_files {
+            parquet_cached.upsert(file)
+        }
+
+        Self {
+            arrow_files,
+            parquet_files,
+            storage,
+            schema,
+        }
+    }
+
+    pub fn remove_preserve(&self) {
+        let mut parquet_cached = crate::storage::CACHED_FILES.lock().expect("no poisoning");
+        for file in &self.parquet_files {
+            parquet_cached.remove(file)
+        }
+    }
+
+    async fn create_physical_plan(
+        &self,
+        ctx: &SessionState,
+        projection: &Option<Vec<usize>>,
+        filters: &[Expr],
+        limit: Option<usize>,
+    ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
+        let mut mem_records: Vec<Vec<RecordBatch>> = Vec::new();
+        let mut parquet_files = self.parquet_files.clone();
+        for file in &self.arrow_files {
+            load_arrows(file, &mut mem_records, &mut parquet_files);
+        }
+
+        let memtable = MemTable::try_new(Arc::clone(&self.schema), mem_records)?;
+        let memexec = memtable.scan(ctx, projection, filters, limit).await?;
+
+        let cache_exec = if parquet_files.is_empty() {
+            memexec
+        } else {
+            let listtable = local_parquet_table(&parquet_files, &self.schema)?;
+            let listexec = listtable.scan(ctx, projection, filters, limit).await?;
+            Arc::new(UnionExec::new(vec![memexec, listexec]))
+        };
+        let storage_exec = self.storage.scan(ctx, projection, filters, limit).await?;
+
+        Ok(Arc::new(UnionExec::new(vec![cache_exec, storage_exec])))
+    }
+}
+
+#[async_trait]
+impl TableProvider for QueryTableProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::Base
+    }
+
+    async fn scan(
+        &self,
+        ctx: &SessionState,
+        projection: &Option<Vec<usize>>,
+        filters: &[Expr],
+        limit: Option<usize>,
+    ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
+        self.create_physical_plan(ctx, projection, filters, limit)
+            .await
+    }
+}
+
+fn local_parquet_table(
+    parquet_files: &[PathBuf],
+    schema: &SchemaRef,
+) -> Result<ListingTable, DataFusionError> {
+    let listing_options = ListingOptions {
+        file_extension: ".parquet".to_owned(),
+        format: Arc::new(ParquetFormat::default().with_enable_pruning(true)),
+        table_partition_cols: vec![],
+        collect_stat: true,
+        target_partitions: 1,
+    };
+
+    let paths = parquet_files
+        .iter()
+        .map(|path| {
+            ListingTableUrl::parse(path.to_str().expect("path should be valid unicode"))
+                .expect("path is valid for filesystem listing")
+        })
+        .collect();
+
+    let config = ListingTableConfig::new_with_multi_paths(paths)
+        .with_listing_options(listing_options)
+        .with_schema(Arc::clone(schema));
+
+    ListingTable::try_new(config)
+}
+
+fn load_arrows(
+    file: &PathBuf,
+    mem_records: &mut Vec<Vec<RecordBatch>>,
+    parquet_files: &mut Vec<PathBuf>,
+) {
+    let Ok(arrow_file) = File::open(file) else { return; };
+    let Ok(reader) = StreamReader::try_new(arrow_file, None) else { return; };
+    let records = reader
+        .filter_map(|record| match record {
+            Ok(record) => Some(record),
+            Err(e) => {
+                log::warn!("warning from arrow stream {:?}", e);
+                None
+            }
+        })
+        .collect();
+    mem_records.push(records);
+    let mut file = file.clone();
+    file.set_extension("parquet");
+    parquet_files.retain(|p| p != &file);
+}
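Read from the caller's side (the query.rs hunk above), the provider's lifecycle is: new() pins the local parquet files in CACHED_FILES, scan() unions the three sources, and remove_preserve() unpins once results are collected. A condensed sketch of that lifecycle, assuming it sits alongside query.rs where the private table_provider module is in scope (run_query, ctx, the file lists, storage_table, and the "mystream" name are placeholders, not part of this commit):

// Hypothetical helper restating the Query::execute flow above.
async fn run_query(
    ctx: &SessionContext,
    arrow_files: Vec<PathBuf>,
    parquet_files: Vec<PathBuf>,
    storage_table: ListingTable, // from storage.query_table() in this commit
    schema: Arc<Schema>,
    sql: &str,
) -> Result<Vec<RecordBatch>, DataFusionError> {
    // new() pins parquet_files in CACHED_FILES so a concurrent arrow -> parquet
    // conversion cannot delete them while the query runs.
    let table = Arc::new(QueryTableProvider::new(
        arrow_files,
        parquet_files,
        storage_table,
        schema,
    ));
    ctx.register_table("mystream", Arc::clone(&table) as Arc<dyn TableProvider>)?;
    let results = ctx.sql(sql).await?.collect().await?;
    // Unpin; skipping this would preserve the cached files indefinitely.
    table.remove_preserve();
    Ok(results)
}

The resulting physical plan has the shape UnionExec(UnionExec(memexec, listexec), storage_exec), and the same projection, filters, and limit hints are passed to the cache and storage scans alike.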

0 commit comments