1 change: 1 addition & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

1 change: 1 addition & 0 deletions Cargo.toml
@@ -15,6 +15,7 @@ chrono = { version = "0.4.42" }
datafusion = { workspace = true }
datafusion-proto = { workspace = true }
arrow-flight = "56.1.0"
+arrow-select = "56.1.0"
async-trait = "0.1.88"
tokio = { version = "1.46.1", features = ["full"] }
# Updated to 0.13.1 to match arrow-flight 56.1.0
12 changes: 2 additions & 10 deletions src/execution_plans/network_coalesce.rs
@@ -14,12 +14,11 @@ use arrow_flight::error::FlightError;
use bytes::Bytes;
use dashmap::DashMap;
use datafusion::common::{exec_err, internal_err, plan_err};
-use datafusion::datasource::schema_adapter::DefaultSchemaAdapterFactory;
use datafusion::error::DataFusionError;
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
-use futures::{StreamExt, TryFutureExt, TryStreamExt};
+use futures::{TryFutureExt, TryStreamExt};
use http::Extensions;
use prost::Message;
use std::any::Any;
@@ -296,8 +295,6 @@ impl ExecutionPlan for NetworkCoalesceExec {
};

let metrics_collection_capture = self_ready.metrics_collection.clone();
-let adapter = DefaultSchemaAdapterFactory::from_schema(self.schema());
-let (mapper, _indices) = adapter.map_schema(&self.schema())?;
let stream = async move {
let mut client = channel_resolver.get_flight_client_for_url(&url).await?;
let stream = client
@@ -312,12 +309,7 @@

Ok(
FlightRecordBatchStream::new_from_flight_data(metrics_collecting_stream)
-.map_err(map_flight_to_datafusion_error)
-.map(move |batch| {
-let batch = batch?;
-
-mapper.map_batch(batch)
-}),
+.map_err(map_flight_to_datafusion_error),
)
}
.try_flatten_stream();
12 changes: 1 addition & 11 deletions src/execution_plans/network_shuffle.rs
@@ -14,7 +14,6 @@ use arrow_flight::error::FlightError;
use bytes::Bytes;
use dashmap::DashMap;
use datafusion::common::{exec_err, internal_datafusion_err, plan_err};
-use datafusion::datasource::schema_adapter::DefaultSchemaAdapterFactory;
use datafusion::error::DataFusionError;
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::physical_expr::Partitioning;
@@ -318,12 +317,8 @@ impl ExecutionPlan for NetworkShuffleExec {
let task_context = DistributedTaskContext::from_ctx(&context);
let off = self_ready.properties.partitioning.partition_count() * task_context.task_index;

-let adapter = DefaultSchemaAdapterFactory::from_schema(self.schema());
-let (mapper, _indices) = adapter.map_schema(&self.schema())?;
-
let stream = input_stage_tasks.into_iter().enumerate().map(|(i, task)| {
let channel_resolver = Arc::clone(&channel_resolver);
-let mapper = mapper.clone();

let ticket = Request::from_parts(
MetadataMap::from_headers(context_headers.clone()),
@@ -364,12 +359,7 @@

Ok(
FlightRecordBatchStream::new_from_flight_data(metrics_collecting_stream)
-.map_err(map_flight_to_datafusion_error)
-.map(move |batch| {
-let batch = batch?;
-
-mapper.map_batch(batch)
-}),
+.map_err(map_flight_to_datafusion_error),
)
}
.try_flatten_stream()
48 changes: 46 additions & 2 deletions src/flight_service/do_get.rs
@@ -11,10 +11,12 @@ use crate::protobuf::{
};
use arrow_flight::FlightData;
use arrow_flight::Ticket;
-use arrow_flight::encode::FlightDataEncoderBuilder;
+use arrow_flight::encode::{DictionaryHandling, FlightDataEncoderBuilder};
use arrow_flight::error::FlightError;
use arrow_flight::flight_service_server::FlightService;
+use arrow_select::dictionary::garbage_collect_any_dictionary;
use bytes::Bytes;
+use datafusion::arrow::array::{Array, AsArray, RecordBatch};

use datafusion::common::exec_datafusion_err;
use datafusion::error::DataFusionError;
@@ -134,8 +136,22 @@ impl ArrowFlightEndpoint {
.execute(doget.target_partition as usize, session_state.task_ctx())
.map_err(|err| Status::internal(format!("Error executing stage plan: {err:#?}")))?;

+let schema = stream.schema().clone();
+
+// Apply garbage collection of dictionary and view arrays before sending over the network
+let stream = stream.and_then(|rb| std::future::ready(garbage_collect_arrays(rb)));
+
let stream = FlightDataEncoderBuilder::new()
-.with_schema(stream.schema().clone())
+.with_schema(schema)
+// This tells the encoder to send dictionaries across the wire as-is.
+// The alternative (`DictionaryHandling::Hydrate`) would expand dictionary keys
+// into their values, which can significantly inflate the size of the data transfer.
+// The main reason to use `DictionaryHandling::Hydrate` is compatibility with clients
+// that do not support dictionaries, but since we use the same server/client on both
+// sides, we can safely use `DictionaryHandling::Resend`.
+// Note that we garbage-collect unused dictionary values above, so we do not send
+// unused dictionary values over the wire.
+.with_dictionary_handling(DictionaryHandling::Resend)
@gabotechs (Collaborator) commented on Oct 27, 2025:

So, with this line, we are telling the flight data encoder to just send the dictionaries over the wire instead of hydrating them and re-encoding them. Is that right?

If you could include a very brief comment above this line that would be awesome 🙏

Another collaborator replied:
This is a great optimization. Thanks @adriangb

.build(stream.map_err(|err| {
FlightError::Tonic(Box::new(datafusion_error_to_tonic_status(&err)))
}));
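
For reference, here is a minimal standalone sketch of the encoder configuration above (not part of this diff; `encode_for_flight`, `schema`, and `batches` are hypothetical placeholders):

use arrow_flight::encode::{DictionaryHandling, FlightDataEncoderBuilder};
use arrow_flight::error::FlightError;
use arrow_flight::FlightData;
use datafusion::arrow::array::RecordBatch;
use datafusion::arrow::datatypes::SchemaRef;
use futures::{stream, Stream};

// Encodes the given batches for Flight, resending dictionaries as-is
// instead of hydrating them into their value types.
fn encode_for_flight(
    schema: SchemaRef,
    batches: Vec<RecordBatch>,
) -> impl Stream<Item = Result<FlightData, FlightError>> {
    FlightDataEncoderBuilder::new()
        .with_schema(schema)
        .with_dictionary_handling(DictionaryHandling::Resend)
        .build(stream::iter(batches.into_iter().map(Ok)))
}
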
@@ -210,6 +226,34 @@ fn collect_and_create_metrics_flight_data(
Ok(incoming.with_app_metadata(buf))
}

+/// Garbage collects the values sub-arrays of dictionary and view columns.
+///
+/// We apply this before sending RecordBatches over the network, to avoid sending
+/// dictionary values that are not referenced by any key, or view buffers that are
+/// no longer used.
+///
+/// Unused values can arise from operations such as filtering, where
+/// some keys may no longer be referenced in the filtered result.
+fn garbage_collect_arrays(batch: RecordBatch) -> Result<RecordBatch, DataFusionError> {
+    let (schema, arrays, _row_count) = batch.into_parts();
+
+    let arrays = arrays
+        .into_iter()
+        .map(|array| {
+            if let Some(array) = array.as_any_dictionary_opt() {
+                garbage_collect_any_dictionary(array)
+            } else if let Some(array) = array.as_string_view_opt() {
+                Ok(Arc::new(array.gc()) as Arc<dyn Array>)
+            } else if let Some(array) = array.as_binary_view_opt() {
+                Ok(Arc::new(array.gc()) as Arc<dyn Array>)
+            } else {
+                Ok(array)
+            }
+        })
+        .collect::<Result<Vec<_>, _>>()?;
+
+    Ok(RecordBatch::try_new(schema, arrays)?)
+}

#[cfg(test)]
mod tests {
use super::*;
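
To see what the GC pass buys us, here is a hypothetical sketch (not part of this diff): after filtering, a dictionary column's keys may reference a single value while the values buffer still holds three, and `garbage_collect_any_dictionary` drops the unreferenced ones. View arrays get the analogous treatment via their `gc()` method.

use std::sync::Arc;

use arrow_select::dictionary::garbage_collect_any_dictionary;
use datafusion::arrow::array::{Array, ArrayRef, AsArray, DictionaryArray, Int32Array, StringArray};
use datafusion::arrow::datatypes::Int32Type;

fn main() {
    // Keys reference only value 0 ("small"), as if rows pointing at
    // "medium" and "large" had been filtered out upstream.
    let keys = Int32Array::from(vec![0, 0]);
    let values = StringArray::from(vec!["small", "medium", "large"]);
    let dict: ArrayRef = Arc::new(DictionaryArray::new(keys, Arc::new(values)));

    let compacted =
        garbage_collect_any_dictionary(dict.as_any_dictionary_opt().expect("dictionary array"))
            .expect("garbage collection should succeed");

    // Same logical contents, but only the referenced value survives, so
    // fewer bytes cross the network when the batch is encoded for Flight.
    assert_eq!(dict.as_dictionary::<Int32Type>().values().len(), 3);
    assert_eq!(compacted.as_dictionary::<Int32Type>().values().len(), 1);
}
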