Skip to content

Commit 825ec2d

Browse files
authored
Introduce DistributedExt trait that extends the capabilities of DataFusion's session building tools (#106)
* Introduce DistributedExt * Include user codecs in DistributedExt * Add with_ variants to DistributedExt * Include ChannelManager in the DistributedExt trait * Run cargo fmt * Rename methods * Improve distributed_ext.rs doc comments
1 parent 5c92110 commit 825ec2d

19 files changed

+627
-486
lines changed

src/channel_manager.rs

Lines changed: 0 additions & 71 deletions
This file was deleted.

src/channel_manager_ext.rs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
use async_trait::async_trait;
2+
use datafusion::error::DataFusionError;
3+
use datafusion::prelude::SessionConfig;
4+
use std::sync::Arc;
5+
use tonic::body::BoxBody;
6+
use url::Url;
7+
8+
pub(crate) fn set_distributed_channel_resolver(
9+
cfg: &mut SessionConfig,
10+
channel_resolver: impl ChannelResolver + Send + Sync + 'static,
11+
) {
12+
cfg.set_extension(Arc::new(ChannelResolverExtension(Arc::new(
13+
channel_resolver,
14+
))));
15+
}
16+
17+
pub(crate) fn get_distributed_channel_resolver(
18+
cfg: &SessionConfig,
19+
) -> Option<Arc<dyn ChannelResolver + Send + Sync>> {
20+
cfg.get_extension::<ChannelResolverExtension>()
21+
.map(|cm| cm.0.clone())
22+
}
23+
24+
#[derive(Clone)]
25+
struct ChannelResolverExtension(Arc<dyn ChannelResolver + Send + Sync>);
26+
27+
pub type BoxCloneSyncChannel = tower::util::BoxCloneSyncService<
28+
http::Request<BoxBody>,
29+
http::Response<BoxBody>,
30+
tonic::transport::Error,
31+
>;
32+
33+
/// Abstracts networking details so that users can implement their own network resolution
34+
/// mechanism.
35+
#[async_trait]
36+
pub trait ChannelResolver {
37+
/// Gets all available worker URLs. Used during stage assignment.
38+
fn get_urls(&self) -> Result<Vec<Url>, DataFusionError>;
39+
/// For a given URL, get a channel for communicating to it.
40+
async fn get_channel_for_url(&self, url: &Url) -> Result<BoxCloneSyncChannel, DataFusionError>;
41+
}
42+
43+
#[async_trait]
44+
impl ChannelResolver for Arc<dyn ChannelResolver + Send + Sync> {
45+
fn get_urls(&self) -> Result<Vec<Url>, DataFusionError> {
46+
self.as_ref().get_urls()
47+
}
48+
49+
async fn get_channel_for_url(&self, url: &Url) -> Result<BoxCloneSyncChannel, DataFusionError> {
50+
self.as_ref().get_channel_for_url(url).await
51+
}
52+
}
File renamed without changes.

src/common/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
mod composed_extension_codec;
12
#[allow(unused)]
23
pub mod ttl_map;
34
pub mod util;
5+
6+
pub(crate) use composed_extension_codec::ComposedPhysicalExtensionCodec;

0 commit comments

Comments
 (0)