Commit a7abb5d
Config datafusion query with schema (#99)
Instead of pre-listing all the valid prefixes before query execution and letting DataFusion infer the schema, it is better to pass the schema we already have for a given stream and let DataFusion do all the heavy lifting. If the stream info does not hold a schema for the stream, we return early with an error telling the client to post events to this logstream first.
1 parent 849c92c commit a7abb5d
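
The core of the change, condensed into a standalone sketch. Names like urls, options, and schema are placeholder inputs; the ListingTableConfig calls are the ones this commit uses.

use std::sync::Arc;

use datafusion::arrow::datatypes::Schema;
use datafusion::datasource::listing::{ListingOptions, ListingTableConfig, ListingTableUrl};

// Attach the schema we already track for the stream, so DataFusion does
// not have to list objects and read Parquet footers to infer one.
// Before this commit the same config chain ended in
// `.infer(&ctx.state()).await?` instead of `.with_schema(...)`.
fn build_config(
    urls: Vec<ListingTableUrl>,
    options: ListingOptions,
    schema: Arc<Schema>,
) -> ListingTableConfig {
    ListingTableConfig::new_with_multi_paths(urls)
        .with_listing_options(options)
        .with_schema(schema)
}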

File tree: server/src/query.rs, server/src/s3.rs, server/src/validator.rs

3 files changed: +62 −37 lines

server/src/query.rs

Lines changed: 44 additions & 1 deletion
@@ -17,6 +17,7 @@
 */

 use chrono::{DateTime, Utc};
+use datafusion::arrow::datatypes::Schema;
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::datasource::file_format::parquet::ParquetFormat;
 use datafusion::datasource::listing::ListingOptions;
@@ -44,6 +45,7 @@ fn get_value<'a>(value: &'a Value, key: &'static str) -> Result<&'a str, Error>
 pub struct Query {
     pub query: String,
     pub stream_name: String,
+    pub schema: Arc<Schema>,
     pub start: DateTime<Utc>,
     pub end: DateTime<Utc>,
 }
@@ -123,10 +125,25 @@ impl Query {
 #[cfg(test)]
 mod tests {
     use super::Query;
+    use crate::{alerts::Alerts, metadata::STREAM_INFO};
+    use datafusion::arrow::datatypes::Schema;
+    use datafusion::arrow::datatypes::{DataType, Field};
     use rstest::*;
     use serde_json::Value;
     use std::str::FromStr;

+    #[fixture]
+    fn schema() -> Schema {
+        let field_a = Field::new("a", DataType::Int64, false);
+        let field_b = Field::new("b", DataType::Boolean, false);
+        Schema::new(vec![field_a, field_b])
+    }
+
+    fn clear_map() {
+        STREAM_INFO.write().unwrap().clear();
+    }
+
+    // A query can only be performed on streams with a valid schema
     #[rstest]
     #[case(
         r#"{
@@ -144,12 +161,38 @@ mod tests {
         }"#,
         &["stream_name/date=2022-10-15/hour=10/minute=00/", "stream_name/date=2022-10-15/hour=10/minute=01/"]
     )]
-    fn query_parse_prefix(#[case] prefix: &str, #[case] right: &[&str]) {
+    #[serial_test::serial]
+    fn query_parse_prefix_with_some_schema(#[case] prefix: &str, #[case] right: &[&str]) {
+        clear_map();
+        STREAM_INFO
+            .add_stream("stream_name".to_string(), Some(schema()), Alerts::default())
+            .unwrap();
+
         let query = Value::from_str(prefix).unwrap();
         let query = Query::parse(query).unwrap();
         assert_eq!(&query.stream_name, "stream_name");
         let prefixes = query.get_prefixes();
         let left = prefixes.iter().map(String::as_str).collect::<Vec<&str>>();
         assert_eq!(left.as_slice(), right);
     }
+
+    // If there is no schema for this stream then parsing a Query should fail
+    #[rstest]
+    #[case(
+        r#"{
+            "query": "SELECT * FROM stream_name",
+            "startTime": "2022-10-15T10:00:00+00:00",
+            "endTime": "2022-10-15T10:01:00+00:00"
+        }"#
+    )]
+    #[serial_test::serial]
+    fn query_parse_prefix_with_no_schema(#[case] prefix: &str) {
+        clear_map();
+        STREAM_INFO
+            .add_stream("stream_name".to_string(), None, Alerts::default())
+            .unwrap();
+
+        let query = Value::from_str(prefix).unwrap();
+        assert!(Query::parse(query).is_err());
+    }
 }
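
Both tests mutate STREAM_INFO, a process-wide map, which is why each one clears it first and runs under #[serial_test::serial]. (Note also that the test calls schema() directly as a plain function rather than taking it as an injected rstest fixture argument.) A minimal standalone model of that isolation pattern, using a hypothetical lazy_static map in place of the repo's STREAM_INFO:

use std::collections::HashMap;
use std::sync::RwLock;

lazy_static::lazy_static! {
    // Stand-in for STREAM_INFO: stream name -> optional schema
    // (None until events have been posted to the stream).
    static ref INFO: RwLock<HashMap<String, Option<String>>> = RwLock::new(HashMap::new());
}

#[cfg(test)]
mod tests {
    use super::INFO;

    // Reset shared state so every test starts from a known-empty map.
    fn clear_map() {
        INFO.write().unwrap().clear();
    }

    #[test]
    #[serial_test::serial] // tests touching INFO must not interleave
    fn stream_without_schema() {
        clear_map();
        INFO.write().unwrap().insert("stream_name".into(), None);
        assert!(INFO.read().unwrap()["stream_name"].is_none());
    }

    #[test]
    #[serial_test::serial]
    fn stream_with_schema() {
        clear_map();
        INFO.write()
            .unwrap()
            .insert("stream_name".into(), Some("schema".into()));
        assert!(INFO.read().unwrap()["stream_name"].is_some());
    }
}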

server/src/s3.rs

Lines changed: 7 additions & 36 deletions
@@ -44,8 +44,6 @@ const S3_URL_ENV_VAR: &str = "P_S3_URL";
4444

4545
// max concurrent request allowed for datafusion object store
4646
const MAX_OBJECT_STORE_REQUESTS: usize = 1000;
47-
// max concurrent request allowed for prefix checks during listing
48-
const MAX_CONCURRENT_PREFIX_CHECK: usize = 1000;
4947

5048
lazy_static::lazy_static! {
5149
#[derive(Debug)]
@@ -276,6 +274,7 @@ impl S3 {
276274
Ok(body_bytes)
277275
}
278276

277+
#[allow(dead_code)]
279278
async fn prefix_exists(&self, prefix: &str) -> Result<bool, AwsSdkError> {
280279
// TODO check if head object is faster compared to
281280
// list objects
@@ -418,42 +417,15 @@ impl ObjectStorage for S3 {
418417
SessionContext::with_config_rt(SessionConfig::default(), Arc::clone(&STORAGE_RUNTIME));
419418

420419
// Get all prefix paths and convert them into futures which yeilds ListingTableUrl
421-
let handles = query.get_prefixes().into_iter().map(|prefix| async move {
422-
let t: Result<Option<ListingTableUrl>, ObjectStorageError> = {
423-
if self.prefix_exists(&prefix).await? {
424-
let path = format!("s3://{}/{}", &S3_CONFIG.s3_bucket_name, prefix);
425-
Ok(Some(ListingTableUrl::parse(path)?))
426-
} else {
427-
Ok(None)
428-
}
429-
};
430-
t
431-
});
432-
433-
// Poll futures but limit them to concurrency of MAX_CONCURRENT_PREFIX_CHECK
434-
let list: Vec<_> = futures::stream::iter(handles)
435-
.buffer_unordered(MAX_CONCURRENT_PREFIX_CHECK)
436-
.collect()
437-
.await;
438-
439-
// Collect all available prefixes
440-
let prefixes: Vec<_> = list
420+
let prefixes = query
421+
.get_prefixes()
441422
.into_iter()
442-
.filter_map(|fetch| {
443-
fetch
444-
.map_err(|err| {
445-
log::error!("prefix lookup failed due to {}", err);
446-
err
447-
})
448-
.ok()
423+
.map(|prefix| {
424+
let path = format!("s3://{}/{}", &S3_CONFIG.s3_bucket_name, prefix);
425+
ListingTableUrl::parse(path).unwrap()
449426
})
450-
.flatten()
451427
.collect();
452428

453-
if prefixes.is_empty() {
454-
return Ok(());
455-
}
456-
457429
let file_format = ParquetFormat::default().with_enable_pruning(true);
458430
let listing_options = ListingOptions {
459431
file_extension: format!("{}.data.parquet", hostname_unchecked()),
@@ -465,8 +437,7 @@ impl ObjectStorage for S3 {
465437

466438
let config = ListingTableConfig::new_with_multi_paths(prefixes)
467439
.with_listing_options(listing_options)
468-
.infer(&ctx.state())
469-
.await?;
440+
.with_schema(Arc::clone(&query.schema));
470441

471442
let table = ListingTable::try_new(config)?;
472443
ctx.register_table(query.stream_name.as_str(), Arc::new(table))?;
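
The rewritten mapping calls ListingTableUrl::parse(path).unwrap(), which panics if a prefix ever produces a malformed URL. A hedged alternative under the same assumptions (bucket and prefixes as inputs) that propagates the parse error instead:

use datafusion::datasource::listing::ListingTableUrl;
use datafusion::error::Result;

// Same prefix -> URL mapping as the diff, but collecting into a Result
// so a bad path surfaces as an error rather than a panic.
fn to_table_urls(bucket: &str, prefixes: Vec<String>) -> Result<Vec<ListingTableUrl>> {
    prefixes
        .into_iter()
        .map(|prefix| ListingTableUrl::parse(format!("s3://{}/{}", bucket, prefix)))
        .collect()
}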

server/src/validator.rs

Lines changed: 11 additions & 0 deletions
@@ -16,10 +16,13 @@
 *
 */

+use std::sync::Arc;
+
 use chrono::{DateTime, Utc};
 use serde_json::json;

 use crate::alerts::Alerts;
+use crate::metadata::STREAM_INFO;
 use crate::query::Query;
 use crate::Error;

@@ -133,10 +136,18 @@ pub fn query(query: &str, start_time: &str, end_time: &str) -> Result<Query, Err
         return Err(Error::StartTimeAfterEndTime());
     }

+    let stream_name = tokens[stream_name_index].to_string();
+
+    let schema = match STREAM_INFO.schema(&stream_name)? {
+        Some(schema) => Arc::new(schema),
+        None => return Err(Error::MissingRecord),
+    };
+
     Ok(Query {
         stream_name: tokens[stream_name_index].to_string(),
         start,
         end,
         query: query.to_string(),
+        schema,
     })
 }
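
The validator now refuses to build a Query when no schema exists yet. The shape of that guard, as a standalone sketch with simplified stand-ins (String errors instead of the repo's Error type, and a stubbed lookup in place of STREAM_INFO.schema()):

use std::sync::Arc;

use datafusion::arrow::datatypes::Schema;

// Stub standing in for STREAM_INFO.schema(): returns Ok(None) while the
// stream exists but has never received an event.
fn lookup_schema(_stream_name: &str) -> Result<Option<Schema>, String> {
    Ok(None)
}

fn require_schema(stream_name: &str) -> Result<Arc<Schema>, String> {
    match lookup_schema(stream_name)? {
        Some(schema) => Ok(Arc::new(schema)),
        // No schema means no events were ever posted to this logstream,
        // so there is nothing DataFusion could query; fail fast.
        None => Err(format!(
            "no schema for stream {}; post events to this logstream first",
            stream_name
        )),
    }
}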
