Skip to content

Commit 6f69516

Browse files
authored
allow configuring message sizes (#207)
* allow configuring message sizes
* actually use
* fmt
* use usize::MAX and add helpers to create clients/servers
1 parent 2f29bd1 commit 6f69516

File tree

8 files changed

+109
-18
lines changed

8 files changed

+109
-18
lines changed

examples/in_memory_cluster.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,13 @@
11
use arrow::util::pretty::pretty_format_batches;
22
use arrow_flight::flight_service_client::FlightServiceClient;
3-
use arrow_flight::flight_service_server::FlightServiceServer;
43
use async_trait::async_trait;
54
use datafusion::common::DataFusionError;
65
use datafusion::execution::SessionStateBuilder;
76
use datafusion::physical_plan::displayable;
87
use datafusion::prelude::{ParquetReadOptions, SessionContext};
98
use datafusion_distributed::{
109
ArrowFlightEndpoint, BoxCloneSyncChannel, ChannelResolver, DistributedExt,
11-
DistributedPhysicalOptimizerRule, DistributedSessionBuilderContext,
10+
DistributedPhysicalOptimizerRule, DistributedSessionBuilderContext, create_flight_client,
1211
};
1312
use futures::TryStreamExt;
1413
use hyper_util::rt::TokioIo;
@@ -98,7 +97,7 @@ impl InMemoryChannelResolver {
9897
}));
9998

10099
let this = Self {
101-
channel: FlightServiceClient::new(BoxCloneSyncChannel::new(channel)),
100+
channel: create_flight_client(BoxCloneSyncChannel::new(channel)),
102101
};
103102
let this_clone = this.clone();
104103

@@ -117,7 +116,7 @@ impl InMemoryChannelResolver {
117116

118117
tokio::spawn(async move {
119118
Server::builder()
120-
.add_service(FlightServiceServer::new(endpoint))
119+
.add_service(endpoint.into_flight_server())
121120
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
122121
.await
123122
});

examples/localhost_worker.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
use arrow_flight::flight_service_client::FlightServiceClient;
2-
use arrow_flight::flight_service_server::FlightServiceServer;
32
use async_trait::async_trait;
43
use dashmap::{DashMap, Entry};
54
use datafusion::common::DataFusionError;
65
use datafusion::execution::SessionStateBuilder;
76
use datafusion_distributed::{
87
ArrowFlightEndpoint, BoxCloneSyncChannel, ChannelResolver, DistributedExt,
9-
DistributedSessionBuilderContext,
8+
DistributedSessionBuilderContext, create_flight_client,
109
};
1110
use std::error::Error;
1211
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
@@ -46,7 +45,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
4645
})?;
4746

4847
Server::builder()
49-
.add_service(FlightServiceServer::new(endpoint))
48+
.add_service(endpoint.into_flight_server())
5049
.serve(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), args.port))
5150
.await?;
5251

@@ -79,7 +78,7 @@ impl ChannelResolver for LocalhostChannelResolver {
7978
let channel = Channel::from_shared(url.to_string())
8079
.unwrap()
8180
.connect_lazy();
82-
let channel = FlightServiceClient::new(BoxCloneSyncChannel::new(channel));
81+
let channel = create_flight_client(BoxCloneSyncChannel::new(channel));
8382
v.insert(channel.clone());
8483
Ok(channel)
8584
}

src/channel_resolver_ext.rs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,21 @@ pub type BoxCloneSyncChannel = tower::util::BoxCloneSyncService<
3535

