
Commit 347f7ed

Disable query for stage data (#326)
This PR disables querying of staged data, since the current flow allocates too much memory by loading everything into RAM. This is temporary; we'll re-enable it once the memory issue is fixed. With this change, a query returns only the data persisted in the backend store, not the data in the staging area.
1 parent 4c501a4 commit 347f7ed
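For context on the change: the query path builds a single DataFusion TableProvider, registers it under the stream name, and runs SQL over it; after this commit that provider comes only from the backend object store. Below is a minimal, self-contained sketch of that register-and-query flow, with a made-up table name, schema, and values (this is not Parseable's code, only the DataFusion pattern the query path relies on):

// Sketch only: register an in-memory table with DataFusion and query it.
// The table name "demo_stream", the schema, and the data are invented.
use std::sync::Arc;

use datafusion::arrow::array::Int64Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("value", DataType::Int64, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int64Array::from(vec![1, 2, 3]))],
    )?;

    // Only what is registered with the SessionContext is visible to queries,
    // which is why unregistered staging data no longer shows up in results.
    let table = MemTable::try_new(schema, vec![vec![batch]])?;
    let ctx = SessionContext::new();
    ctx.register_table("demo_stream", Arc::new(table))?;

    let results = ctx.sql("SELECT COUNT(*) FROM demo_stream").await?.collect().await?;
    println!("{results:?}");
    Ok(())
}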

File tree (4 files changed, +13 -36 lines):

server/src/query.rs
server/src/query/table_provider.rs
server/src/storage.rs
server/src/storage/object_storage.rs

server/src/query.rs (7 additions, 32 deletions)

@@ -26,20 +26,16 @@ use datafusion::datasource::TableProvider;
 use datafusion::prelude::*;
 use itertools::Itertools;
 use serde_json::Value;
-use std::collections::hash_map::RandomState;
-use std::collections::HashSet;
-use std::path::{Path, PathBuf};
+use std::path::Path;
 use std::sync::Arc;
 
 use crate::option::CONFIG;
 use crate::storage::ObjectStorageError;
-use crate::storage::StorageDir;
 use crate::storage::{ObjectStorage, OBJECT_STORE_DATA_GRANULARITY};
 use crate::utils::TimePeriod;
 use crate::validator;
 
 use self::error::{ExecuteError, ParseError};
-use table_provider::QueryTableProvider;
 
 type Key = &'static str;
 fn get_value(value: &Value, key: Key) -> Result<&str, Key> {
@@ -89,41 +85,18 @@ impl Query {
         &self,
         storage: Arc<dyn ObjectStorage + Send>,
     ) -> Result<(Vec<RecordBatch>, Vec<String>), ExecuteError> {
-        let dir = StorageDir::new(&self.stream_name);
-        // take a look at local dir and figure out what local cache we could use for this query
-        let staging_arrows = dir
-            .arrow_files_grouped_by_time()
-            .into_iter()
-            .filter(|(path, _)| path_intersects_query(path, self.start, self.end))
-            .sorted_by(|(a, _), (b, _)| Ord::cmp(a, b))
-            .collect_vec();
-
-        let staging_parquet_set: HashSet<&PathBuf, RandomState> =
-            HashSet::from_iter(staging_arrows.iter().map(|(p, _)| p));
-
-        let other_staging_parquet = dir
-            .parquet_files()
-            .into_iter()
-            .filter(|path| path_intersects_query(path, self.start, self.end))
-            .filter(|path| !staging_parquet_set.contains(path))
-            .collect_vec();
-
         let ctx = SessionContext::with_config_rt(
             SessionConfig::default(),
             CONFIG.storage().get_datafusion_runtime(),
         );
 
-        let table = Arc::new(QueryTableProvider::new(
-            staging_arrows,
-            other_staging_parquet,
-            self.get_prefixes(),
-            storage,
-            Arc::new(self.get_schema().clone()),
-        ));
+        let Some(table) = storage.query_table(self.get_prefixes(), Arc::new(self.get_schema().clone()))? else {
+            return Ok((Vec::new(), Vec::new()));
+        };
 
         ctx.register_table(
             &*self.stream_name,
-            Arc::clone(&table) as Arc<dyn TableProvider>,
+            Arc::new(table) as Arc<dyn TableProvider>,
         )
         .map_err(ObjectStorageError::DataFusionError)?;
         // execute the query and collect results
@@ -144,11 +117,13 @@ impl Query {
     }
 }
 
+#[allow(unused)]
 fn path_intersects_query(path: &Path, starttime: DateTime<Utc>, endtime: DateTime<Utc>) -> bool {
     let time = time_from_path(path);
     starttime <= time && time <= endtime
 }
 
+#[allow(unused)]
 fn time_from_path(path: &Path) -> DateTime<Utc> {
     let prefix = path
         .file_name()
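The replacement block in execute() leans on Rust's let-else syntax (stable since 1.65): if storage.query_table(...) yields no table, the function returns empty record batches and field names instead of erroring. A tiny standalone illustration of the pattern, with a hypothetical find_table helper standing in for query_table:

// Illustration of the let-else early return used above; find_table is made up.
fn find_table(name: &str) -> Option<String> {
    (name == "known").then(|| name.to_uppercase())
}

fn run_query(name: &str) -> Result<Vec<String>, String> {
    // If there is nothing to query, return an empty result rather than an error.
    let Some(table) = find_table(name) else {
        return Ok(Vec::new());
    };
    Ok(vec![format!("SELECT * FROM {table}")])
}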

server/src/query/table_provider.rs (3 additions, 0 deletions)

@@ -16,6 +16,8 @@
  *
  */
 
+#![allow(unused)]
+
 use async_trait::async_trait;
 use datafusion::arrow::datatypes::{Schema, SchemaRef};
 use datafusion::arrow::ipc::reader::StreamReader;
@@ -92,6 +94,7 @@ impl QueryTableProvider {
                 parquet_files.push(staging_parquet.clone())
             }
         }
+
         parquet_files.extend(self.other_staging_parquet.clone());
 
         let memtable = MemTable::try_new(Arc::clone(&self.schema), mem_records)?;
server/src/storage.rs (1 addition, 0 deletions)

@@ -248,6 +248,7 @@ impl StorageDir {
         paths
     }
 
+    #[allow(unused)]
     pub fn arrow_files_grouped_by_time(&self) -> HashMap<PathBuf, Vec<PathBuf>> {
         // hashmap <time, vec[paths]>
         let mut grouped_arrow_file: HashMap<PathBuf, Vec<PathBuf>> = HashMap::new();
server/src/storage/object_storage.rs (2 additions, 4 deletions)

@@ -389,10 +389,7 @@ impl MergedRecordReader {
     }
 
     pub fn merged_iter(self, schema: &Schema) -> impl Iterator<Item = RecordBatch> + '_ {
-        let adapted_readers = self
-            .readers
-            .into_iter()
-            .map(move |reader| reader.flatten().map(|batch| adapt_batch(schema, batch)));
+        let adapted_readers = self.readers.into_iter().map(move |reader| reader.flatten());
 
         kmerge_by(adapted_readers, |a: &RecordBatch, b: &RecordBatch| {
             let a: &TimestampMillisecondArray = a
@@ -409,6 +406,7 @@ impl MergedRecordReader {
 
             a.value(0) < b.value(0)
         })
+        .map(|batch| adapt_batch(schema, batch))
     }
 
     pub fn merged_schema(&self) -> Schema {