Skip to content

Commit 770a8d1

Browse files
committed
Add docs
1 parent 04728a1 commit 770a8d1

File tree

4 files changed

+64
-4
lines changed

4 files changed

+64
-4
lines changed

src/flight_service/do_get.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
use crate::composed_extension_codec::ComposedPhysicalExtensionCodec;
22
use crate::errors::datafusion_error_to_tonic_status;
33
use crate::flight_service::service::ArrowFlightEndpoint;
4-
use crate::get_user_codec;
54
use crate::plan::DistributedCodec;
65
use crate::stage::{stage_from_proto, ExecutionStageProto};
6+
use crate::user_provided_codec::get_user_codec;
77
use arrow_flight::encode::FlightDataEncoderBuilder;
88
use arrow_flight::error::FlightError;
99
use arrow_flight::flight_service_server::FlightService;

src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,4 @@ mod user_provided_codec;
1515
pub use channel_manager::{BoxCloneSyncChannel, ChannelManager, ChannelResolver};
1616
pub use flight_service::{ArrowFlightEndpoint, SessionBuilder};
1717
pub use plan::ArrowFlightReadExec;
18-
pub use user_provided_codec::{add_user_codec, get_user_codec, with_user_codec};
18+
pub use user_provided_codec::{add_user_codec, with_user_codec};

src/plan/arrow_flight_read.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@ use crate::channel_manager::ChannelManager;
33
use crate::composed_extension_codec::ComposedPhysicalExtensionCodec;
44
use crate::errors::tonic_status_to_datafusion_error;
55
use crate::flight_service::DoGet;
6-
use crate::get_user_codec;
76
use crate::plan::DistributedCodec;
87
use crate::stage::{proto_from_stage, ExecutionStage};
8+
use crate::user_provided_codec::get_user_codec;
99
use arrow_flight::decode::FlightRecordBatchStream;
1010
use arrow_flight::error::FlightError;
1111
use arrow_flight::flight_service_client::FlightServiceClient;

src/user_provided_codec.rs

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,36 @@ use std::sync::Arc;
55

66
pub struct UserProvidedCodec(Arc<dyn PhysicalExtensionCodec>);
77

8+
/// Injects a user-defined codec that is capable of encoding/decoding custom execution nodes.
9+
/// It will inject the codec as a config extension in the provided [SessionConfig], [SessionContext]
10+
/// or [SessionStateBuilder].
11+
///
12+
/// Example:
13+
///
14+
/// ```
15+
/// # use std::sync::Arc;
16+
/// # use datafusion::execution::{SessionState, FunctionRegistry, SessionStateBuilder};
17+
/// # use datafusion::physical_plan::ExecutionPlan;
18+
/// # use datafusion_proto::physical_plan::PhysicalExtensionCodec;
19+
/// # use datafusion_distributed::{add_user_codec};
20+
///
21+
/// #[derive(Debug)]
22+
/// struct CustomExecCodec;
23+
///
24+
/// impl PhysicalExtensionCodec for CustomExecCodec {
25+
/// fn try_decode(&self, buf: &[u8], inputs: &[Arc<dyn ExecutionPlan>], registry: &dyn FunctionRegistry) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
26+
/// todo!()
27+
/// }
28+
///
29+
/// fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> datafusion::common::Result<()> {
30+
/// todo!()
31+
/// }
32+
/// }
33+
///
34+
/// let builder = SessionStateBuilder::new();
35+
/// let mut state = builder.build();
36+
/// add_user_codec(state.config_mut(), CustomExecCodec);
37+
/// ```
838
#[allow(private_bounds)]
939
pub fn add_user_codec(
1040
transport: &mut impl UserCodecTransport,
@@ -13,6 +43,36 @@ pub fn add_user_codec(
1343
transport.set(codec);
1444
}
1545

46+
/// Adds a user-defined codec that is capable of encoding/decoding custom execution nodes.
47+
/// It returns the [SessionContext], [SessionConfig] or [SessionStateBuilder] passed on the first
48+
/// argument with the user-defined codec already placed into the config extensions.
49+
///
50+
/// Example:
51+
///
52+
/// ```
53+
/// # use std::sync::Arc;
54+
/// # use datafusion::execution::{SessionState, FunctionRegistry, SessionStateBuilder};
55+
/// # use datafusion::physical_plan::ExecutionPlan;
56+
/// # use datafusion_proto::physical_plan::PhysicalExtensionCodec;
57+
/// # use datafusion_distributed::with_user_codec;
58+
///
59+
/// #[derive(Debug)]
60+
/// struct CustomExecCodec;
61+
///
62+
/// impl PhysicalExtensionCodec for CustomExecCodec {
63+
/// fn try_decode(&self, buf: &[u8], inputs: &[Arc<dyn ExecutionPlan>], registry: &dyn FunctionRegistry) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
64+
/// todo!()
65+
/// }
66+
///
67+
/// fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> datafusion::common::Result<()> {
68+
/// todo!()
69+
/// }
70+
/// }
71+
///
72+
/// let builder = SessionStateBuilder::new();
73+
/// let builder = with_user_codec(builder, CustomExecCodec);
74+
/// let state = builder.build();
75+
/// ```
1676
#[allow(private_bounds)]
1777
pub fn with_user_codec<T: UserCodecTransport>(
1878
mut transport: T,
@@ -23,7 +83,7 @@ pub fn with_user_codec<T: UserCodecTransport>(
2383
}
2484

2585
#[allow(private_bounds)]
26-
pub fn get_user_codec(
86+
pub(crate) fn get_user_codec(
2787
transport: &impl UserCodecTransport,
2888
) -> Option<Arc<dyn PhysicalExtensionCodec>> {
2989
transport.get()

0 commit comments

Comments
 (0)