
Commit 2f29bd1

adriangb and gabotechs authored
remove schema adapter (#205)
* remove schema adapter

* garbage collect dictionaries before sending them across the wire

* move into do_get, handle view arrays

* update comments

* Update src/flight_service/do_get.rs

Co-authored-by: Gabriel <[email protected]>

---------

Co-authored-by: Gabriel <[email protected]>
1 parent a2dd7ee commit 2f29bd1

File tree

  Cargo.lock
  Cargo.toml
  src/execution_plans/network_coalesce.rs
  src/execution_plans/network_shuffle.rs
  src/flight_service/do_get.rs

5 files changed: +51 -23 lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
(Generated file; diff not rendered.)

Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -15,6 +15,7 @@ chrono = { version = "0.4.42" }
 datafusion = { workspace = true }
 datafusion-proto = { workspace = true }
 arrow-flight = "56.1.0"
+arrow-select = "56.1.0"
 async-trait = "0.1.88"
 tokio = { version = "1.46.1", features = ["full"] }
 # Updated to 0.13.1 to match arrow-flight 56.1.0

src/execution_plans/network_coalesce.rs

Lines changed: 2 additions & 10 deletions
@@ -14,12 +14,11 @@ use arrow_flight::error::FlightError;
 use bytes::Bytes;
 use dashmap::DashMap;
 use datafusion::common::{exec_err, internal_err, plan_err};
-use datafusion::datasource::schema_adapter::DefaultSchemaAdapterFactory;
 use datafusion::error::DataFusionError;
 use datafusion::execution::{SendableRecordBatchStream, TaskContext};
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
-use futures::{StreamExt, TryFutureExt, TryStreamExt};
+use futures::{TryFutureExt, TryStreamExt};
 use http::Extensions;
 use prost::Message;
 use std::any::Any;
@@ -303,8 +302,6 @@ impl ExecutionPlan for NetworkCoalesceExec {
         };
 
         let metrics_collection_capture = self_ready.metrics_collection.clone();
-        let adapter = DefaultSchemaAdapterFactory::from_schema(self.schema());
-        let (mapper, _indices) = adapter.map_schema(&self.schema())?;
         let stream = async move {
             let mut client = channel_resolver.get_flight_client_for_url(&url).await?;
             let stream = client
@@ -319,12 +316,7 @@ impl ExecutionPlan for NetworkCoalesceExec {
 
             Ok(
                 FlightRecordBatchStream::new_from_flight_data(metrics_collecting_stream)
-                    .map_err(map_flight_to_datafusion_error)
-                    .map(move |batch| {
-                        let batch = batch?;
-
-                        mapper.map_batch(batch)
-                    }),
+                    .map_err(map_flight_to_datafusion_error),
             )
         }
         .try_flatten_stream();
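For context, this is the pattern the diff removes (the same lines come out of `network_shuffle.rs` below). A minimal sketch of the `DefaultSchemaAdapterFactory` round trip, using an illustrative schema rather than code from this repo:

```rust
use std::sync::Arc;

use datafusion::arrow::array::{Int32Array, RecordBatch};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::schema_adapter::DefaultSchemaAdapterFactory;
use datafusion::error::Result;

fn main() -> Result<()> {
    // Illustrative schema; the deleted code passed `self.schema()` here.
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));

    // The removed pattern: build an adapter targeting the plan's schema, then
    // derive a mapper from the (here identical) source schema.
    let adapter = DefaultSchemaAdapterFactory::from_schema(Arc::clone(&schema));
    let (mapper, _indices) = adapter.map_schema(&schema)?;

    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )?;

    // Every batch arriving over Flight used to take this extra mapping hop.
    let mapped = mapper.map_batch(batch)?;
    assert_eq!(mapped.num_rows(), 3);
    Ok(())
}
```

A plausible reading, not stated in the commit: the mapper existed to cast hydrated dictionary columns back to the plan's declared dictionary types, and once `do_get.rs` switches the encoder to `DictionaryHandling::Resend` (below), incoming batches already match the schema, so the per-batch hop can go.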

src/execution_plans/network_shuffle.rs

Lines changed: 1 addition & 11 deletions
@@ -14,7 +14,6 @@ use arrow_flight::error::FlightError;
 use bytes::Bytes;
 use dashmap::DashMap;
 use datafusion::common::{exec_err, internal_datafusion_err, plan_err};
-use datafusion::datasource::schema_adapter::DefaultSchemaAdapterFactory;
 use datafusion::error::DataFusionError;
 use datafusion::execution::{SendableRecordBatchStream, TaskContext};
 use datafusion::physical_expr::Partitioning;
@@ -325,12 +324,8 @@ impl ExecutionPlan for NetworkShuffleExec {
         let task_context = DistributedTaskContext::from_ctx(&context);
         let off = self_ready.properties.partitioning.partition_count() * task_context.task_index;
 
-        let adapter = DefaultSchemaAdapterFactory::from_schema(self.schema());
-        let (mapper, _indices) = adapter.map_schema(&self.schema())?;
-
         let stream = input_stage_tasks.into_iter().enumerate().map(|(i, task)| {
             let channel_resolver = Arc::clone(&channel_resolver);
-            let mapper = mapper.clone();
 
             let ticket = Request::from_parts(
                 MetadataMap::from_headers(context_headers.clone()),
@@ -371,12 +366,7 @@ impl ExecutionPlan for NetworkShuffleExec {
 
             Ok(
                 FlightRecordBatchStream::new_from_flight_data(metrics_collecting_stream)
-                    .map_err(map_flight_to_datafusion_error)
-                    .map(move |batch| {
-                        let batch = batch?;
-
-                        mapper.map_batch(batch)
-                    }),
+                    .map_err(map_flight_to_datafusion_error),
             )
         }
         .try_flatten_stream()

src/flight_service/do_get.rs

Lines changed: 46 additions & 2 deletions
@@ -11,10 +11,12 @@ use crate::protobuf::{
 };
 use arrow_flight::FlightData;
 use arrow_flight::Ticket;
-use arrow_flight::encode::FlightDataEncoderBuilder;
+use arrow_flight::encode::{DictionaryHandling, FlightDataEncoderBuilder};
 use arrow_flight::error::FlightError;
 use arrow_flight::flight_service_server::FlightService;
+use arrow_select::dictionary::garbage_collect_any_dictionary;
 use bytes::Bytes;
+use datafusion::arrow::array::{Array, AsArray, RecordBatch};
 
 use datafusion::common::exec_datafusion_err;
 use datafusion::error::DataFusionError;
@@ -134,8 +136,22 @@ impl ArrowFlightEndpoint {
             .execute(doget.target_partition as usize, session_state.task_ctx())
             .map_err(|err| Status::internal(format!("Error executing stage plan: {err:#?}")))?;
 
+        let schema = stream.schema().clone();
+
+        // Apply garbage collection of dictionary and view arrays before sending over the network
+        let stream = stream.and_then(|rb| std::future::ready(garbage_collect_arrays(rb)));
+
         let stream = FlightDataEncoderBuilder::new()
-            .with_schema(stream.schema().clone())
+            .with_schema(schema)
+            // This tells the encoder to send dictionaries across the wire as-is.
+            // The alternative (`DictionaryHandling::Hydrate`) would expand the dictionaries
+            // into their value types, which can potentially blow up the size of the data transfer.
+            // The main reason to use `DictionaryHandling::Hydrate` is for compatibility with clients
+            // that do not support dictionaries, but since we are using the same server/client on both
+            // sides, we can safely use `DictionaryHandling::Resend`.
+            // Note that we do garbage collection of unused dictionary values above, so we are not sending
+            // unused dictionary values over the wire.
+            .with_dictionary_handling(DictionaryHandling::Resend)
             .build(stream.map_err(|err| {
                 FlightError::Tonic(Box::new(datafusion_error_to_tonic_status(&err)))
            }));
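The trade-off described in the comment above can be checked in isolation. A self-contained sketch, not from this repo, assuming `tokio` and `futures` alongside the arrow-flight 56 API used in this diff:

```rust
use std::sync::Arc;

use arrow_flight::decode::FlightRecordBatchStream;
use arrow_flight::encode::{DictionaryHandling, FlightDataEncoderBuilder};
use datafusion::arrow::array::{DictionaryArray, RecordBatch};
use datafusion::arrow::datatypes::{DataType, Field, Int32Type, Schema};
use futures::{stream, TryStreamExt};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // A dictionary-encoded column: three rows, two distinct values.
    let dict: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
    let schema = Arc::new(Schema::new(vec![Field::new(
        "tag",
        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
        false,
    )]));
    let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(dict)])?;

    // `Resend` ships dictionary batches as-is; `Hydrate` would expand the
    // column into a plain Utf8 array before encoding.
    let flight_data = FlightDataEncoderBuilder::new()
        .with_schema(Arc::clone(&schema))
        .with_dictionary_handling(DictionaryHandling::Resend)
        .build(stream::iter([Ok(batch)]));

    // Decode the way the receiving plans do; the dictionary encoding survives.
    let decoded: Vec<RecordBatch> = FlightRecordBatchStream::new_from_flight_data(flight_data)
        .try_collect()
        .await?;
    assert!(matches!(
        decoded[0].column(0).data_type(),
        DataType::Dictionary(_, _)
    ));
    Ok(())
}
```

Because both ends of the wire here are the same server/client pair, there is no compatibility reason to hydrate, which is exactly the argument the comment makes.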
@@ -210,6 +226,34 @@ fn collect_and_create_metrics_flight_data(
     Ok(incoming.with_app_metadata(buf))
 }
 
+/// Garbage collects values sub-arrays.
+///
+/// We apply this before sending RecordBatches over the network to avoid sending
+/// values that are not referenced by any dictionary keys or buffers that are not used.
+///
+/// Unused values can arise from operations such as filtering, where
+/// some keys may no longer be referenced in the filtered result.
+fn garbage_collect_arrays(batch: RecordBatch) -> Result<RecordBatch, DataFusionError> {
+    let (schema, arrays, _row_count) = batch.into_parts();
+
+    let arrays = arrays
+        .into_iter()
+        .map(|array| {
+            if let Some(array) = array.as_any_dictionary_opt() {
+                garbage_collect_any_dictionary(array)
+            } else if let Some(array) = array.as_string_view_opt() {
+                Ok(Arc::new(array.gc()) as Arc<dyn Array>)
+            } else if let Some(array) = array.as_binary_view_opt() {
+                Ok(Arc::new(array.gc()) as Arc<dyn Array>)
+            } else {
+                Ok(array)
+            }
+        })
+        .collect::<Result<Vec<_>, _>>()?;
+
+    Ok(RecordBatch::try_new(schema, arrays)?)
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
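Two sketches of what `garbage_collect_arrays` buys, using illustrative data rather than code from this repo. First, the dictionary case from the doc comment: filtering leaves values unreferenced, and `garbage_collect_any_dictionary` compacts them (assuming the signature implied by its use above: `&dyn AnyDictionaryArray` in, `Result<ArrayRef, ArrowError>` out):

```rust
use std::sync::Arc;

use arrow_select::dictionary::garbage_collect_any_dictionary;
use arrow_select::filter::filter;
use datafusion::arrow::array::{
    Array, AsArray, BooleanArray, DictionaryArray, Int32Array, StringArray,
};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Four keys referencing three distinct values.
    let keys = Int32Array::from(vec![0, 1, 2, 0]);
    let values = Arc::new(StringArray::from(vec!["low", "medium", "high"]));
    let dict = DictionaryArray::new(keys, values);

    // After filtering, only "low" is referenced, yet the full values array
    // is still attached to the result.
    let mask = BooleanArray::from(vec![true, false, false, true]);
    let filtered = filter(&dict, &mask)?;
    assert_eq!(filtered.as_any_dictionary().values().len(), 3);

    // GC rebuilds the dictionary with only the referenced values, so the
    // encoder never ships "medium" or "high" over the wire.
    let compacted = garbage_collect_any_dictionary(filtered.as_any_dictionary())?;
    assert_eq!(compacted.as_any_dictionary().values().len(), 1);
    Ok(())
}
```

The view branches address the analogous buffer problem: a `StringViewArray` keeps its whole data buffers alive even when few views survive, and `gc()` copies out just the referenced bytes:

```rust
use datafusion::arrow::array::{Array, StringViewArray};

fn main() {
    // Strings longer than 12 bytes are stored out of line in shared buffers.
    let array = StringViewArray::from(vec![
        "this string is longer than twelve bytes",
        "this one is too, and lands in the same buffer",
    ]);

    // The slice drops one view but still holds the full buffer.
    let sliced = array.slice(0, 1);

    // `gc()` rewrites the buffer with only the bytes the surviving view uses,
    // which is what `garbage_collect_arrays` does for view columns.
    let compacted = sliced.gc();
    assert_eq!(compacted.len(), 1);
    assert!(compacted.get_buffer_memory_size() <= sliced.get_buffer_memory_size());
}
```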
