Skip to content
Merged
29 changes: 15 additions & 14 deletions benchmarks/src/tpch/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,16 @@ use datafusion::datasource::listing::{
};
use datafusion::datasource::{MemTable, TableProvider};
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::SessionStateBuilder;
use datafusion::execution::{SessionState, SessionStateBuilder};
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::{collect, displayable};
use datafusion::prelude::*;

use crate::util::{print_memory_stats, BenchmarkRun, CommonOpt, QueryResult};
use datafusion_distributed::test_utils::localhost::start_localhost_context;
use datafusion_distributed::{DistributedPhysicalOptimizerRule, SessionBuilder};
use datafusion_distributed::{
DistributedPhysicalOptimizerRule, DistributedSessionBuilder, DistributedSessionBuilderContext,
};
use log::info;
use structopt::StructOpt;

Expand Down Expand Up @@ -110,11 +112,13 @@ pub struct RunOpt {
}

#[async_trait]
impl SessionBuilder for RunOpt {
fn session_state_builder(
impl DistributedSessionBuilder for RunOpt {
async fn build_session_state(
&self,
mut builder: SessionStateBuilder,
) -> Result<SessionStateBuilder, DataFusionError> {
_ctx: DistributedSessionBuilderContext,
) -> Result<SessionState, DataFusionError> {
let mut builder = SessionStateBuilder::new().with_default_features();

let mut config = self
.common
.config()?
Expand Down Expand Up @@ -145,17 +149,14 @@ impl SessionBuilder for RunOpt {
builder = builder.with_physical_optimizer_rule(Arc::new(rule));
}

Ok(builder
let state = builder
.with_config(config)
.with_runtime_env(rt_builder.build_arc()?))
}
.with_runtime_env(rt_builder.build_arc()?)
.build();

async fn session_context(
&self,
ctx: SessionContext,
) -> std::result::Result<SessionContext, DataFusionError> {
let ctx = SessionContext::from(state);
self.register_tables(&ctx).await?;
Ok(ctx)
Ok(ctx.state())
}
}

Expand Down
1 change: 1 addition & 0 deletions src/common/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
#[allow(unused)]
pub mod ttl_map;
pub mod util;
6 changes: 3 additions & 3 deletions src/common/ttl_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ where
shard.insert(key);
}
BucketOp::Clear => {
let keys_to_delete = std::mem::replace(&mut shard, HashSet::new());
let keys_to_delete = std::mem::take(&mut shard);
for key in keys_to_delete {
data.remove(&key);
}
Expand Down Expand Up @@ -252,14 +252,14 @@ where

/// run_gc_loop will continuously clear expired entries from the map, checking every `period`. The
/// function terminates if `shutdown` is signalled.
async fn run_gc_loop(time: Arc<AtomicU64>, period: Duration, buckets: &Vec<Bucket<K>>) {
async fn run_gc_loop(time: Arc<AtomicU64>, period: Duration, buckets: &[Bucket<K>]) {
loop {
tokio::time::sleep(period).await;
Self::gc(time.clone(), buckets);
}
}

fn gc(time: Arc<AtomicU64>, buckets: &Vec<Bucket<K>>) {
fn gc(time: Arc<AtomicU64>, buckets: &[Bucket<K>]) {
let index = time.load(std::sync::atomic::Ordering::SeqCst) % buckets.len() as u64;
buckets[index as usize].clear();
time.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
Expand Down
105 changes: 50 additions & 55 deletions src/config_extension_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ pub trait ConfigExtensionExt {
/// # use async_trait::async_trait;
/// # use datafusion::common::{extensions_options, DataFusionError};
/// # use datafusion::config::ConfigExtension;
/// # use datafusion::execution::SessionState;
/// # use datafusion::execution::{SessionState, SessionStateBuilder};
/// # use datafusion::prelude::SessionConfig;
/// # use datafusion_distributed::{ConfigExtensionExt, SessionBuilder};
/// # use datafusion_distributed::{ConfigExtensionExt, DistributedSessionBuilder, DistributedSessionBuilderContext};
///
/// extensions_options! {
/// pub struct CustomExtension {
Expand All @@ -52,11 +52,13 @@ pub trait ConfigExtensionExt {
/// struct MyCustomSessionBuilder;
///
/// #[async_trait]
/// impl SessionBuilder for MyCustomSessionBuilder {
/// async fn session_state(&self, mut state: SessionState) -> Result<SessionState, DataFusionError> {
/// impl DistributedSessionBuilder for MyCustomSessionBuilder {
/// async fn build_session_state(&self, ctx: DistributedSessionBuilderContext) -> Result<SessionState, DataFusionError> {
/// let mut state = SessionStateBuilder::new().build();
///
/// // while providing this MyCustomSessionBuilder to an Arrow Flight endpoint, it will
/// // know how to deserialize the CustomExtension from the gRPC metadata.
/// state.retrieve_distributed_option_extension::<CustomExtension>()?;
/// state.retrieve_distributed_option_extension::<CustomExtension>(&ctx.headers)?;
/// Ok(state)
/// }
/// }
Expand All @@ -76,9 +78,9 @@ pub trait ConfigExtensionExt {
/// # use async_trait::async_trait;
/// # use datafusion::common::{extensions_options, DataFusionError};
/// # use datafusion::config::ConfigExtension;
/// # use datafusion::execution::SessionState;
/// # use datafusion::execution::{SessionState, SessionStateBuilder};
/// # use datafusion::prelude::SessionConfig;
/// # use datafusion_distributed::{ConfigExtensionExt, SessionBuilder};
/// # use datafusion_distributed::{ConfigExtensionExt, DistributedSessionBuilder, DistributedSessionBuilderContext};
///
/// extensions_options! {
/// pub struct CustomExtension {
Expand All @@ -101,17 +103,19 @@ pub trait ConfigExtensionExt {
/// struct MyCustomSessionBuilder;
///
/// #[async_trait]
/// impl SessionBuilder for MyCustomSessionBuilder {
/// async fn session_state(&self, mut state: SessionState) -> Result<SessionState, DataFusionError> {
/// impl DistributedSessionBuilder for MyCustomSessionBuilder {
/// async fn build_session_state(&self, ctx: DistributedSessionBuilderContext) -> Result<SessionState, DataFusionError> {
/// let mut state = SessionStateBuilder::new().build();
/// // while providing this MyCustomSessionBuilder to an Arrow Flight endpoint, it will
/// // know how to deserialize the CustomExtension from the gRPC metadata.
/// state.retrieve_distributed_option_extension::<CustomExtension>()?;
/// state.retrieve_distributed_option_extension::<CustomExtension>(&ctx.headers)?;
/// Ok(state)
/// }
/// }
/// ```
fn retrieve_distributed_option_extension<T: ConfigExtension + Default>(
&mut self,
headers: &HeaderMap,
) -> Result<(), DataFusionError>;
}

Expand Down Expand Up @@ -153,20 +157,17 @@ impl ConfigExtensionExt for SessionConfig {

fn retrieve_distributed_option_extension<T: ConfigExtension + Default>(
&mut self,
headers: &HeaderMap,
) -> Result<(), DataFusionError> {
let Some(flight_metadata) = self.get_extension::<ContextGrpcMetadata>() else {
return Ok(());
};

let mut result = T::default();
let mut found_some = false;
for (k, v) in flight_metadata.0.iter() {
for (k, v) in headers.iter() {
let key = k.as_str().trim_start_matches(FLIGHT_METADATA_CONFIG_PREFIX);
let prefix = format!("{}.", T::PREFIX);
if key.starts_with(&prefix) {
found_some = true;
result.set(
&key.trim_start_matches(&prefix),
key.trim_start_matches(&prefix),
v.to_str().map_err(|err| {
internal_datafusion_err!("Cannot parse header value: {err}")
})?,
Expand All @@ -185,7 +186,7 @@ impl ConfigExtensionExt for SessionStateBuilder {
delegate! {
to self.config().get_or_insert_default() {
fn add_distributed_option_extension<T: ConfigExtension + Default>(&mut self, t: T) -> Result<(), DataFusionError>;
fn retrieve_distributed_option_extension<T: ConfigExtension + Default>(&mut self) -> Result<(), DataFusionError>;
fn retrieve_distributed_option_extension<T: ConfigExtension + Default>(&mut self, h: &HeaderMap) -> Result<(), DataFusionError>;
}
}
}
Expand All @@ -194,7 +195,7 @@ impl ConfigExtensionExt for SessionState {
delegate! {
to self.config_mut() {
fn add_distributed_option_extension<T: ConfigExtension + Default>(&mut self, t: T) -> Result<(), DataFusionError>;
fn retrieve_distributed_option_extension<T: ConfigExtension + Default>(&mut self) -> Result<(), DataFusionError>;
fn retrieve_distributed_option_extension<T: ConfigExtension + Default>(&mut self, h: &HeaderMap) -> Result<(), DataFusionError>;
}
}
}
Expand All @@ -203,7 +204,7 @@ impl ConfigExtensionExt for SessionContext {
delegate! {
to self.state_ref().write().config_mut() {
fn add_distributed_option_extension<T: ConfigExtension + Default>(&mut self, t: T) -> Result<(), DataFusionError>;
fn retrieve_distributed_option_extension<T: ConfigExtension + Default>(&mut self) -> Result<(), DataFusionError>;
fn retrieve_distributed_option_extension<T: ConfigExtension + Default>(&mut self, h: &HeaderMap) -> Result<(), DataFusionError>;
}
}
}
Expand All @@ -212,17 +213,6 @@ impl ConfigExtensionExt for SessionContext {
pub(crate) struct ContextGrpcMetadata(pub HeaderMap);

impl ContextGrpcMetadata {
pub(crate) fn from_headers(metadata: HeaderMap) -> Self {
let mut new = HeaderMap::new();
for (k, v) in metadata.into_iter() {
let Some(k) = k else { continue };
if k.as_str().starts_with(FLIGHT_METADATA_CONFIG_PREFIX) {
new.insert(k, v);
}
}
Self(new)
}

fn merge(mut self, other: Self) -> Self {
for (k, v) in other.0.into_iter() {
let Some(k) = k else { continue };
Expand All @@ -246,16 +236,16 @@ mod tests {
fn test_propagation() -> Result<(), Box<dyn std::error::Error>> {
let mut config = SessionConfig::new();

let mut opt = CustomExtension::default();
opt.foo = "foo".to_string();
opt.bar = 1;
opt.baz = true;
let opt = CustomExtension {
foo: "".to_string(),
bar: 0,
baz: false,
};

config.add_distributed_option_extension(opt)?;

let metadata = config.get_extension::<ContextGrpcMetadata>().unwrap();
let mut new_config = SessionConfig::new();
new_config.set_extension(config.get_extension::<ContextGrpcMetadata>().unwrap());
new_config.retrieve_distributed_option_extension::<CustomExtension>()?;
new_config.retrieve_distributed_option_extension::<CustomExtension>(&metadata.0)?;

let opt = get_ext::<CustomExtension>(&config);
let new_opt = get_ext::<CustomExtension>(&new_config);
Expand Down Expand Up @@ -294,12 +284,16 @@ mod tests {
fn test_new_extension_overwrites_previous() -> Result<(), Box<dyn std::error::Error>> {
let mut config = SessionConfig::new();

let mut opt1 = CustomExtension::default();
opt1.foo = "first".to_string();
let opt1 = CustomExtension {
foo: "first".to_string(),
..Default::default()
};
config.add_distributed_option_extension(opt1)?;

let mut opt2 = CustomExtension::default();
opt2.bar = 42;
let opt2 = CustomExtension {
bar: 42,
..Default::default()
};
config.add_distributed_option_extension(opt2)?;

let flight_metadata = config.get_extension::<ContextGrpcMetadata>().unwrap();
Expand All @@ -317,7 +311,7 @@ mod tests {
fn test_propagate_no_metadata() -> Result<(), Box<dyn std::error::Error>> {
let mut config = SessionConfig::new();

config.retrieve_distributed_option_extension::<CustomExtension>()?;
config.retrieve_distributed_option_extension::<CustomExtension>(&Default::default())?;

let extension = config.options().extensions.get::<CustomExtension>();
assert!(extension.is_none());
Expand All @@ -330,13 +324,11 @@ mod tests {
let mut config = SessionConfig::new();
let mut header_map = HeaderMap::new();
header_map.insert(
HeaderName::from_str("x-datafusion-distributed-other.setting").unwrap(),
HeaderName::from_str("x-datafusion-distributed-config-other.setting").unwrap(),
HeaderValue::from_str("value").unwrap(),
);

let flight_metadata = ContextGrpcMetadata::from_headers(header_map);
config.set_extension(std::sync::Arc::new(flight_metadata));
config.retrieve_distributed_option_extension::<CustomExtension>()?;
config.retrieve_distributed_option_extension::<CustomExtension>(&header_map)?;

let extension = config.options().extensions.get::<CustomExtension>();
assert!(extension.is_none());
Expand All @@ -348,13 +340,17 @@ mod tests {
fn test_multiple_extensions_different_prefixes() -> Result<(), Box<dyn std::error::Error>> {
let mut config = SessionConfig::new();

let mut custom_opt = CustomExtension::default();
custom_opt.foo = "custom_value".to_string();
custom_opt.bar = 123;
let custom_opt = CustomExtension {
foo: "custom_value".to_string(),
bar: 123,
..Default::default()
};

let mut another_opt = AnotherExtension::default();
another_opt.setting1 = "other".to_string();
another_opt.setting2 = 456;
let another_opt = AnotherExtension {
setting1: "other".to_string(),
setting2: 456,
..Default::default()
};

config.add_distributed_option_extension(custom_opt)?;
config.add_distributed_option_extension(another_opt)?;
Expand Down Expand Up @@ -384,9 +380,8 @@ mod tests {
);

let mut new_config = SessionConfig::new();
new_config.set_extension(flight_metadata);
new_config.retrieve_distributed_option_extension::<CustomExtension>()?;
new_config.retrieve_distributed_option_extension::<AnotherExtension>()?;
new_config.retrieve_distributed_option_extension::<CustomExtension>(metadata)?;
new_config.retrieve_distributed_option_extension::<AnotherExtension>(metadata)?;

let propagated_custom = get_ext::<CustomExtension>(&new_config);
let propagated_another = get_ext::<AnotherExtension>(&new_config);
Expand Down
Loading