
Commit 63b7ffa

Use register_table for execute on cache (#128)
Registering a table with `register_listing_table` uses the object store's listing feature to find the files to be processed. Internally this calls `ListingTable::list_files_for_scan` in DataFusion, which panics when the table path is empty. This change mirrors how querying already works for S3. Also remove the hostname from the listing options so that querying files on S3 works properly; this covers the case where one server posted the data and a different server queries it.
1 parent 2763cee commit 63b7ffa

3 files changed: 31 additions, 17 deletions
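Before the per-file diffs, a minimal sketch of the pattern described in the commit message, assuming the DataFusion `SessionContext` and `ListingTable` API used in server/src/query.rs below. The function `register_cache_table`, its parameters, and the warn message are hypothetical stand-ins for values the server already has in scope (stream name, cache path, listing options, schema), not code from this commit.

// A minimal sketch, not the server's exact code: register_cache_table and its
// parameters are hypothetical stand-ins. Assumes the DataFusion SessionContext
// and ListingTable API shown in the server/src/query.rs diff below, plus the
// log crate the project already uses.
use std::sync::Arc;

use datafusion::arrow::datatypes::Schema;
use datafusion::datasource::listing::{
    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

fn register_cache_table(
    ctx: &SessionContext,
    stream_name: &str,
    cache_path: &str,
    listing_options: ListingOptions,
    schema: Arc<Schema>,
) -> Result<()> {
    // Parse the path first: if the local cache directory is unusable we return
    // early instead of reaching ListingTable::list_files_for_scan with an empty
    // table path, which (per the commit message) is where register_listing_table
    // panicked.
    let table_path = match ListingTableUrl::parse(cache_path) {
        Ok(url) => url,
        Err(e) => {
            log::warn!("could not parse local cache path, skipping: {}", e);
            return Ok(());
        }
    };

    // Build the table provider explicitly, then hand it to register_table;
    // no file listing happens at registration time.
    let config = ListingTableConfig::new(table_path)
        .with_listing_options(listing_options)
        .with_schema(schema);
    let table = ListingTable::try_new(config)?;
    ctx.register_table(stream_name, Arc::new(table))?;

    Ok(())
}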

server/src/main.rs

Lines changed: 6 additions & 4 deletions
@@ -114,10 +114,6 @@ fn startup_sync() {
 
     for stream in metadata::STREAM_INFO.list_streams() {
         let dir = StorageDir::new(stream.clone());
-        // if data.records file is not present then skip this stream
-        if !dir.local_data_exists() {
-            continue;
-        }
 
         if let Err(e) = dir.create_temp_dir() {
             log::error!(
@@ -127,6 +123,12 @@ fn startup_sync() {
             );
             continue;
         }
+
+        // if data.records file is not present then skip this stream
+        if !dir.local_data_exists() {
+            continue;
+        }
+
         // create prefix for this file from its last modified time
         let path = dir.data_path.join("data.records");

server/src/query.rs

Lines changed: 24 additions & 11 deletions
@@ -21,6 +21,9 @@ use datafusion::arrow::datatypes::Schema;
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::datasource::file_format::parquet::ParquetFormat;
 use datafusion::datasource::listing::ListingOptions;
+use datafusion::datasource::listing::ListingTable;
+use datafusion::datasource::listing::ListingTableConfig;
+use datafusion::datasource::listing::ListingTableUrl;
 use datafusion::prelude::*;
 use serde_json::Value;
 use std::sync::Arc;
@@ -29,6 +32,7 @@ use crate::metadata::STREAM_INFO;
 use crate::option::CONFIG;
 use crate::storage;
 use crate::storage::ObjectStorage;
+use crate::storage::ObjectStorageError;
 use crate::utils::TimePeriod;
 use crate::validator;
 use crate::Error;
@@ -102,17 +106,26 @@ impl Query {
             None => return Ok(()),
         };
 
-        ctx.register_listing_table(
-            &self.stream_name,
-            CONFIG
-                .parseable
-                .get_cache_path(&self.stream_name)
-                .to_str()
-                .unwrap(),
-            listing_options,
-            Some(schema),
-        )
-        .await?;
+        let cache_path = CONFIG.parseable.get_cache_path(&self.stream_name);
+
+        let table_path = match ListingTableUrl::parse(
+            cache_path.to_str().expect("path should is valid unicode"),
+        ) {
+            Ok(table_path) => table_path,
+            Err(e) => {
+                log::warn!("could not parse local filesystem path. Maybe directory does not exist. Error {}", e);
+                return Ok(());
+            }
+        };
+
+        let config = ListingTableConfig::new(table_path)
+            .with_listing_options(listing_options)
+            .with_schema(schema);
+
+        let table = ListingTable::try_new(config)?;
+
+        ctx.register_table(&*self.stream_name, Arc::new(table))
+            .map_err(ObjectStorageError::DataFusionError)?;
 
         // execute the query and collect results
         let df = ctx.sql(self.query.as_str()).await?;

server/src/s3.rs

Lines changed: 1 addition & 2 deletions
@@ -29,7 +29,6 @@ use crate::metadata::Stats;
 use crate::option::{StorageOpt, CONFIG};
 use crate::query::Query;
 use crate::storage::{LogStream, ObjectStorage, ObjectStorageError};
-use crate::utils::hostname_unchecked;
 
 // Default object storage currently is DO Spaces bucket
 // Any user who starts the Parseable server with default configuration
@@ -427,7 +426,7 @@ impl ObjectStorage for S3 {
 
         let file_format = ParquetFormat::default().with_enable_pruning(true);
         let listing_options = ListingOptions {
-            file_extension: format!("{}.data.parquet", hostname_unchecked()),
+            file_extension: ".data.parquet".to_string(),
            format: Arc::new(file_format),
            table_partition_cols: vec![],
            collect_stat: true,
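As a follow-up to the commit message's point about one server posting data and another querying it: a small, self-contained illustration (file names here are hypothetical) of why dropping the hostname from `file_extension` matters, given that DataFusion's listing keeps only files whose names end with the configured extension.

// Illustration only; "server-a"/"server-b" file names are hypothetical.
// DataFusion filters listed files by the file_extension suffix, so a
// hostname-specific suffix only matches files written by this server.
fn main() {
    let uploaded = ["server-a.data.parquet", "server-b.data.parquet"];

    // Old behaviour: the suffix baked in this server's hostname, so only its
    // own files matched.
    let old_suffix = format!("{}.data.parquet", "server-a");
    let old_matches: Vec<_> = uploaded
        .iter()
        .filter(|f| f.ends_with(&old_suffix))
        .collect();
    assert_eq!(old_matches, vec![&"server-a.data.parquet"]);

    // New behaviour: files from any server match, so one server can query
    // data that another server uploaded.
    let new_matches: Vec<_> = uploaded
        .iter()
        .filter(|f| f.ends_with(".data.parquet"))
        .collect();
    assert_eq!(new_matches.len(), 2);
}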
