Skip to content

Commit f400367

Browse files
committed
garbage collect dictionaries before sending them across the wire
1 parent 98ff52e commit f400367

File tree

3 files changed

+45
-2
lines changed

3 files changed

+45
-2
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ chrono = { version = "0.4.42" }
1515
datafusion = { workspace = true }
1616
datafusion-proto = { workspace = true }
1717
arrow-flight = "56.1.0"
18+
arrow-select = "56.1.0"
1819
async-trait = "0.1.88"
1920
tokio = { version = "1.46.1", features = ["full"] }
2021
# Updated to 0.13.1 to match arrow-flight 56.1.0

src/execution_plans/network_coalesce.rs

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,19 @@ use crate::metrics::proto::MetricsSetProto;
88
use crate::protobuf::{StageKey, map_flight_to_datafusion_error, map_status_to_datafusion_error};
99
use crate::stage::{MaybeEncodedPlan, Stage};
1010
use crate::{ChannelResolver, DistributedTaskContext};
11+
use arrow::array::{AsArray, RecordBatch};
1112
use arrow_flight::Ticket;
1213
use arrow_flight::decode::FlightRecordBatchStream;
1314
use arrow_flight::error::FlightError;
15+
use arrow_select::dictionary::garbage_collect_any_dictionary;
1416
use bytes::Bytes;
1517
use dashmap::DashMap;
1618
use datafusion::common::{exec_err, internal_err, plan_err};
1719
use datafusion::error::DataFusionError;
1820
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
1921
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
2022
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
21-
use futures::{TryFutureExt, TryStreamExt};
23+
use futures::{StreamExt, TryFutureExt, TryStreamExt};
2224
use http::Extensions;
2325
use prost::Message;
2426
use std::any::Any;
@@ -309,7 +311,12 @@ impl ExecutionPlan for NetworkCoalesceExec {
309311

310312
Ok(
311313
FlightRecordBatchStream::new_from_flight_data(metrics_collecting_stream)
312-
.map_err(map_flight_to_datafusion_error),
314+
.map_err(map_flight_to_datafusion_error)
315+
.map(move |batch| {
316+
let batch = batch?;
317+
318+
garbage_collect_dictionary_arrays(batch)
319+
}),
313320
)
314321
}
315322
.try_flatten_stream();
@@ -320,3 +327,37 @@ impl ExecutionPlan for NetworkCoalesceExec {
320327
)))
321328
}
322329
}
330+
331+
/// Garbage collects dictionary arrays in the given RecordBatch.
332+
///
333+
/// We apply this before sending RecordBatches over the network to avoid sending
334+
/// unused dictionary values.
335+
///
336+
/// Unused dictionary values can arise from operations such as filtering, where
337+
/// some dictionary keys may no longer be referenced in the filtered result.
338+
fn garbage_collect_dictionary_arrays(batch: RecordBatch) -> Result<RecordBatch, DataFusionError> {
339+
// Check if we have any dictionary arrays
340+
let has_dictionary_array = batch
341+
.columns()
342+
.iter()
343+
.any(|array| array.as_any_dictionary_opt().is_some());
344+
345+
if !has_dictionary_array {
346+
return Ok(batch);
347+
}
348+
349+
let (schema, arrays, _row_count) = batch.into_parts();
350+
351+
let arrays = arrays
352+
.into_iter()
353+
.map(|array| {
354+
if let Some(array) = array.as_any_dictionary_opt() {
355+
garbage_collect_any_dictionary(array)
356+
} else {
357+
Ok(array)
358+
}
359+
})
360+
.collect::<Result<Vec<_>, _>>()?;
361+
362+
Ok(RecordBatch::try_new(schema, arrays)?)
363+
}

0 commit comments

Comments
 (0)