Commit 5e20d5c

Re-implement support for query on local cache (#204)
This PR integrates a basic custom execution plan for DataFusion, built on the existing listing features. When a query runs, it fetches all the Arrow files for the stream and loads them into memory; in practice only two or three Arrow files need to be loaded, so this part is trivial and handled through a memory execution plan. The other local source is cached parquet, which may or may not have been synced to object storage by query time; it is queried through the already existing listing table. Finally, the result of local execution is combined with the result of execution against object storage. To prevent files from being removed while a query is still using them, there is now a basic global file tracker that only removes a file once it has been uploaded and no ongoing query is using it.
1 parent 788f02b commit 5e20d5c
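
The global file tracker mentioned above lives in the storage module (that part of the change is not shown in the diffs below): queries call CACHED_FILES.upsert on a cached parquet file before reading it and CACHED_FILES.remove once they are done, while main.rs seeds the tracker at startup via track_parquet. As a rough, hypothetical sketch of what such a tracker could look like, assuming a simple per-file reference count (the refs field and can_delete helper are illustrative, not the actual implementation in server/src/storage):

use std::collections::HashMap;
use std::path::{Path, PathBuf};

// Illustrative sketch only; the real CACHED_FILES in server/src/storage
// is a global behind a lock and may be structured differently.
#[derive(Default)]
pub struct FileTracker {
    // number of in-flight queries currently reading each cached parquet file
    refs: HashMap<PathBuf, usize>,
}

impl FileTracker {
    // register a file (or bump its count) before a query starts reading it
    pub fn upsert(&mut self, path: &Path) {
        *self.refs.entry(path.to_path_buf()).or_insert(0) += 1;
    }

    // drop one reference once the query is finished with the file
    pub fn remove(&mut self, path: &Path) {
        if let Some(count) = self.refs.get_mut(path) {
            *count = count.saturating_sub(1);
        }
    }

    // the upload/sync path would check this before deleting a file
    // that has already been pushed to object storage
    pub fn can_delete(&self, path: &Path) -> bool {
        self.refs.get(path).copied().unwrap_or(0) == 0
    }
}

With this shape, the deletion rule from the description ("only remove a file once it has been uploaded and no ongoing query is using that file") becomes a check of uploaded && tracker.can_delete(path) in the sync path.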

File tree

4 files changed (+348, -62 lines)

server/src/main.rs

Lines changed: 2 additions & 0 deletions
@@ -72,6 +72,8 @@ async fn main() -> anyhow::Result<()> {
     if let Err(e) = metadata::STREAM_INFO.load(&storage).await {
         warn!("could not populate local metadata. {:?}", e);
     }
+    // track all parquet files already in the data directory
+    storage::CACHED_FILES.track_parquet();
 
     let (localsync_handler, mut localsync_outbox, localsync_inbox) = run_local_sync();
     let (mut s3sync_handler, mut s3sync_outbox, mut s3sync_inbox) = s3_sync();

server/src/option.rs

Lines changed: 0 additions & 4 deletions
@@ -245,10 +245,6 @@ impl<S> Server<S>
 where
     S: Clone + clap::Args + StorageOpt,
 {
-    pub fn get_cache_path(&self, stream_name: &str) -> PathBuf {
-        self.local_disk_path.join(stream_name)
-    }
-
     pub fn local_stream_data_path(&self, stream_name: &str) -> PathBuf {
         self.local_disk_path.join(stream_name)
     }

server/src/query.rs

Lines changed: 204 additions & 36 deletions
@@ -16,22 +16,36 @@
  *
  */
 
+use arrow_schema::SchemaRef;
+use async_trait::async_trait;
+use chrono::TimeZone;
 use chrono::{DateTime, Utc};
 use datafusion::arrow::datatypes::Schema;
+use datafusion::arrow::ipc::reader::StreamReader;
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::datasource::file_format::parquet::ParquetFormat;
 use datafusion::datasource::listing::ListingOptions;
 use datafusion::datasource::listing::ListingTable;
 use datafusion::datasource::listing::ListingTableConfig;
 use datafusion::datasource::listing::ListingTableUrl;
+use datafusion::datasource::{MemTable, TableProvider};
+use datafusion::error::DataFusionError;
+use datafusion::execution::context::SessionState;
+use datafusion::logical_expr::TableType;
+use datafusion::physical_plan::union::UnionExec;
+use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::*;
 use serde_json::Value;
+use std::any::Any;
+use std::fs::File;
+use std::path::Path;
+use std::path::PathBuf;
 use std::sync::Arc;
 
-use crate::option::CONFIG;
 use crate::storage;
 use crate::storage::ObjectStorage;
 use crate::storage::ObjectStorageError;
+use crate::storage::StorageDir;
 use crate::utils::TimePeriod;
 use crate::validator;
 
@@ -75,56 +89,201 @@ impl Query {
         &self,
         storage: &impl ObjectStorage,
     ) -> Result<Vec<RecordBatch>, ExecuteError> {
+        let dir = StorageDir::new(&self.stream_name);
+
+        // take a look at local dir and figure out what local cache we could use for this query
+        let arrow_files: Vec<PathBuf> = dir
+            .arrow_files()
+            .into_iter()
+            .filter(|path| path_intersects_query(path, self.start, self.end))
+            .collect();
+
+        let possible_parquet_files = arrow_files.clone().into_iter().map(|mut path| {
+            path.set_extension("parquet");
+            path
+        });
+
+        let parquet_files = dir
+            .parquet_files()
+            .into_iter()
+            .filter(|path| path_intersects_query(path, self.start, self.end));
+
+        let parquet_files: Vec<PathBuf> = possible_parquet_files.chain(parquet_files).collect();
+
         let mut results = vec![];
-        storage.query(self, &mut results).await?;
 
-        // query cache only if end_time coulld have been after last sync.
-        let duration_since = Utc::now() - self.end;
-        if duration_since.num_seconds() < CONFIG.parseable.upload_interval as i64 {
-            self.execute_on_cache(&mut results).await?;
+        if !(arrow_files.is_empty() && parquet_files.is_empty()) {
+            self.execute_on_cache(
+                arrow_files,
+                parquet_files,
+                self.schema.clone(),
+                &mut results,
+            )
+            .await?;
         }
 
+        storage.query(self, &mut results).await?;
         Ok(results)
     }
 
-    async fn execute_on_cache(&self, results: &mut Vec<RecordBatch>) -> Result<(), ExecuteError> {
+    async fn execute_on_cache(
+        &self,
+        arrow_files: Vec<PathBuf>,
+        parquet_files: Vec<PathBuf>,
+        schema: Arc<Schema>,
+        results: &mut Vec<RecordBatch>,
+    ) -> Result<(), ExecuteError> {
         let ctx = SessionContext::new();
-        let file_format = ParquetFormat::default().with_enable_pruning(true);
+        let table = Arc::new(QueryTableProvider::new(arrow_files, parquet_files, schema));
+        ctx.register_table(
+            &*self.stream_name,
+            Arc::clone(&table) as Arc<dyn TableProvider>,
+        )
+        .map_err(ObjectStorageError::DataFusionError)?;
+        // execute the query and collect results
+        let df = ctx.sql(self.query.as_str()).await?;
+        results.extend(df.collect().await?);
+        table.remove_preserve();
+        Ok(())
+    }
+}
 
-        let listing_options = ListingOptions {
-            file_extension: ".parquet".to_owned(),
-            format: Arc::new(file_format),
-            table_partition_cols: vec![],
-            collect_stat: true,
-            target_partitions: 1,
-        };
+fn path_intersects_query(path: &Path, starttime: DateTime<Utc>, endtime: DateTime<Utc>) -> bool {
+    let time = time_from_path(path);
+    starttime <= time && time <= endtime
+}
 
-        let cache_path = CONFIG.parseable.get_cache_path(&self.stream_name);
+fn time_from_path(path: &Path) -> DateTime<Utc> {
+    let prefix = path
+        .file_name()
+        .expect("all given path are file")
+        .to_str()
+        .expect("filename is valid");
+
+    // substring of filename i.e date=xxxx.hour=xx.minute=xx
+    let prefix = &prefix[..33];
+    Utc.datetime_from_str(prefix, "date=%F.hour=%H.minute=%M")
+        .expect("valid prefix is parsed")
+}
 
-        let table_path = match ListingTableUrl::parse(
-            cache_path.to_str().expect("path should is valid unicode"),
-        ) {
-            Ok(table_path) => table_path,
-            Err(e) => {
-                log::warn!("could not parse local filesystem path. Maybe directory does not exist. Error {}", e);
-                return Ok(());
-            }
-        };
+#[derive(Debug)]
+struct QueryTableProvider {
+    arrow_files: Vec<PathBuf>,
+    parquet_files: Vec<PathBuf>,
+    schema: Arc<Schema>,
+}
 
-        let config = ListingTableConfig::new(table_path)
-            .with_listing_options(listing_options)
-            .with_schema(Arc::clone(&self.schema));
+impl QueryTableProvider {
+    fn new(arrow_files: Vec<PathBuf>, parquet_files: Vec<PathBuf>, schema: Arc<Schema>) -> Self {
+        // By the time this query executes the arrow files could be converted to parquet files
+        // we want to preserve these files as well in case
 
-        let table = ListingTable::try_new(config)?;
+        let mut parquet_cached = crate::storage::CACHED_FILES.lock().expect("no poisoning");
+        for file in &parquet_files {
+            parquet_cached.upsert(file)
+        }
 
-        ctx.register_table(&*self.stream_name, Arc::new(table))
-            .map_err(ObjectStorageError::DataFusionError)?;
+        Self {
+            arrow_files,
+            parquet_files,
+            schema,
+        }
+    }
 
-        // execute the query and collect results
-        let df = ctx.sql(self.query.as_str()).await?;
-        results.extend(df.collect().await?);
+    pub fn remove_preserve(&self) {
+        let mut parquet_cached = crate::storage::CACHED_FILES.lock().expect("no poisoning");
+        for file in &self.parquet_files {
+            parquet_cached.remove(file)
+        }
+    }
 
-        Ok(())
+    pub async fn create_physical_plan(
+        &self,
+        ctx: &SessionState,
+        projection: &Option<Vec<usize>>,
+        filters: &[Expr],
+        limit: Option<usize>,
+    ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
+        let mut mem_records: Vec<Vec<RecordBatch>> = Vec::new();
+        let mut parquet_files = self.parquet_files.clone();
+        for file in &self.arrow_files {
+            let Ok(arrow_file) = File::open(file) else { continue; };
+            let reader = StreamReader::try_new(arrow_file, None)?;
+            let records = reader
+                .filter_map(|record| match record {
+                    Ok(record) => Some(record),
+                    Err(e) => {
+                        log::warn!("warning from arrow stream {:?}", e);
+                        None
+                    }
+                })
+                .collect();
+            mem_records.push(records);
+
+            let mut file = file.clone();
+            file.set_extension("parquet");
+
+            parquet_files.retain(|p| p != &file)
+        }
+
+        let memtable = MemTable::try_new(Arc::clone(&self.schema), mem_records)?;
+        let memexec = memtable.scan(ctx, projection, filters, limit).await?;
+
+        if parquet_files.is_empty() {
+            Ok(memexec)
+        } else {
+            let listing_options = ListingOptions {
+                file_extension: ".parquet".to_owned(),
+                format: Arc::new(ParquetFormat::default().with_enable_pruning(true)),
+                table_partition_cols: vec![],
+                collect_stat: true,
+                target_partitions: 1,
+            };
+
+            let paths = parquet_files
+                .clone()
+                .into_iter()
+                .map(|path| {
+                    ListingTableUrl::parse(path.to_str().expect("path should is valid unicode"))
+                        .expect("path is valid for filesystem listing")
+                })
+                .collect();
+
+            let config = ListingTableConfig::new_with_multi_paths(paths)
+                .with_listing_options(listing_options)
+                .with_schema(Arc::clone(&self.schema));
+
+            let listtable = ListingTable::try_new(config).unwrap();
+            let listexec = listtable.scan(ctx, projection, filters, limit).await?;
+
+            Ok(Arc::new(UnionExec::new(vec![memexec, listexec])))
+        }
+    }
+}
+
+#[async_trait]
+impl TableProvider for QueryTableProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::Base
+    }
+
+    async fn scan(
+        &self,
+        ctx: &SessionState,
+        projection: &Option<Vec<usize>>,
+        filters: &[Expr],
+        limit: Option<usize>,
+    ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
+        self.create_physical_plan(ctx, projection, filters, limit)
+            .await
     }
 }
 
@@ -160,14 +319,23 @@ pub mod error {
 
 #[cfg(test)]
 mod tests {
-    use super::Query;
+    use super::{time_from_path, Query};
     use crate::{alerts::Alerts, metadata::STREAM_INFO};
     use datafusion::arrow::datatypes::Schema;
     use datafusion::arrow::datatypes::{DataType, Field};
     use rstest::*;
    use serde_json::Value;
+    use std::path::PathBuf;
     use std::str::FromStr;
 
+    #[test]
+    fn test_time_from_parquet_path() {
+        let path = PathBuf::from("date=2022-01-01.hour=00.minute=00.hostname.data.parquet");
+        let time = time_from_path(path.as_path());
+        assert_eq!(time.timestamp(), 1640995200);
+    }
+
+    // Query prefix generation tests
     #[fixture]
     fn schema() -> Schema {
         let field_a = Field::new("a", DataType::Int64, false);
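
A side note on the time_from_path helper added above: the hard-coded &prefix[..33] slice works because the date=...hour=...minute=... prefix of a cached file name is always exactly 33 characters, as the example path in the new test illustrates. A quick sanity check of the arithmetic (illustrative snippet, not part of the commit):

// "date=" (5) + "2022-01-01" (10) + "." (1) + "hour=" (5) + "00" (2)
//   + "." (1) + "minute=" (7) + "00" (2) = 33 characters
assert_eq!("date=2022-01-01.hour=00.minute=00".len(), 33);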
