Skip to content

Commit 88eb80b

Browse files
committed
move into do_get, handle view arrays
1 parent f400367 commit 88eb80b

File tree

2 files changed

+44
-44
lines changed

2 files changed

+44
-44
lines changed

src/execution_plans/network_coalesce.rs

Lines changed: 2 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -8,19 +8,17 @@ use crate::metrics::proto::MetricsSetProto;
88
use crate::protobuf::{StageKey, map_flight_to_datafusion_error, map_status_to_datafusion_error};
99
use crate::stage::{MaybeEncodedPlan, Stage};
1010
use crate::{ChannelResolver, DistributedTaskContext};
11-
use arrow::array::{AsArray, RecordBatch};
1211
use arrow_flight::Ticket;
1312
use arrow_flight::decode::FlightRecordBatchStream;
1413
use arrow_flight::error::FlightError;
15-
use arrow_select::dictionary::garbage_collect_any_dictionary;
1614
use bytes::Bytes;
1715
use dashmap::DashMap;
1816
use datafusion::common::{exec_err, internal_err, plan_err};
1917
use datafusion::error::DataFusionError;
2018
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
2119
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
2220
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
23-
use futures::{StreamExt, TryFutureExt, TryStreamExt};
21+
use futures::{TryFutureExt, TryStreamExt};
2422
use http::Extensions;
2523
use prost::Message;
2624
use std::any::Any;
@@ -311,12 +309,7 @@ impl ExecutionPlan for NetworkCoalesceExec {
311309

312310
Ok(
313311
FlightRecordBatchStream::new_from_flight_data(metrics_collecting_stream)
314-
.map_err(map_flight_to_datafusion_error)
315-
.map(move |batch| {
316-
let batch = batch?;
317-
318-
garbage_collect_dictionary_arrays(batch)
319-
}),
312+
.map_err(map_flight_to_datafusion_error),
320313
)
321314
}
322315
.try_flatten_stream();
@@ -327,37 +320,3 @@ impl ExecutionPlan for NetworkCoalesceExec {
327320
)))
328321
}
329322
}
330-
331-
/// Garbage collects dictionary arrays in the given RecordBatch.
332-
///
333-
/// We apply this before sending RecordBatches over the network to avoid sending
334-
/// unused dictionary values.
335-
///
336-
/// Unused dictionary values can arise from operations such as filtering, where
337-
/// some dictionary keys may no longer be referenced in the filtered result.
338-
fn garbage_collect_dictionary_arrays(batch: RecordBatch) -> Result<RecordBatch, DataFusionError> {
339-
// Check if we have any dictionary arrays
340-
let has_dictionary_array = batch
341-
.columns()
342-
.iter()
343-
.any(|array| array.as_any_dictionary_opt().is_some());
344-
345-
if !has_dictionary_array {
346-
return Ok(batch);
347-
}
348-
349-
let (schema, arrays, _row_count) = batch.into_parts();
350-
351-
let arrays = arrays
352-
.into_iter()
353-
.map(|array| {
354-
if let Some(array) = array.as_any_dictionary_opt() {
355-
garbage_collect_any_dictionary(array)
356-
} else {
357-
Ok(array)
358-
}
359-
})
360-
.collect::<Result<Vec<_>, _>>()?;
361-
362-
Ok(RecordBatch::try_new(schema, arrays)?)
363-
}

src/flight_service/do_get.rs

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@ use arrow_flight::Ticket;
1414
use arrow_flight::encode::{DictionaryHandling, FlightDataEncoderBuilder};
1515
use arrow_flight::error::FlightError;
1616
use arrow_flight::flight_service_server::FlightService;
17+
use arrow_select::dictionary::garbage_collect_any_dictionary;
1718
use bytes::Bytes;
19+
use datafusion::arrow::array::{Array, AsArray, RecordBatch};
1820

1921
use datafusion::common::exec_datafusion_err;
2022
use datafusion::error::DataFusionError;
@@ -134,8 +136,19 @@ impl ArrowFlightEndpoint {
134136
.execute(doget.target_partition as usize, session_state.task_ctx())
135137
.map_err(|err| Status::internal(format!("Error executing stage plan: {err:#?}")))?;
136138

139+
let schema = stream.schema().clone();
140+
141+
// Garbage collect dictionary and view arrays before sending them over the network
142+
let stream = stream.and_then(|rb| std::future::ready(garbage_collect_arrays(rb)));
143+
137144
let stream = FlightDataEncoderBuilder::new()
138-
.with_schema(stream.schema().clone())
145+
.with_schema(schema)
146+
// This tells the encoder to send dictionaries across the wire as-is.
147+
// The alternative (`DictionaryHandling::Hydrate`) would expand the dictionaries
148+
// into their value types, which can potentially blow up the size of the data transfer.
149+
// The main reason to use `DictionaryHandling::Hydrate` is for compatibility with clients
150+
// that do not support dictionaries, but since we are using the same server/client on both
151+
// sides, we can safely use `DictionaryHandling::Resend`.
139152
.with_dictionary_handling(DictionaryHandling::Resend)
140153
.build(stream.map_err(|err| {
141154
FlightError::Tonic(Box::new(datafusion_error_to_tonic_status(&err)))
@@ -211,6 +224,34 @@ fn collect_and_create_metrics_flight_data(
211224
Ok(incoming.with_app_metadata(buf))
212225
}
213226

227+
/// Garbage collects dictionary values sub-arrays and view-array data buffers.
228+
///
229+
/// We apply this before sending RecordBatches over the network to avoid sending
230+
/// values that are not referenced by any dictionary keys or buffers that are not used.
231+
///
232+
/// Unused values can arise from operations such as filtering, where
233+
/// some keys may no longer be referenced in the filtered result.
234+
fn garbage_collect_arrays(batch: RecordBatch) -> Result<RecordBatch, DataFusionError> {
235+
let (schema, arrays, _row_count) = batch.into_parts();
236+
237+
let arrays = arrays
238+
.into_iter()
239+
.map(|array| {
240+
if let Some(array) = array.as_any_dictionary_opt() {
241+
garbage_collect_any_dictionary(array)
242+
} else if let Some(array) = array.as_string_view_opt() {
243+
Ok(Arc::new(array.gc()) as Arc<dyn Array>)
244+
} else if let Some(array) = array.as_binary_view_opt() {
245+
Ok(Arc::new(array.gc()) as Arc<dyn Array>)
246+
} else {
247+
Ok(array)
248+
}
249+
})
250+
.collect::<Result<Vec<_>, _>>()?;
251+
252+
Ok(RecordBatch::try_new(schema, arrays)?)
253+
}
254+
214255
#[cfg(test)]
215256
mod tests {
216257
use super::*;

0 commit comments

Comments
 (0)