Skip to content

Commit f1d5980

Browse files
committed
actually use
1 parent 1c6d16c commit f1d5980

File tree

2 files changed

+21
-14
lines changed

2 files changed

+21
-14
lines changed

src/flight_service/do_get.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -134,12 +134,18 @@ impl ArrowFlightEndpoint {
134134
.execute(doget.target_partition as usize, session_state.task_ctx())
135135
.map_err(|err| Status::internal(format!("Error executing stage plan: {err:#?}")))?;
136136

137-
let stream = FlightDataEncoderBuilder::new()
138-
.with_schema(stream.schema().clone())
139-
.with_max_flight_data_size(usize::MAX)
140-
.build(stream.map_err(|err| {
141-
FlightError::Tonic(Box::new(datafusion_error_to_tonic_status(&err)))
142-
}));
137+
let mut encoder_builder =
138+
FlightDataEncoderBuilder::new().with_schema(stream.schema().clone());
139+
if let Some(max_message_size) = self.max_message_size {
140+
encoder_builder = encoder_builder.with_max_flight_data_size(max_message_size);
141+
}
142+
143+
let stream =
144+
encoder_builder
145+
.with_max_flight_data_size(usize::MAX)
146+
.build(stream.map_err(|err| {
147+
FlightError::Tonic(Box::new(datafusion_error_to_tonic_status(&err)))
148+
}));
143149

144150
let task_data_entries = Arc::clone(&self.task_data_entries);
145151
let num_partitions_remaining = Arc::clone(&stage_data.num_partitions_remaining);

src/flight_service/service.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,9 @@ pub struct ArrowFlightEndpoint {
2828
pub(super) task_data_entries: Arc<TTLMap<StageKey, Arc<OnceCell<TaskData>>>>,
2929
pub(super) session_builder: Arc<dyn DistributedSessionBuilder + Send + Sync>,
3030
pub(super) hooks: ArrowFlightEndpointHooks,
31-
max_message_size: usize,
31+
pub(super) max_message_size: Option<usize>,
3232
}
3333

34-
/// Default maximum message size for FlightData chunks in ArrowFlightEndpoint.
35-
/// This is the size used for chunking FlightData within the endpoint.
36-
const DEFAULT_MESSAGE_SIZE: usize = 2 * 1024 * 1024; // 2 MB
37-
3834
impl ArrowFlightEndpoint {
3935
pub fn try_new(
4036
session_builder: impl DistributedSessionBuilder + Send + Sync + 'static,
@@ -45,7 +41,7 @@ impl ArrowFlightEndpoint {
4541
task_data_entries: Arc::new(ttl_map),
4642
session_builder: Arc::new(session_builder),
4743
hooks: ArrowFlightEndpointHooks::default(),
48-
max_message_size: DEFAULT_MESSAGE_SIZE,
44+
max_message_size: None,
4945
})
5046
}
5147

@@ -62,14 +58,19 @@ impl ArrowFlightEndpoint {
6258
}
6359

6460
/// Set the maximum message size for FlightData chunks.
65-
/// Defaults to 2 MB.
61+
///
62+
/// Defaults to `None`, which uses the `arrow-rs` default, currently 2 MB.
63+
/// See [`FlightDataEncoderBuilder::with_max_flight_data_size`] for details.
64+
///
6665
/// If you change this, ensure you configure the server's max_encoding_message_size and
6766
/// max_decoding_message_size to at least 2x this value to allow for overhead.
6867
/// If your service communication is purely internal and there is no risk of DoS attacks,
6968
/// you may want to set this to a considerably larger value to minimize the overhead of chunking
7069
/// larger datasets.
70+
///
71+
/// [`FlightDataEncoderBuilder::with_max_flight_data_size`]: https://arrow.apache.org/rust/arrow_flight/encode/struct.FlightDataEncoderBuilder.html#method.with_max_flight_data_size
7172
pub fn with_max_message_size(mut self, size: usize) -> Self {
72-
self.max_message_size = size;
73+
self.max_message_size = Some(size);
7374
self
7475
}
7576
}

0 commit comments

Comments
 (0)