3636
/// Abstracts networking details so that users can implement their own network resolution
3737
/// mechanism.
38+
///
39+
/// # Implementation Note
40+
///
41+
/// When implementing `get_flight_client_for_url`, it is recommended to use the
42+
/// [`create_flight_client`] helper function to ensure clients are configured with
43+
/// appropriate message size limits for internal communication. This helps avoid message
44+
/// size errors when transferring large datasets.
3845
#[async_trait]
3946
pub trait ChannelResolver {
4047
/// Gets all available worker URLs. Used during stage assignment.
4148
fn get_urls(&self) -> Result<Vec<Url>, DataFusionError>;
4249
/// For a given URL, get an Arrow Flight client for communicating to it.
50+
///
51+
/// Consider using [`create_flight_client`] to create the client with appropriate
52+
/// default message size limits.
4353
async fn get_flight_client_for_url(
4454
&self,
4555
url: &Url,
@@ -59,3 +69,39 @@ impl ChannelResolver for Arc<dyn ChannelResolver + Send + Sync> {
5969
self.as_ref().get_flight_client_for_url(url).await
6070
}
6171
}
72+
73+
/// Creates a [`FlightServiceClient`] with high default message size limits.
74+
///
75+
/// This is a convenience function that wraps [`FlightServiceClient::new`] and configures
76+
/// it with `max_decoding_message_size(usize::MAX)` and `max_encoding_message_size(usize::MAX)`
77+
/// to avoid message size limitations for internal communication.
78+
///
79+
/// Users implementing custom [`ChannelResolver`]s should use this function in their
80+
/// `get_flight_client_for_url` implementations to ensure consistent behavior with built-in
81+
/// implementations.
82+
///
83+
/// # Example
84+
///
85+
/// ```rust,ignore
86+
/// use datafusion_distributed::{create_flight_client, BoxCloneSyncChannel, ChannelResolver};
87+
/// use arrow_flight::flight_service_client::FlightServiceClient;
88+
/// use tonic::transport::Channel;
89+
///
90+
/// #[async_trait]
91+
/// impl ChannelResolver for MyResolver {
92+
/// async fn get_flight_client_for_url(
93+
/// &self,
94+
/// url: &Url,
95+
/// ) -> Result<FlightServiceClient<BoxCloneSyncChannel>, DataFusionError> {
96+
/// let channel = Channel::from_shared(url.to_string())?.connect().await?;
97+
/// Ok(create_flight_client(BoxCloneSyncChannel::new(channel)))
98+
/// }
99+
/// }
100+
/// ```
101+
pub fn create_flight_client(
102+
channel: BoxCloneSyncChannel,
103+
) -> FlightServiceClient<BoxCloneSyncChannel> {
104+
FlightServiceClient::new(channel)
105+
.max_decoding_message_size(usize::MAX)
106+
.max_encoding_message_size(usize::MAX)
107+
}

src/flight_service/do_get.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,15 @@ impl ArrowFlightEndpoint {
152152
// Note that we do garbage collection of unused dictionary values above, so we are not sending
153153
// unused dictionary values over the wire.
154154
.with_dictionary_handling(DictionaryHandling::Resend)
155+
// Set max flight data size to unlimited.
156+
// This requires servers and clients to also be configured to handle unlimited sizes.
157+
// Using unlimited sizes avoids splitting RecordBatches into multiple FlightData messages,
158+
// which could add significant overhead for large RecordBatches.
159+
// The only real reason to split them is when the client or server enforces a message size limit,
160+
// which mainly makes sense in a public network scenario where you want to avoid DoS attacks.
161+
// Since all of our Arrow Flight communication happens within trusted data plane networks,
162+
// we can safely use unlimited sizes here.
163+
.with_max_flight_data_size(usize::MAX)
155164
.build(stream.map_err(|err| {
156165
FlightError::Tonic(Box::new(datafusion_error_to_tonic_status(&err)))
157166
}));

src/flight_service/service.rs

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::common::ttl_map::{TTLMap, TTLMapConfig};
22
use crate::flight_service::DistributedSessionBuilder;
33
use crate::flight_service::do_get::TaskData;
44
use crate::protobuf::StageKey;
5-
use arrow_flight::flight_service_server::FlightService;
5+
use arrow_flight::flight_service_server::{FlightService, FlightServiceServer};
66
use arrow_flight::{
77
Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo,
88
HandshakeRequest, HandshakeResponse, PollInfo, PutResult, SchemaResult, Ticket,
@@ -28,6 +28,7 @@ pub struct ArrowFlightEndpoint {
2828
pub(super) task_data_entries: Arc<TTLMap<StageKey, Arc<OnceCell<TaskData>>>>,
2929
pub(super) session_builder: Arc<dyn DistributedSessionBuilder + Send + Sync>,
3030
pub(super) hooks: ArrowFlightEndpointHooks,
31+
pub(super) max_message_size: Option<usize>,
3132
}
3233

3334
impl ArrowFlightEndpoint {
@@ -40,6 +41,7 @@ impl ArrowFlightEndpoint {
4041
task_data_entries: Arc::new(ttl_map),
4142
session_builder: Arc::new(session_builder),
4243
hooks: ArrowFlightEndpointHooks::default(),
44+
max_message_size: Some(usize::MAX),
4345
})
4446
}
4547

@@ -54,6 +56,44 @@ impl ArrowFlightEndpoint {
5456
) {
5557
self.hooks.on_plan.push(Arc::new(hook));
5658
}
59+
60+
/// Set the maximum message size for FlightData chunks.
61+
///
62+
/// Defaults to `usize::MAX` to minimize chunking overhead for internal communication.
63+
/// See [`FlightDataEncoderBuilder::with_max_flight_data_size`] for details.
64+
///
65+
/// If you change this to a lower value, ensure you configure the server's
66+
/// max_encoding_message_size and max_decoding_message_size to at least 2x this value
67+
/// to allow for overhead. For most use cases, the default of `usize::MAX` is appropriate.
68+
///
69+
/// [`FlightDataEncoderBuilder::with_max_flight_data_size`]: https://arrow.apache.org/rust/arrow_flight/encode/struct.FlightDataEncoderBuilder.html#method.with_max_flight_data_size
70+
pub fn with_max_message_size(mut self, size: usize) -> Self {
71+
self.max_message_size = Some(size);
72+
self
73+
}
74+
75+
/// Converts this endpoint into a [`FlightServiceServer`] with high default message size limits.
76+
///
77+
/// This is a convenience method that wraps the endpoint in a [`FlightServiceServer`] and
78+
/// configures it with `max_decoding_message_size(usize::MAX)` and
79+
/// `max_encoding_message_size(usize::MAX)` to avoid message size limitations for internal
80+
/// communication.
81+
///
82+
/// You can further customize the returned server by chaining additional tonic methods.
83+
///
84+
/// # Example
85+
///
86+
/// ```rust,ignore
87+
/// let endpoint = ArrowFlightEndpoint::try_new(session_builder)?;
88+
/// let server = endpoint.into_flight_server();
89+
/// // Can chain additional tonic methods if needed
90+
/// // let server = server.some_other_tonic_method(...);
91+
/// ```
92+
pub fn into_flight_server(self) -> FlightServiceServer<Self> {
93+
FlightServiceServer::new(self)
94+
.max_decoding_message_size(usize::MAX)
95+
.max_encoding_message_size(usize::MAX)
96+
}
5797
}
5898

5999
#[async_trait]

src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ mod protobuf;
1414
#[cfg(any(feature = "integration", test))]
1515
pub mod test_utils;
1616

17-
pub use channel_resolver_ext::{BoxCloneSyncChannel, ChannelResolver};
17+
pub use channel_resolver_ext::{BoxCloneSyncChannel, ChannelResolver, create_flight_client};
1818
pub use distributed_ext::DistributedExt;
1919
pub use distributed_planner::{
2020
DistributedConfig, DistributedPhysicalOptimizerRule, InputStageInfo, NetworkBoundary,

src/test_utils/in_memory_channel_resolver.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
use crate::{
22
ArrowFlightEndpoint, BoxCloneSyncChannel, ChannelResolver, DistributedExt,
3-
DistributedSessionBuilderContext,
3+
DistributedSessionBuilderContext, create_flight_client,
44
};
55
use arrow_flight::flight_service_client::FlightServiceClient;
6-
use arrow_flight::flight_service_server::FlightServiceServer;
76
use async_trait::async_trait;
87
use datafusion::common::DataFusionError;
98
use datafusion::execution::SessionStateBuilder;
@@ -40,7 +39,7 @@ impl InMemoryChannelResolver {
4039
}));
4140

