Skip to content

Commit e94da39

Browse files
committed
Rename methods
1 parent a4fa271 commit e94da39

File tree

11 files changed

+70
-56
lines changed

11 files changed

+70
-56
lines changed

src/channel_manager_ext.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::sync::Arc;
55
use tonic::body::BoxBody;
66
use url::Url;
77

8-
pub(crate) fn set_channel_resolver(
8+
pub(crate) fn set_distributed_channel_resolver(
99
cfg: &mut SessionConfig,
1010
channel_resolver: impl ChannelResolver + Send + Sync + 'static,
1111
) {
@@ -14,7 +14,7 @@ pub(crate) fn set_channel_resolver(
1414
))));
1515
}
1616

17-
pub(crate) fn get_channel_resolver(
17+
pub(crate) fn get_distributed_channel_resolver(
1818
cfg: &SessionConfig,
1919
) -> Option<Arc<dyn ChannelResolver + Send + Sync>> {
2020
cfg.get_extension::<ChannelResolverExtension>()

src/distributed_ext.rs

Lines changed: 47 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
use crate::channel_manager_ext::set_channel_resolver;
1+
use crate::channel_manager_ext::set_distributed_channel_resolver;
22
use crate::config_extension_ext::{
33
set_distributed_option_extension, set_distributed_option_extension_from_headers,
44
};
5-
use crate::user_codec_ext::set_user_codec;
5+
use crate::user_codec_ext::set_distributed_user_codec;
66
use crate::ChannelResolver;
77
use datafusion::common::DataFusionError;
88
use datafusion::config::ConfigExtension;
@@ -151,20 +151,20 @@ pub trait DistributedExt: Sized {
151151
/// }
152152
/// }
153153
///
154-
/// let config = SessionConfig::new().with_user_codec(CustomExecCodec);
154+
/// let config = SessionConfig::new().with_distributed_user_codec(CustomExecCodec);
155155
///
156156
/// async fn build_state(ctx: DistributedSessionBuilderContext) -> Result<SessionState, DataFusionError> {
157157
/// // while providing this MyCustomSessionBuilder to an Arrow Flight endpoint, it will
158158
/// // know how to deserialize the CustomExtension from the gRPC metadata.
159159
/// Ok(SessionStateBuilder::new()
160-
/// .with_user_codec(CustomExecCodec)
160+
/// .with_distributed_user_codec(CustomExecCodec)
161161
/// .build())
162162
/// }
163163
/// ```
164-
fn with_user_codec<T: PhysicalExtensionCodec + 'static>(self, codec: T) -> Self;
164+
fn with_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(self, codec: T) -> Self;
165165

166-
/// Same as [DistributedExt::with_user_codec] but with an in-place mutation
167-
fn set_user_codec<T: PhysicalExtensionCodec + 'static>(&mut self, codec: T);
166+
/// Same as [DistributedExt::with_distributed_user_codec] but with an in-place mutation
167+
fn set_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(&mut self, codec: T);
168168

169169
/// Injects a [ChannelResolver] implementation for Distributed DataFusion to resolve worker
170170
/// nodes. When running in distributed mode, setting a [ChannelResolver] is required.
@@ -192,21 +192,26 @@ pub trait DistributedExt: Sized {
192192
/// }
193193
/// }
194194
///
195-
/// let config = SessionConfig::new().with_channel_resolver(CustomChannelResolver);
195+
/// let config = SessionConfig::new().with_distributed_channel_resolver(CustomChannelResolver);
196196
///
197197
/// async fn build_state(ctx: DistributedSessionBuilderContext) -> Result<SessionState, DataFusionError> {
198198
/// // while providing this MyCustomSessionBuilder to an Arrow Flight endpoint, it will
199199
/// // know how to deserialize the CustomExtension from the gRPC metadata.
200200
/// Ok(SessionStateBuilder::new()
201-
/// .with_channel_resolver(CustomChannelResolver)
201+
/// .with_distributed_channel_resolver(CustomChannelResolver)
202202
/// .build())
203203
/// }
204204
/// ```
205-
fn with_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(self, resolver: T)
206-
-> Self;
205+
fn with_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(
206+
self,
207+
resolver: T,
208+
) -> Self;
207209

208-
/// Same as [DistributedExt::with_channel_resolver] but with an in-place mutation.
209-
fn set_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(&mut self, resolver: T);
210+
/// Same as [DistributedExt::with_distributed_channel_resolver] but with an in-place mutation.
211+
fn set_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(
212+
&mut self,
213+
resolver: T,
214+
);
210215
}
211216

212217
impl DistributedExt for SessionConfig {
@@ -224,12 +229,15 @@ impl DistributedExt for SessionConfig {
224229
set_distributed_option_extension_from_headers::<T>(self, headers)
225230
}
226231

227-
fn set_user_codec<T: PhysicalExtensionCodec + 'static>(&mut self, codec: T) {
228-
set_user_codec(self, codec)
232+
fn set_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(&mut self, codec: T) {
233+
set_distributed_user_codec(self, codec)
229234
}
230235

231-
fn set_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(&mut self, resolver: T) {
232-
set_channel_resolver(self, resolver)
236+
fn set_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(
237+
&mut self,
238+
resolver: T,
239+
) {
240+
set_distributed_channel_resolver(self, resolver)
233241
}
234242

235243
delegate! {
@@ -242,13 +250,13 @@ impl DistributedExt for SessionConfig {
242250
#[expr($?;Ok(self))]
243251
fn with_distributed_option_extension_from_headers<T: ConfigExtension + Default>(mut self, headers: &HeaderMap) -> Result<Self, DataFusionError>;
244252

245-
#[call(set_user_codec)]
253+
#[call(set_distributed_user_codec)]
246254
#[expr($;self)]
247-
fn with_user_codec<T: PhysicalExtensionCodec + 'static>(mut self, codec: T) -> Self;
255+
fn with_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(mut self, codec: T) -> Self;
248256

249-
#[call(set_channel_resolver)]
257+
#[call(set_distributed_channel_resolver)]
250258
#[expr($;self)]
251-
fn with_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(mut self, resolver: T) -> Self;
259+
fn with_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(mut self, resolver: T) -> Self;
252260
}
253261
}
254262
}
@@ -266,15 +274,15 @@ impl DistributedExt for SessionStateBuilder {
266274
#[expr($?;Ok(self))]
267275
fn with_distributed_option_extension_from_headers<T: ConfigExtension + Default>(mut self, headers: &HeaderMap) -> Result<Self, DataFusionError>;
268276

269-
fn set_user_codec<T: PhysicalExtensionCodec + 'static>(&mut self, codec: T);
270-
#[call(set_user_codec)]
277+
fn set_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(&mut self, codec: T);
278+
#[call(set_distributed_user_codec)]
271279
#[expr($;self)]
272-
fn with_user_codec<T: PhysicalExtensionCodec + 'static>(mut self, codec: T) -> Self;
280+
fn with_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(mut self, codec: T) -> Self;
273281

274-
fn set_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(&mut self, resolver: T);
275-
#[call(set_channel_resolver)]
282+
fn set_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(&mut self, resolver: T);
283+
#[call(set_distributed_channel_resolver)]
276284
#[expr($;self)]
277-
fn with_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(mut self, resolver: T) -> Self;
285+
fn with_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(mut self, resolver: T) -> Self;
278286
}
279287
}
280288
}
@@ -292,15 +300,15 @@ impl DistributedExt for SessionState {
292300
#[expr($?;Ok(self))]
293301
fn with_distributed_option_extension_from_headers<T: ConfigExtension + Default>(mut self, headers: &HeaderMap) -> Result<Self, DataFusionError>;
294302

295-
fn set_user_codec<T: PhysicalExtensionCodec + 'static>(&mut self, codec: T);
296-
#[call(set_user_codec)]
303+
fn set_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(&mut self, codec: T);
304+
#[call(set_distributed_user_codec)]
297305
#[expr($;self)]
298-
fn with_user_codec<T: PhysicalExtensionCodec + 'static>(mut self, codec: T) -> Self;
306+
fn with_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(mut self, codec: T) -> Self;
299307

300-
fn set_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(&mut self, resolver: T);
301-
#[call(set_channel_resolver)]
308+
fn set_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(&mut self, resolver: T);
309+
#[call(set_distributed_channel_resolver)]
302310
#[expr($;self)]
303-
fn with_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(mut self, resolver: T) -> Self;
311+
fn with_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(mut self, resolver: T) -> Self;
304312
}
305313
}
306314
}
@@ -318,15 +326,15 @@ impl DistributedExt for SessionContext {
318326
#[expr($?;Ok(self))]
319327
fn with_distributed_option_extension_from_headers<T: ConfigExtension + Default>(self, headers: &HeaderMap) -> Result<Self, DataFusionError>;
320328

321-
fn set_user_codec<T: PhysicalExtensionCodec + 'static>(&mut self, codec: T);
322-
#[call(set_user_codec)]
329+
fn set_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(&mut self, codec: T);
330+
#[call(set_distributed_user_codec)]
323331
#[expr($;self)]
324-
fn with_user_codec<T: PhysicalExtensionCodec + 'static>(self, codec: T) -> Self;
332+
fn with_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(self, codec: T) -> Self;
325333

326-
fn set_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(&mut self, resolver: T);
327-
#[call(set_channel_resolver)]
334+
fn set_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(&mut self, resolver: T);
335+
#[call(set_distributed_channel_resolver)]
328336
#[expr($;self)]
329-
fn with_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(self, resolver: T) -> Self;
337+
fn with_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(self, resolver: T) -> Self;
330338
}
331339
}
332340
}

