11use crate :: {
22 ArrowFlightEndpoint , BoxCloneSyncChannel , ChannelResolver , DistributedExt ,
3- DistributedSessionBuilder , DistributedSessionBuilderContext ,
4- MappedDistributedSessionBuilderExt ,
3+ DistributedSessionBuilderContext ,
54} ;
65use arrow_flight:: flight_service_server:: FlightServiceServer ;
76use async_trait:: async_trait;
@@ -21,17 +20,12 @@ pub struct InMemoryChannelResolver {
2120
2221impl Default for InMemoryChannelResolver {
2322 fn default ( ) -> Self {
24- Self :: new ( |ctx : DistributedSessionBuilderContext | async move {
25- let builder = SessionStateBuilder :: new ( )
26- . with_default_features ( )
27- . with_runtime_env ( ctx. runtime_env . clone ( ) ) ;
28- Ok ( builder. build ( ) )
29- } )
23+ Self :: new ( )
3024 }
3125}
3226
3327impl InMemoryChannelResolver {
34- pub fn new ( builder : impl DistributedSessionBuilder + Send + Sync + ' static ) -> Self {
28+ pub fn new ( ) -> Self {
3529 let ( client, server) = tokio:: io:: duplex ( 1024 * 1024 ) ;
3630
3731 let mut client = Some ( client) ;
@@ -49,10 +43,18 @@ impl InMemoryChannelResolver {
4943 } ;
5044 let this_clone = this. clone ( ) ;
5145
52- let builder =
53- builder. map ( move |b| Ok ( b. with_distributed_channel_resolver ( this. clone ( ) ) . build ( ) ) ) ;
54-
55- let endpoint = ArrowFlightEndpoint :: try_new ( builder) . unwrap ( ) ;
46+ let endpoint =
47+ ArrowFlightEndpoint :: try_new ( move |ctx : DistributedSessionBuilderContext | {
48+ let this = this. clone ( ) ;
49+ async move {
50+ let builder = SessionStateBuilder :: new ( )
51+ . with_default_features ( )
52+ . with_distributed_channel_resolver ( this)
53+ . with_runtime_env ( ctx. runtime_env . clone ( ) ) ;
54+ Ok ( builder. build ( ) )
55+ }
56+ } )
57+ . unwrap ( ) ;
5658
5759 tokio:: spawn ( async move {
5860 Server :: builder ( )
0 commit comments