4241
let this = Self {
43-
channel: FlightServiceClient::new(BoxCloneSyncChannel::new(channel)),
42+
channel: create_flight_client(BoxCloneSyncChannel::new(channel)),
4443
};
4544
let this_clone = this.clone();
4645

@@ -59,7 +58,7 @@ impl InMemoryChannelResolver {
5958

6059
tokio::spawn(async move {
6160
Server::builder()
62-
.add_service(FlightServiceServer::new(endpoint))
61+
.add_service(endpoint.into_flight_server())
6362
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
6463
.await
6564
});

src/test_utils/localhost.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
use crate::{
22
ArrowFlightEndpoint, BoxCloneSyncChannel, ChannelResolver, DistributedExt,
33
DistributedSessionBuilder, DistributedSessionBuilderContext,
4-
MappedDistributedSessionBuilderExt,
4+
MappedDistributedSessionBuilderExt, create_flight_client,
55
};
66
use arrow_flight::flight_service_client::FlightServiceClient;
7-
use arrow_flight::flight_service_server::FlightServiceServer;
87
use async_trait::async_trait;
98
use datafusion::common::DataFusionError;
109
use datafusion::common::runtime::JoinSet;
@@ -105,7 +104,7 @@ impl ChannelResolver for LocalHostChannelResolver {
105104
) -> Result<FlightServiceClient<BoxCloneSyncChannel>, DataFusionError> {
106105
let endpoint = Channel::from_shared(url.to_string()).map_err(external_err)?;
107106
let channel = endpoint.connect().await.map_err(external_err)?;
108-
Ok(FlightServiceClient::new(BoxCloneSyncChannel::new(channel)))
107+
Ok(create_flight_client(BoxCloneSyncChannel::new(channel)))
109108
}
110109
}
111110

@@ -118,7 +117,7 @@ pub async fn spawn_flight_service(
118117
let incoming = tokio_stream::wrappers::TcpListenerStream::new(incoming);
119118

120119
Ok(Server::builder()
121-
.add_service(FlightServiceServer::new(endpoint))
120+
.add_service(endpoint.into_flight_server())
122121
.serve_with_incoming(incoming)
123122
.await?)
124123
}

0 commit comments

Comments
 (0)