src/flight_service/do_get.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use crate::flight_service::service::ArrowFlightEndpoint;
66
use crate::flight_service::session_builder::DistributedSessionBuilderContext;
77
use crate::plan::{DistributedCodec, PartitionGroup};
88
use crate::stage::{stage_from_proto, ExecutionStage, ExecutionStageProto};
9-
use crate::user_codec_ext::get_user_codec;
9+
use crate::user_codec_ext::get_distributed_user_codec;
1010
use arrow_flight::encode::FlightDataEncoderBuilder;
1111
use arrow_flight::error::FlightError;
1212
use arrow_flight::flight_service_server::FlightService;
@@ -117,7 +117,7 @@ impl ArrowFlightEndpoint {
117117

118118
let mut combined_codec = ComposedPhysicalExtensionCodec::default();
119119
combined_codec.push(DistributedCodec);
120-
if let Some(ref user_codec) = get_user_codec(state.config()) {
120+
if let Some(ref user_codec) = get_distributed_user_codec(state.config()) {
121121
combined_codec.push_arc(Arc::clone(user_codec));
122122
}
123123

src/flight_service/session_builder.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ pub trait DistributedSessionBuilder {
5252
/// let mut builder = SessionStateBuilder::new()
5353
/// .with_runtime_env(ctx.runtime_env.clone())
5454
/// .with_default_features();
55-
/// builder.set_user_codec(CustomExecCodec);
55+
/// builder.set_distributed_user_codec(CustomExecCodec);
5656
/// // Add your UDFs, optimization rules, etc...
5757
///
5858
/// Ok(builder.build())

src/plan/arrow_flight_read.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
use super::combined::CombinedRecordBatchStream;
2-
use crate::channel_manager_ext::get_channel_resolver;
2+
use crate::channel_manager_ext::get_distributed_channel_resolver;
33
use crate::common::ComposedPhysicalExtensionCodec;
44
use crate::config_extension_ext::ContextGrpcMetadata;
55
use crate::errors::tonic_status_to_datafusion_error;
66
use crate::flight_service::{DoGet, StageKey};
77
use crate::plan::DistributedCodec;
88
use crate::stage::{proto_from_stage, ExecutionStage};
9-
use crate::user_codec_ext::get_user_codec;
9+
use crate::user_codec_ext::get_distributed_user_codec;
1010
use crate::ChannelResolver;
1111
use arrow_flight::decode::FlightRecordBatchStream;
1212
use arrow_flight::error::FlightError;
@@ -159,7 +159,8 @@ impl ExecutionPlan for ArrowFlightReadExec {
159159
};
160160

161161
// get the channel manager and current stage from our context
162-
let Some(channel_resolver) = get_channel_resolver(context.session_config()) else {
162+
let Some(channel_resolver) = get_distributed_channel_resolver(context.session_config())
163+
else {
163164
return exec_err!(
164165
"ArrowFlightReadExec requires a ChannelResolver in the session config"
165166
);
@@ -188,7 +189,7 @@ impl ExecutionPlan for ArrowFlightReadExec {
188189

189190
let mut combined_codec = ComposedPhysicalExtensionCodec::default();
190191
combined_codec.push(DistributedCodec {});
191-
if let Some(ref user_codec) = get_user_codec(context.session_config()) {
192+
if let Some(ref user_codec) = get_distributed_user_codec(context.session_config()) {
192193
combined_codec.push_arc(Arc::clone(user_codec));
193194
}
194195

src/stage/execution_stage.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use datafusion::execution::TaskContext;
66
use datafusion::physical_plan::ExecutionPlan;
77
use datafusion::prelude::SessionContext;
88

9-
use crate::channel_manager_ext::get_channel_resolver;
9+
use crate::channel_manager_ext::get_distributed_channel_resolver;
1010
use crate::task::ExecutionTask;
1111
use crate::ChannelResolver;
1212
use itertools::Itertools;
@@ -261,7 +261,8 @@ impl ExecutionPlan for ExecutionStage {
261261
.downcast_ref::<ExecutionStage>()
262262
.expect("Unwrapping myself should always work");
263263

264-
let Some(channel_resolver) = get_channel_resolver(context.session_config()) else {
264+
let Some(channel_resolver) = get_distributed_channel_resolver(context.session_config())
265+
else {
265266
return exec_err!("ChannelManager not found in session config");
266267
};
267268

src/test_utils/localhost.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,9 @@ where
4646
let channel_resolver = LocalHostChannelResolver::new(ports.clone());
4747
let session_builder = session_builder.map(move |builder: SessionStateBuilder| {
4848
let channel_resolver = channel_resolver.clone();
49-
Ok(builder.with_channel_resolver(channel_resolver).build())
49+
Ok(builder
50+
.with_distributed_channel_resolver(channel_resolver)
51+
.build())
5052
});
5153
let mut join_set = JoinSet::new();
5254
for listener in listeners {

src/user_codec_ext.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,15 @@ use std::sync::Arc;
44

55
pub struct UserProvidedCodec(Arc<dyn PhysicalExtensionCodec>);
66

7-
pub(crate) fn set_user_codec<T: PhysicalExtensionCodec + 'static>(
7+
pub(crate) fn set_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(
88
cfg: &mut SessionConfig,
99
codec: T,
1010
) {
1111
cfg.set_extension(Arc::new(UserProvidedCodec(Arc::new(codec))))
1212
}
1313

14-
pub(crate) fn get_user_codec(cfg: &SessionConfig) -> Option<Arc<dyn PhysicalExtensionCodec>> {
14+
pub(crate) fn get_distributed_user_codec(
15+
cfg: &SessionConfig,
16+
) -> Option<Arc<dyn PhysicalExtensionCodec>> {
1517
Some(Arc::clone(&cfg.get_extension::<UserProvidedCodec>()?.0))
1618
}

tests/custom_config_extension.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ mod tests {
3232
.with_runtime_env(ctx.runtime_env)
3333
.with_default_features()
3434
.with_distributed_option_extension_from_headers::<CustomExtension>(&ctx.headers)?
35-
.with_user_codec(CustomConfigExtensionRequiredExecCodec)
35+
.with_distributed_user_codec(CustomConfigExtensionRequiredExecCodec)
3636
.build())
3737
}
3838

tests/custom_extension_codec.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ mod tests {
4444
Ok(SessionStateBuilder::new()
4545
.with_runtime_env(ctx.runtime_env)
4646
.with_default_features()
47-
.with_user_codec(Int64ListExecCodec)
47+
.with_distributed_user_codec(Int64ListExecCodec)
4848
.build())
4949
}
5050

0 commit comments

Comments
 (0)