Skip to content

Commit 63ed836

Browse files
authored
Fix staging parquet file checks in sync and query (#263)
Fix staging file checks
* Move the remove-link procedure into the `Drop` impl of the table provider, because it must always run on exit.
* If a parquet file path is not found in the file table during lookup, treat it as already uploaded and skip it.
* Fix a possible double upsert when generating candidate parquet paths for a query.

Fixes #258
1 parent bfd7489 commit 63ed836

File tree

4 files changed

+44
-19
lines changed

4 files changed

+44
-19
lines changed

server/src/query.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use datafusion::arrow::record_batch::RecordBatch;
2525
use datafusion::datasource::TableProvider;
2626
use datafusion::prelude::*;
2727
use serde_json::Value;
28+
use std::collections::HashSet;
2829
use std::path::Path;
2930
use std::path::PathBuf;
3031
use std::sync::Arc;
@@ -86,7 +87,7 @@ impl Query {
8687
.filter(|path| path_intersects_query(path, self.start, self.end))
8788
.collect();
8889

89-
let possible_parquet_files = arrow_files.clone().into_iter().map(|mut path| {
90+
let possible_parquet_files = arrow_files.iter().cloned().map(|mut path| {
9091
path.set_extension("parquet");
9192
path
9293
});
@@ -96,12 +97,14 @@ impl Query {
9697
.into_iter()
9798
.filter(|path| path_intersects_query(path, self.start, self.end));
9899

99-
let parquet_files: Vec<PathBuf> = possible_parquet_files.chain(parquet_files).collect();
100+
let parquet_files: HashSet<PathBuf> = possible_parquet_files.chain(parquet_files).collect();
101+
let parquet_files = Vec::from_iter(parquet_files.into_iter());
100102

101103
let ctx = SessionContext::with_config_rt(
102104
SessionConfig::default(),
103105
CONFIG.storage().get_datafusion_runtime(),
104106
);
107+
105108
let table = Arc::new(QueryTableProvider::new(
106109
arrow_files,
107110
parquet_files,
@@ -116,7 +119,6 @@ impl Query {
116119
// execute the query and collect results
117120
let df = ctx.sql(self.query.as_str()).await?;
118121
let results = df.collect().await?;
119-
table.remove_preserve();
120122
Ok(results)
121123
}
122124
}

server/src/query/table_provider.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -67,13 +67,6 @@ impl QueryTableProvider {
6767
}
6868
}
6969

70-
pub fn remove_preserve(&self) {
71-
let mut parquet_cached = crate::storage::CACHED_FILES.lock().expect("no poisoning");
72-
for file in &self.parquet_files {
73-
parquet_cached.remove(file)
74-
}
75-
}
76-
7770
async fn create_physical_plan(
7871
&self,
7972
ctx: &SessionState,
@@ -111,6 +104,15 @@ impl QueryTableProvider {
111104
}
112105
}
113106

107+
impl Drop for QueryTableProvider {
108+
fn drop(&mut self) {
109+
let mut parquet_cached = crate::storage::CACHED_FILES.lock().expect("no poisoning");
110+
for file in &self.parquet_files {
111+
parquet_cached.remove(file)
112+
}
113+
}
114+
}
115+
114116
#[async_trait]
115117
impl TableProvider for QueryTableProvider {
116118
fn as_any(&self) -> &dyn Any {

server/src/storage/file_link.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ impl<L: Link + Default> FileTable<L> {
9797
}
9898
}
9999

100-
pub fn get_mut(&mut self, path: &Path) -> &mut L {
101-
self.inner.get_mut(path).unwrap()
100+
pub fn get_mut(&mut self, path: &Path) -> Option<&mut L> {
101+
self.inner.get_mut(path)
102102
}
103103
}

server/src/storage/object_storage.rs

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,12 @@ pub trait ObjectStorage: Sync + 'static {
229229
}
230230

231231
for file in dir.parquet_files() {
232-
let metadata = CACHED_FILES.lock().unwrap().get_mut(&file).metadata;
232+
let Some(metadata) = CACHED_FILES
233+
.lock()
234+
.unwrap()
235+
.get_mut(&file)
236+
.map(|fl| fl.metadata) else { continue };
237+
233238
if metadata != CacheState::Idle {
234239
continue;
235240
}
@@ -241,20 +246,36 @@ pub trait ObjectStorage: Sync + 'static {
241246
.expect("filename is valid string");
242247
let file_suffix = str::replacen(filename, ".", "/", 3);
243248
let objectstore_path = format!("{}/{}", stream, file_suffix);
249+
244250
CACHED_FILES
245251
.lock()
246252
.unwrap()
247253
.get_mut(&file)
254+
.expect("entry checked at the start")
248255
.set_metadata(CacheState::Uploading);
249256

250257
let compressed_size = file.metadata().map_or(0, |meta| meta.len());
251258

252-
let _put_parquet_file = self.upload_file(&objectstore_path, &file).await?;
253-
CACHED_FILES
254-
.lock()
255-
.unwrap()
256-
.get_mut(&file)
257-
.set_metadata(CacheState::Uploaded);
259+
match self.upload_file(&objectstore_path, &file).await {
260+
Ok(()) => {
261+
CACHED_FILES
262+
.lock()
263+
.unwrap()
264+
.get_mut(&file)
265+
.expect("entry checked at the start")
266+
.set_metadata(CacheState::Uploaded);
267+
}
268+
Err(e) => {
269+
CACHED_FILES
270+
.lock()
271+
.unwrap()
272+
.get_mut(&file)
273+
.expect("entry checked at the start")
274+
.set_metadata(CacheState::Idle);
275+
276+
return Err(e.into());
277+
}
278+
}
258279

259280
stream_stats
260281
.entry(stream)

0 commit comments

Comments
 (0)