Skip to content

Commit 14ea1cb

Browse files
committed
Allow passing multiple user codecs
1 parent acaa11c commit 14ea1cb

File tree

4 files changed

+18
-11
lines changed

4 files changed

+18
-11
lines changed

src/distributed_ext.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,8 @@ pub trait DistributedExt: Sized {
125125
) -> Result<(), DataFusionError>;
126126

127127
/// Injects a user-defined [PhysicalExtensionCodec] that is capable of encoding/decoding
128-
/// custom execution nodes.
128+
/// custom execution nodes. Multiple user-defined [PhysicalExtensionCodec] can be added
129+
/// by calling this method several times.
129130
///
130131
/// Example:
131132
///

src/protobuf/distributed_codec.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use super::get_distributed_user_codec;
1+
use super::get_distributed_user_codecs;
22
use crate::common::ComposedPhysicalExtensionCodec;
33
use crate::execution_plans::{NetworkCoalesceExec, NetworkCoalesceReady, NetworkShuffleReadyExec};
44
use crate::{NetworkShuffleExec, PartitionIsolatorExec};
@@ -25,9 +25,7 @@ pub struct DistributedCodec;
2525
impl DistributedCodec {
2626
pub fn new_combined_with_user(cfg: &SessionConfig) -> impl PhysicalExtensionCodec + use<> {
2727
let mut codecs: Vec<Arc<dyn PhysicalExtensionCodec>> = vec![Arc::new(DistributedCodec {})];
28-
if let Some(ref user_codec) = get_distributed_user_codec(cfg) {
29-
codecs.push(Arc::clone(user_codec));
30-
}
28+
codecs.extend(get_distributed_user_codecs(cfg));
3129
ComposedPhysicalExtensionCodec::new(codecs)
3230
}
3331
}

src/protobuf/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,4 @@ mod user_codec;
44

55
pub(crate) use distributed_codec::DistributedCodec;
66
pub(crate) use stage_proto::{StageExecProto, StageKey, proto_from_stage, stage_from_proto};
7-
pub(crate) use user_codec::{get_distributed_user_codec, set_distributed_user_codec};
7+
pub(crate) use user_codec::{get_distributed_user_codecs, set_distributed_user_codec};

src/protobuf/user_codec.rs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,25 @@ use datafusion::prelude::SessionConfig;
22
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
33
use std::sync::Arc;
44

5-
pub struct UserProvidedCodec(Arc<dyn PhysicalExtensionCodec>);
5+
pub struct UserProvidedCodecs(Vec<Arc<dyn PhysicalExtensionCodec>>);
66

77
pub(crate) fn set_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(
88
cfg: &mut SessionConfig,
99
codec: T,
1010
) {
11-
cfg.set_extension(Arc::new(UserProvidedCodec(Arc::new(codec))))
11+
let mut codecs = match cfg.get_extension::<UserProvidedCodecs>() {
12+
None => vec![],
13+
Some(prev) => prev.0.clone(),
14+
};
15+
codecs.push(Arc::new(codec));
16+
cfg.set_extension(Arc::new(UserProvidedCodecs(codecs)))
1217
}
1318

14-
pub(crate) fn get_distributed_user_codec(
19+
pub(crate) fn get_distributed_user_codecs(
1520
cfg: &SessionConfig,
16-
) -> Option<Arc<dyn PhysicalExtensionCodec>> {
17-
Some(Arc::clone(&cfg.get_extension::<UserProvidedCodec>()?.0))
21+
) -> Vec<Arc<dyn PhysicalExtensionCodec>> {
22+
match cfg.get_extension::<UserProvidedCodecs>() {
23+
None => vec![],
24+
Some(v) => v.0.clone(),
25+
}
1826
}

0 commit comments

Comments
 (0)