Skip to content

Commit 6be2101

Browse files
committed
Add Arc variants for the distributed user codec
1 parent 14ea1cb commit 6be2101

File tree

3 files changed

+44
-5
lines changed

3 files changed

+44
-5
lines changed

src/distributed_ext.rs

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,15 @@ use crate::channel_resolver_ext::set_distributed_channel_resolver;
33
use crate::config_extension_ext::{
44
set_distributed_option_extension, set_distributed_option_extension_from_headers,
55
};
6-
use crate::protobuf::set_distributed_user_codec;
6+
use crate::protobuf::{set_distributed_user_codec, set_distributed_user_codec_arc};
77
use datafusion::common::DataFusionError;
88
use datafusion::config::ConfigExtension;
99
use datafusion::execution::{SessionState, SessionStateBuilder};
1010
use datafusion::prelude::{SessionConfig, SessionContext};
1111
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
1212
use delegate::delegate;
1313
use http::HeaderMap;
14+
use std::sync::Arc;
1415

1516
/// Extends DataFusion with distributed capabilities.
1617
pub trait DistributedExt: Sized {
@@ -167,6 +168,12 @@ pub trait DistributedExt: Sized {
167168
/// Same as [DistributedExt::with_distributed_user_codec] but with an in-place mutation
168169
fn set_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(&mut self, codec: T);
169170

171+
/// Same as [DistributedExt::with_distributed_user_codec] but with a dynamic argument.
172+
fn with_distributed_user_codec_arc(self, codec: Arc<dyn PhysicalExtensionCodec>) -> Self;
173+
174+
/// Same as [DistributedExt::set_distributed_user_codec] but with a dynamic argument.
175+
fn set_distributed_user_codec_arc(&mut self, codec: Arc<dyn PhysicalExtensionCodec>);
176+
170177
/// Injects a [ChannelResolver] implementation for Distributed DataFusion to resolve worker
171178
/// nodes. When running in distributed mode, setting a [ChannelResolver] is required.
172179
///
@@ -234,6 +241,10 @@ impl DistributedExt for SessionConfig {
234241
set_distributed_user_codec(self, codec)
235242
}
236243

244+
fn set_distributed_user_codec_arc(&mut self, codec: Arc<dyn PhysicalExtensionCodec>) {
245+
set_distributed_user_codec_arc(self, codec)
246+
}
247+
237248
fn set_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(
238249
&mut self,
239250
resolver: T,
@@ -255,6 +266,10 @@ impl DistributedExt for SessionConfig {
255266
#[expr($;self)]
256267
fn with_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(mut self, codec: T) -> Self;
257268

269+
#[call(set_distributed_user_codec_arc)]
270+
#[expr($;self)]
271+
fn with_distributed_user_codec_arc(mut self, codec: Arc<dyn PhysicalExtensionCodec>) -> Self;
272+
258273
#[call(set_distributed_channel_resolver)]
259274
#[expr($;self)]
260275
fn with_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(mut self, resolver: T) -> Self;
@@ -280,6 +295,11 @@ impl DistributedExt for SessionStateBuilder {
280295
#[expr($;self)]
281296
fn with_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(mut self, codec: T) -> Self;
282297

298+
fn set_distributed_user_codec_arc(&mut self, codec: Arc<dyn PhysicalExtensionCodec>);
299+
#[call(set_distributed_user_codec_arc)]
300+
#[expr($;self)]
301+
fn with_distributed_user_codec_arc(mut self, codec: Arc<dyn PhysicalExtensionCodec>) -> Self;
302+
283303
fn set_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(&mut self, resolver: T);
284304
#[call(set_distributed_channel_resolver)]
285305
#[expr($;self)]
@@ -306,6 +326,11 @@ impl DistributedExt for SessionState {
306326
#[expr($;self)]
307327
fn with_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(mut self, codec: T) -> Self;
308328

329+
fn set_distributed_user_codec_arc(&mut self, codec: Arc<dyn PhysicalExtensionCodec>);
330+
#[call(set_distributed_user_codec_arc)]
331+
#[expr($;self)]
332+
fn with_distributed_user_codec_arc(mut self, codec: Arc<dyn PhysicalExtensionCodec>) -> Self;
333+
309334
fn set_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(&mut self, resolver: T);
310335
#[call(set_distributed_channel_resolver)]
311336
#[expr($;self)]
@@ -332,6 +357,11 @@ impl DistributedExt for SessionContext {
332357
#[expr($;self)]
333358
fn with_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(self, codec: T) -> Self;
334359

360+
fn set_distributed_user_codec_arc(&mut self, codec: Arc<dyn PhysicalExtensionCodec>);
361+
#[call(set_distributed_user_codec_arc)]
362+
#[expr($;self)]
363+
fn with_distributed_user_codec_arc(mut self, codec: Arc<dyn PhysicalExtensionCodec>) -> Self;
364+
335365
fn set_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(&mut self, resolver: T);
336366
#[call(set_distributed_channel_resolver)]
337367
#[expr($;self)]

src/protobuf/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,6 @@ mod user_codec;
44

55
pub(crate) use distributed_codec::DistributedCodec;
66
pub(crate) use stage_proto::{StageExecProto, StageKey, proto_from_stage, stage_from_proto};
7-
pub(crate) use user_codec::{get_distributed_user_codecs, set_distributed_user_codec};
7+
pub(crate) use user_codec::{
8+
get_distributed_user_codecs, set_distributed_user_codec, set_distributed_user_codec_arc,
9+
};

src/protobuf/user_codec.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,25 @@ use std::sync::Arc;
44

55
pub struct UserProvidedCodecs(Vec<Arc<dyn PhysicalExtensionCodec>>);
66

7-
pub(crate) fn set_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(
7+
pub(crate) fn set_distributed_user_codec_arc(
88
cfg: &mut SessionConfig,
9-
codec: T,
9+
codec: Arc<dyn PhysicalExtensionCodec>,
1010
) {
1111
let mut codecs = match cfg.get_extension::<UserProvidedCodecs>() {
1212
None => vec![],
1313
Some(prev) => prev.0.clone(),
1414
};
15-
codecs.push(Arc::new(codec));
15+
codecs.push(codec);
1616
cfg.set_extension(Arc::new(UserProvidedCodecs(codecs)))
1717
}
1818

19+
pub(crate) fn set_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(
20+
cfg: &mut SessionConfig,
21+
codec: T,
22+
) {
23+
set_distributed_user_codec_arc(cfg, Arc::new(codec))
24+
}
25+
1926
pub(crate) fn get_distributed_user_codecs(
2027
cfg: &SessionConfig,
2128
) -> Vec<Arc<dyn PhysicalExtensionCodec>> {

0 commit comments

Comments
 (0)