Skip to content

Commit 032febb

Browse files
committed
Allow passing a session builder to in_memory_channel_resolver.rs
1 parent 00c0298 commit 032febb

File tree

3 files changed

+15
-17
lines changed

3 files changed

+15
-17
lines changed

src/distributed_ext.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,7 @@ impl DistributedExt for SessionContext {
360360
fn set_distributed_user_codec_arc(&mut self, codec: Arc<dyn PhysicalExtensionCodec>);
361361
#[call(set_distributed_user_codec_arc)]
362362
#[expr($;self)]
363-
fn with_distributed_user_codec_arc(mut self, codec: Arc<dyn PhysicalExtensionCodec>) -> Self;
363+
fn with_distributed_user_codec_arc(self, codec: Arc<dyn PhysicalExtensionCodec>) -> Self;
364364

365365
fn set_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(&mut self, resolver: T);
366366
#[call(set_distributed_channel_resolver)]

src/execution_plans/metrics.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,7 @@ mod tests {
310310
let state = SessionStateBuilder::new()
311311
.with_default_features()
312312
.with_config(config)
313-
.with_distributed_channel_resolver(InMemoryChannelResolver::new())
313+
.with_distributed_channel_resolver(InMemoryChannelResolver::default())
314314
.with_physical_optimizer_rule(Arc::new(
315315
DistributedPhysicalOptimizerRule::default()
316316
.with_network_coalesce_tasks(2)

src/test_utils/in_memory_channel_resolver.rs

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use crate::{
22
ArrowFlightEndpoint, BoxCloneSyncChannel, ChannelResolver, DistributedExt,
3-
DistributedSessionBuilderContext,
3+
DistributedSessionBuilder, DistributedSessionBuilderContext,
4+
MappedDistributedSessionBuilderExt,
45
};
56
use arrow_flight::flight_service_server::FlightServiceServer;
67
use async_trait::async_trait;
@@ -20,12 +21,17 @@ pub struct InMemoryChannelResolver {
2021

2122
impl Default for InMemoryChannelResolver {
2223
fn default() -> Self {
23-
Self::new()
24+
Self::new(|ctx: DistributedSessionBuilderContext| async move {
25+
let builder = SessionStateBuilder::new()
26+
.with_default_features()
27+
.with_runtime_env(ctx.runtime_env.clone());
28+
Ok(builder.build())
29+
})
2430
}
2531
}
2632

2733
impl InMemoryChannelResolver {
28-
pub fn new() -> Self {
34+
pub fn new(builder: impl DistributedSessionBuilder + Send + Sync + 'static) -> Self {
2935
let (client, server) = tokio::io::duplex(1024 * 1024);
3036

3137
let mut client = Some(client);
@@ -43,18 +49,10 @@ impl InMemoryChannelResolver {
4349
};
4450
let this_clone = this.clone();
4551

46-
let endpoint =
47-
ArrowFlightEndpoint::try_new(move |ctx: DistributedSessionBuilderContext| {
48-
let this = this.clone();
49-
async move {
50-
let builder = SessionStateBuilder::new()
51-
.with_default_features()
52-
.with_distributed_channel_resolver(this)
53-
.with_runtime_env(ctx.runtime_env.clone());
54-
Ok(builder.build())
55-
}
56-
})
57-
.unwrap();
52+
let builder =
53+
builder.map(move |b| Ok(b.with_distributed_channel_resolver(this.clone()).build()));
54+
55+
let endpoint = ArrowFlightEndpoint::try_new(builder).unwrap();
5856

5957
tokio::spawn(async move {
6058
Server::builder()

0 commit comments

Comments
 (0)