Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 0 additions & 71 deletions src/channel_manager.rs

This file was deleted.

52 changes: 52 additions & 0 deletions src/channel_manager_ext.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
use async_trait::async_trait;
use datafusion::error::DataFusionError;
use datafusion::prelude::SessionConfig;
use std::sync::Arc;
use tonic::body::BoxBody;
use url::Url;

pub(crate) fn set_distributed_channel_resolver(
cfg: &mut SessionConfig,
channel_resolver: impl ChannelResolver + Send + Sync + 'static,
) {
cfg.set_extension(Arc::new(ChannelResolverExtension(Arc::new(
channel_resolver,
))));
}

pub(crate) fn get_distributed_channel_resolver(
cfg: &SessionConfig,
) -> Option<Arc<dyn ChannelResolver + Send + Sync>> {
cfg.get_extension::<ChannelResolverExtension>()
.map(|cm| cm.0.clone())
}

#[derive(Clone)]
struct ChannelResolverExtension(Arc<dyn ChannelResolver + Send + Sync>);

pub type BoxCloneSyncChannel = tower::util::BoxCloneSyncService<
http::Request<BoxBody>,
http::Response<BoxBody>,
tonic::transport::Error,
>;

/// Abstracts networking details so that users can implement their own network resolution
/// mechanism.
#[async_trait]
pub trait ChannelResolver {
/// Gets all available worker URLs. Used during stage assignment.
fn get_urls(&self) -> Result<Vec<Url>, DataFusionError>;
/// For a given URL, get a channel for communicating to it.
async fn get_channel_for_url(&self, url: &Url) -> Result<BoxCloneSyncChannel, DataFusionError>;
}

#[async_trait]
impl ChannelResolver for Arc<dyn ChannelResolver + Send + Sync> {
fn get_urls(&self) -> Result<Vec<Url>, DataFusionError> {
self.as_ref().get_urls()
}

async fn get_channel_for_url(&self, url: &Url) -> Result<BoxCloneSyncChannel, DataFusionError> {
self.as_ref().get_channel_for_url(url).await
}
}
File renamed without changes.
3 changes: 3 additions & 0 deletions src/common/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
mod composed_extension_codec;
#[allow(unused)]
pub mod ttl_map;
pub mod util;

pub(crate) use composed_extension_codec::ComposedPhysicalExtensionCodec;
Loading