
Commit 3f034de

nitisht and trueleo authored

Migrate to datafusion 11.0 (#65)

This adds multiple improvements:
- DataFusion querying across multiple prefixes
- No dependency on external plugins for the S3 connection
- Use the complete file path inside each prefix (added in #55) to avoid listing calls

Co-authored-by: Satyam Singh <[email protected]>

1 parent 7fe14b7 commit 3f034de
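For orientation, the shape of the new query path in one self-contained sketch, assembled from the code this commit adds (DataFusion 11 and object_store 0.4). The endpoint, credentials, bucket, prefixes, table name, and SQL below are placeholders, and a tokio runtime is assumed:

```rust
use std::sync::Arc;

use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{
    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::datasource::object_store::ObjectStoreRegistry;
use datafusion::error::Result;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::prelude::{SessionConfig, SessionContext};
use object_store::aws::AmazonS3Builder;

#[tokio::main]
async fn main() -> Result<()> {
    // Build the S3 store directly from object_store; no external plugin needed.
    // All connection values here are placeholders.
    let s3 = AmazonS3Builder::new()
        .with_region("us-east-1")
        .with_endpoint("http://localhost:9000")
        .with_bucket_name("parseable")
        .with_access_key_id("minioadmin")
        .with_secret_access_key("minioadmin")
        .with_allow_http(true)
        .build()
        .unwrap();

    // Register the store so DataFusion resolves s3:// URLs to it.
    let registry = ObjectStoreRegistry::new();
    registry.register_store("s3", "parseable", Arc::new(s3));
    let runtime =
        RuntimeEnv::new(RuntimeConfig::new().with_object_store_registry(Arc::new(registry)))?;
    let ctx = SessionContext::with_config_rt(SessionConfig::default(), Arc::new(runtime));

    // One listing table over several prefixes, so a single SQL statement spans all of them.
    let prefixes = vec![
        ListingTableUrl::parse("s3://parseable/mystream/date=2022-08-01/")?,
        ListingTableUrl::parse("s3://parseable/mystream/date=2022-08-02/")?,
    ];
    let listing_options = ListingOptions {
        file_extension: ".parquet".to_string(),
        format: Arc::new(ParquetFormat::default().with_enable_pruning(true)),
        table_partition_cols: vec![],
        collect_stat: true,
        target_partitions: 1,
    };
    let config = ListingTableConfig::new_with_multi_paths(prefixes)
        .with_listing_options(listing_options)
        .infer(&ctx.state())
        .await?;
    ctx.register_table("mystream", Arc::new(ListingTable::try_new(config)?))?;

    ctx.sql("SELECT COUNT(*) FROM mystream").await?.show().await?;
    Ok(())
}
```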

File tree

3 files changed: +117 additions, -50 deletions


server/Cargo.toml

Lines changed: 2 additions & 2 deletions

```diff
@@ -23,8 +23,8 @@ aws-types = "0.47"
 bytes = "1"
 chrono = "0.4.19"
 crossterm = "0.23.2"
-datafusion = "8.0"
-datafusion-objectstore-s3 = { git = "https://github.com/parseablehq/datafusion-objectstore-s3" }
+datafusion = "11.0"
+object_store = { version = "0.4", features=["aws"] }
 derive_more = "0.99.17"
 env_logger = "0.9.0"
 futures = "0.3"
```

server/src/s3.rs

Lines changed: 113 additions & 46 deletions
```diff
@@ -4,41 +4,80 @@ use aws_sdk_s3::model::{Delete, ObjectIdentifier};
 use aws_sdk_s3::types::{ByteStream, SdkError};
 use aws_sdk_s3::Error as AwsSdkError;
 use aws_sdk_s3::{Client, Credentials, Endpoint, Region};
-use aws_types::credentials::SharedCredentialsProvider;
 use bytes::Bytes;
 use crossterm::style::Stylize;
 use datafusion::arrow::record_batch::RecordBatch;
-use datafusion::datasource::listing::{ListingTable, ListingTableConfig};
-use datafusion::prelude::SessionContext;
-use datafusion_objectstore_s3::object_store::s3::S3FileSystem;
+use datafusion::datasource::file_format::parquet::ParquetFormat;
+use datafusion::datasource::listing::{
+    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
+};
+use datafusion::datasource::object_store::ObjectStoreRegistry;
+use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
+use datafusion::prelude::{SessionConfig, SessionContext};
+use futures::StreamExt;
 use http::Uri;
+use object_store::aws::AmazonS3Builder;
+use object_store::limit::LimitStore;
 use std::collections::HashSet;
 use std::fs;
 use std::iter::Iterator;
 use std::sync::Arc;
 use structopt::StructOpt;
-use tokio_stream::StreamExt;
 
 use crate::alerts::Alerts;
 use crate::metadata::Stats;
 use crate::option::{StorageOpt, CONFIG};
 use crate::query::Query;
 use crate::storage::{LogStream, ObjectStorage, ObjectStorageError};
+use crate::utils::hostname_unchecked;
 
 // Default object storage currently is DO Spaces bucket
 // Any user who starts the Parseable server with default configuration
 // will point to this bucket and will see any data present on this bucket
-const DEFAULT_S3_URL: &str = "https://sgp1.digitaloceanspaces.com";
-const DEFAULT_S3_REGION: &str = "sgp1";
+const DEFAULT_S3_URL: &str = "https://minio.parseable.io:9000";
+const DEFAULT_S3_REGION: &str = "us-east-1";
 const DEFAULT_S3_BUCKET: &str = "parseable";
-const DEFAULT_S3_ACCESS_KEY: &str = "DO00YF68WC2P3QUAM82K";
-const DEFAULT_S3_SECRET_KEY: &str = "Ov6D7DvM6NHlyU4W2ajrHhRnT4IVRqKxExLPhekNIKw";
+const DEFAULT_S3_ACCESS_KEY: &str = "minioadmin";
+const DEFAULT_S3_SECRET_KEY: &str = "minioadmin";
 
 const S3_URL_ENV_VAR: &str = "P_S3_URL";
 
+// max concurrent requests allowed for the datafusion object store
+const MAX_OBJECT_STORE_REQUESTS: usize = 1000;
+// max concurrent requests allowed for prefix checks during listing
+const MAX_CONCURRENT_PREFIX_CHECK: usize = 1000;
+
 lazy_static::lazy_static! {
     #[derive(Debug)]
     pub static ref S3_CONFIG: Arc<S3Config> = Arc::new(S3Config::from_args());
+
+    // runtime to be used in query session
+    pub static ref STORAGE_RUNTIME: Arc<RuntimeEnv> = {
+
+        let s3 = AmazonS3Builder::new()
+            .with_region(&S3_CONFIG.s3_default_region)
+            .with_endpoint(&S3_CONFIG.s3_endpoint_url)
+            .with_bucket_name(&S3_CONFIG.s3_bucket_name)
+            .with_access_key_id(&S3_CONFIG.s3_access_key_id)
+            .with_secret_access_key(&S3_CONFIG.s3_secret_key)
+            // allow http for local instances
+            .with_allow_http(true)
+            .build()
+            .unwrap();
+
+        // limit the object store to a fixed number of concurrent requests
+        let s3 = LimitStore::new(s3, MAX_OBJECT_STORE_REQUESTS);
+
+        let object_store_registry = ObjectStoreRegistry::new();
+        object_store_registry.register_store("s3", &S3_CONFIG.s3_bucket_name, Arc::new(s3));
+
+        let config = RuntimeConfig::new().with_object_store_registry(Arc::new(object_store_registry));
+
+        let runtime = RuntimeEnv::new(config).unwrap();
+
+        Arc::new(runtime)
+    };
 }
 
 #[derive(Debug, Clone, StructOpt)]
```
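`LimitStore`, used above to cap DataFusion's concurrent S3 requests, wraps any `ObjectStore` implementation. A tiny sketch with the crate's in-memory store standing in for S3 (the limit of 2 and the key are arbitrary; a tokio runtime is assumed):

```rust
use object_store::limit::LimitStore;
use object_store::memory::InMemory;
use object_store::path::Path;
use object_store::ObjectStore;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Wrap any ObjectStore; at most 2 requests run against the inner store at once.
    let store = LimitStore::new(InMemory::new(), 2);

    store.put(&Path::from("greeting"), "hello".into()).await?;
    let data = store.get(&Path::from("greeting")).await?.bytes().await?;
    assert_eq!(&data[..], b"hello");
    Ok(())
}
```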
```diff
@@ -125,22 +164,21 @@ impl S3Options {
 }
 
 pub struct S3 {
-    options: S3Options,
     client: aws_sdk_s3::Client,
 }
 
 impl S3 {
     pub fn new() -> Self {
         let options = S3Options::new();
         let config = aws_sdk_s3::Config::builder()
-            .region(options.region.clone())
-            .endpoint_resolver(options.endpoint.clone())
-            .credentials_provider(options.creds.clone())
+            .region(options.region)
+            .endpoint_resolver(options.endpoint)
+            .credentials_provider(options.creds)
             .build();
 
         let client = Client::from_conf(config);
 
-        Self { options, client }
+        Self { client }
     }
 
     async fn _put_schema(&self, stream_name: String, body: String) -> Result<(), AwsSdkError> {
```
```diff
@@ -376,45 +414,74 @@ impl ObjectStorage for S3 {
         query: &Query,
         results: &mut Vec<RecordBatch>,
     ) -> Result<(), ObjectStorageError> {
-        let s3_file_system = Arc::new(
-            S3FileSystem::new(
-                Some(SharedCredentialsProvider::new(self.options.creds.clone())),
-                Some(self.options.region.clone()),
-                Some(self.options.endpoint.clone()),
-                None,
-                None,
-                None,
-            )
-            .await,
-        );
-
-        for prefix in query.get_prefixes() {
-            let ctx = SessionContext::new();
-            let path = format!("s3://{}/{}", &S3_CONFIG.s3_bucket_name, prefix);
-
-            if !self.prefix_exists(&prefix).await? {
-                continue;
-            }
+        let ctx =
+            SessionContext::with_config_rt(SessionConfig::default(), Arc::clone(&STORAGE_RUNTIME));
+
+        // Get all prefix paths and convert them into futures which yield ListingTableUrl
+        let handles = query.get_prefixes().into_iter().map(|prefix| async move {
+            let t: Result<Option<ListingTableUrl>, ObjectStorageError> = {
+                if self.prefix_exists(&prefix).await? {
+                    let path = format!("s3://{}/{}", &S3_CONFIG.s3_bucket_name, prefix);
+                    Ok(Some(ListingTableUrl::parse(path)?))
+                } else {
+                    Ok(None)
+                }
+            };
+            t
+        });
+
+        // Poll futures but limit them to a concurrency of MAX_CONCURRENT_PREFIX_CHECK
+        let list: Vec<_> = futures::stream::iter(handles)
+            .buffer_unordered(MAX_CONCURRENT_PREFIX_CHECK)
+            .collect()
+            .await;
+
+        // Collect all available prefixes
+        let prefixes: Vec<_> = list
+            .into_iter()
+            .filter_map(|fetch| {
+                fetch
+                    .map_err(|err| {
+                        log::error!("prefix lookup failed due to {}", err);
+                        err
+                    })
+                    .ok()
+            })
+            .flatten()
+            .collect();
+
+        if prefixes.is_empty() {
+            return Ok(());
+        }
 
-            let config = ListingTableConfig::new(s3_file_system.clone(), &path)
-                .infer()
-                .await?;
+        let file_format = ParquetFormat::default().with_enable_pruning(true);
+        let listing_options = ListingOptions {
+            file_extension: format!("{}.data.parquet", hostname_unchecked()),
+            format: Arc::new(file_format),
+            table_partition_cols: vec![],
+            collect_stat: true,
+            target_partitions: 1,
+        };
+
+        let config = ListingTableConfig::new_with_multi_paths(prefixes)
+            .with_listing_options(listing_options)
+            .infer(&ctx.state())
+            .await?;
 
-            let table = ListingTable::try_new(config)?;
-            ctx.register_table(query.stream_name.as_str(), Arc::new(table))?;
+        let table = ListingTable::try_new(config)?;
+        ctx.register_table(query.stream_name.as_str(), Arc::new(table))?;
 
-            // execute the query and collect results
-            let df = ctx.sql(query.query.as_str()).await?;
-            results.extend(df.collect().await?);
-        }
+        // execute the query and collect results
+        let df = ctx.sql(&query.query).await?;
+        results.extend(df.collect().await?);
 
         Ok(())
     }
 }
 
 impl From<AwsSdkError> for ObjectStorageError {
     fn from(error: AwsSdkError) -> Self {
-        ObjectStorageError::UnhandledError(error.into())
+        ObjectStorageError::UnhandledError(Box::new(error))
     }
 }
```

```diff
@@ -429,15 +496,15 @@ impl From<SdkError<HeadBucketError>> for ObjectStorageError {
                 },
             ..
             } => ObjectStorageError::NoSuchBucket(S3_CONFIG.bucket_name().to_string()),
-            SdkError::DispatchFailure(err) => ObjectStorageError::ConnectionError(err.into()),
+            SdkError::DispatchFailure(err) => ObjectStorageError::ConnectionError(Box::new(err)),
             SdkError::TimeoutError(err) => ObjectStorageError::ConnectionError(err),
-            err => ObjectStorageError::UnhandledError(err.into()),
+            err => ObjectStorageError::UnhandledError(Box::new(err)),
         }
     }
 }
 
 impl From<serde_json::Error> for ObjectStorageError {
     fn from(error: serde_json::Error) -> Self {
-        ObjectStorageError::UnhandledError(error.into())
+        ObjectStorageError::UnhandledError(Box::new(error))
     }
 }
```

server/src/storage.rs

Lines changed: 2 additions & 2 deletions

```diff
@@ -209,13 +209,13 @@ pub enum ObjectStorageError {
     #[error("Bucket {0} not found")]
     NoSuchBucket(String),
     #[error("Connection Error: {0}")]
-    ConnectionError(Box<dyn std::error::Error>),
+    ConnectionError(Box<dyn std::error::Error + Send + 'static>),
     #[error("IO Error: {0}")]
     IoError(#[from] std::io::Error),
     #[error("DataFusion Error: {0}")]
     DataFusionError(#[from] datafusion::error::DataFusionError),
     #[error("Unhandled Error: {0}")]
-    UnhandledError(Box<dyn std::error::Error>),
+    UnhandledError(Box<dyn std::error::Error + Send + 'static>),
 }
 
 impl From<ObjectStorageError> for crate::error::Error {
```
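The added `Send + 'static` bound is what lets these boxed errors move across task boundaries, for example into a spawned future. A small sketch of the constraint; the error value and spawned task here are purely illustrative, and a tokio runtime is assumed:

```rust
use std::error::Error;

// Mirrors the bound on ObjectStorageError's boxed variants.
type BoxedErr = Box<dyn Error + Send + 'static>;

fn make_err() -> BoxedErr {
    // An illustrative error value; io::Error is Send.
    Box::new(std::io::Error::new(std::io::ErrorKind::Other, "boom"))
}

#[tokio::main]
async fn main() {
    // tokio::spawn requires its future, and everything it owns, to be Send.
    // Without `+ Send` on the boxed error, this would not compile.
    let handle = tokio::spawn(async {
        let err: BoxedErr = make_err();
        err.to_string()
    });
    println!("{}", handle.await.unwrap());
}
```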
