Commit 25d5924

fix: distributed RangePartitioning bounds calculation with native shuffle (#2258)
1 parent 95182b3 commit 25d5924

75 files changed (+726, -709 lines)


common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 2 additions & 5 deletions
@@ -325,14 +325,11 @@ object CometConf extends ShimCometConf
       .booleanConf
       .createWithDefault(true)
 
-  // RangePartitioning contains bugs https://github.com/apache/datafusion-comet/issues/1906
   val COMET_EXEC_SHUFFLE_WITH_RANGE_PARTITIONING_ENABLED: ConfigEntry[Boolean] =
     conf("spark.comet.native.shuffle.partitioning.range.enabled")
-      .doc("Experimental feature to enable range partitioning for Comet native shuffle. " +
-        "This feature is experimental while we investigate scenarios that don't partition data " +
-        "correctly.")
+      .doc("Whether to enable range partitioning for Comet native shuffle.")
       .booleanConf
-      .createWithDefault(false)
+      .createWithDefault(true)
 
   val COMET_EXEC_SHUFFLE_COMPRESSION_CODEC: ConfigEntry[String] =
     conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.codec")

docs/source/user-guide/latest/configs.md

Lines changed: 1 addition & 1 deletion
@@ -72,7 +72,7 @@ Comet provides the following configuration settings.
 | spark.comet.memoryOverhead | The amount of additional memory to be allocated per executor process for Comet, in MiB, when running Spark in on-heap mode. This config is optional. If this is not specified, it will be set to `spark.comet.memory.overhead.factor` * `spark.executor.memory`. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | |
 | spark.comet.metrics.updateInterval | The interval in milliseconds to update metrics. If interval is negative, metrics will be updated upon task completion. | 3000 |
 | spark.comet.native.shuffle.partitioning.hash.enabled | Whether to enable hash partitioning for Comet native shuffle. | true |
-| spark.comet.native.shuffle.partitioning.range.enabled | Experimental feature to enable range partitioning for Comet native shuffle. This feature is experimental while we investigate scenarios that don't partition data correctly. | false |
+| spark.comet.native.shuffle.partitioning.range.enabled | Whether to enable range partitioning for Comet native shuffle. | true |
 | spark.comet.nativeLoadRequired | Whether to require Comet native library to load successfully when Comet is enabled. If not, Comet will silently fallback to Spark when it fails to load the native lib. Otherwise, an error will be thrown and the Spark job will be aborted. | false |
 | spark.comet.parquet.enable.directBuffer | Whether to use Java direct byte buffer when reading Parquet. | false |
 | spark.comet.parquet.read.io.adjust.readRange.skew | In the parallel reader, if the read ranges submitted are skewed in sizes, this option will cause the reader to break up larger read ranges into smaller ranges to reduce the skew. This will result in a slightly larger number of connections opened to the file system but may give improved performance. | false |

native/core/benches/shuffle_writer.rs

Lines changed: 32 additions & 9 deletions
@@ -16,8 +16,9 @@
 // under the License.
 
 use arrow::array::builder::{Date32Builder, Decimal128Builder, Int32Builder};
-use arrow::array::{builder::StringBuilder, RecordBatch};
+use arrow::array::{builder::StringBuilder, Array, Int32Array, RecordBatch};
 use arrow::datatypes::{DataType, Field, Schema};
+use arrow::row::{RowConverter, SortField};
 use comet::execution::shuffle::{
     CometPartitioning, CompressionCodec, ShuffleBlockWriter, ShuffleWriterExec,
 };
@@ -31,6 +32,7 @@ use datafusion::{
     physical_plan::{common::collect, ExecutionPlan},
     prelude::SessionContext,
 };
+use itertools::Itertools;
 use std::io::Cursor;
 use std::sync::Arc;
 use tokio::runtime::Runtime;
@@ -84,16 +86,37 @@ fn criterion_benchmark(c: &mut Criterion) {
         );
     }
 
+    let lex_ordering = LexOrdering::new(vec![PhysicalSortExpr::new_default(
+        col("c0", batch.schema().as_ref()).unwrap(),
+    )])
+    .unwrap();
+
+    let sort_fields: Vec<SortField> = batch
+        .columns()
+        .iter()
+        .zip(&lex_ordering)
+        .map(|(array, sort_expr)| {
+            SortField::new_with_options(array.data_type().clone(), sort_expr.options)
+        })
+        .collect();
+    let row_converter = RowConverter::new(sort_fields).unwrap();
+
+    // These are hard-coded values based on the benchmark params of 8192 rows per batch, and 16
+    // partitions. If these change, these values need to be recalculated, or bring over the
+    // bounds-finding logic from shuffle_write_test in shuffle_writer.rs.
+    let bounds_ints = vec![
+        512, 1024, 1536, 2048, 2560, 3072, 3584, 4096, 4608, 5120, 5632, 6144, 6656, 7168, 7680,
+    ];
+    let bounds_array: Arc<dyn Array> = Arc::new(Int32Array::from(bounds_ints));
+    let bounds_rows = row_converter
+        .convert_columns(vec![bounds_array].as_slice())
+        .unwrap();
+
+    let owned_rows = bounds_rows.iter().map(|row| row.owned()).collect_vec();
+
     for partitioning in [
         CometPartitioning::Hash(vec![Arc::new(Column::new("a", 0))], 16),
-        CometPartitioning::RangePartitioning(
-            LexOrdering::new(vec![PhysicalSortExpr::new_default(
-                col("c0", batch.schema().as_ref()).unwrap(),
-            )])
-            .unwrap(),
-            16,
-            100,
-        ),
+        CometPartitioning::RangePartitioning(lex_ordering, 16, Arc::new(row_converter), owned_rows),
     ] {
         let compression_codec = CompressionCodec::None;
         group.bench_function(
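
The hard-coded bounds in the benchmark follow from its parameters: 8192 rows per batch spread evenly across 16 partitions means 8192 / 16 = 512 rows per partition, so the 15 interior upper bounds are the multiples of 512 from 512 through 7680. A small illustrative sketch (not part of the commit, assuming the `c0` key is uniform over 0..8192) that recomputes them:

```rust
fn main() {
    let rows_per_batch: i32 = 8192;
    let num_partitions: i32 = 16;
    // 16 partitions need 15 interior upper bounds, one per partition split.
    let step = rows_per_batch / num_partitions; // 512
    let bounds_ints: Vec<i32> = (1..num_partitions).map(|i| i * step).collect();
    assert_eq!(bounds_ints.len(), 15);
    assert_eq!(bounds_ints.first(), Some(&512));
    assert_eq!(bounds_ints.last(), Some(&7680));
}
```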

native/core/src/execution/planner.rs

Lines changed: 62 additions & 6 deletions
@@ -92,6 +92,7 @@ use arrow::array::{
     NullArray, StringBuilder, TimestampMicrosecondArray,
 };
 use arrow::buffer::{BooleanBuffer, NullBuffer, OffsetBuffer};
+use arrow::row::{OwnedRow, RowConverter, SortField};
 use datafusion::common::utils::SingleRowListArrayBuilder;
 use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
 use datafusion::physical_plan::filter::FilterExec;
@@ -484,14 +485,14 @@ impl PhysicalPlanner {
                     )))
                 }
             }
-        },
+        }
         Value::ListVal(values) => {
             if let DataType::List(_) = data_type {
                 SingleRowListArrayBuilder::new(literal_to_array_ref(data_type, values.clone())?).build_list_scalar()
             } else {
                 return Err(GeneralError(format!(
                     "Expected DataType::List but got {data_type:?}"
-                )))
+                )));
             }
         }
     }
@@ -1402,8 +1403,14 @@ impl PhysicalPlanner {
         assert_eq!(children.len(), 1);
         let (scans, child) = self.create_plan(&children[0], inputs, partition_count)?;
 
-        let partitioning = self
-            .create_partitioning(writer.partitioning.as_ref().unwrap(), child.schema())?;
+        // We wrap native shuffle in a CopyExec. This existed previously, but for
+        // RangePartitioning at least we want to ensure that dictionaries are unpacked.
+        let wrapped_child = Self::wrap_in_copy_exec(Arc::clone(&child.native_plan));
+
+        let partitioning = self.create_partitioning(
+            writer.partitioning.as_ref().unwrap(),
+            wrapped_child.schema(),
+        )?;
 
         let codec = match writer.codec.try_into() {
             Ok(SparkCompressionCodec::None) => Ok(CompressionCodec::None),
@@ -1419,7 +1426,7 @@
         }?;
 
         let shuffle_writer = Arc::new(ShuffleWriterExec::try_new(
-            Self::wrap_in_copy_exec(Arc::clone(&child.native_plan)),
+            wrapped_child,
             partitioning,
             codec,
             writer.output_data_file.clone(),
@@ -2344,16 +2351,65 @@
             ))
         }
         PartitioningStruct::RangePartition(range_partition) => {
+            // Generate the lexical ordering for comparisons
             let exprs: Result<Vec<PhysicalSortExpr>, ExecutionError> = range_partition
                 .sort_orders
                 .iter()
                 .map(|expr| self.create_sort_expr(expr, Arc::clone(&input_schema)))
                 .collect();
             let lex_ordering = LexOrdering::new(exprs?).unwrap();
+
+            // Generate the row converter for comparing incoming batches to boundary rows
+            let sort_fields: Vec<SortField> = lex_ordering
+                .iter()
+                .map(|sort_expr| {
+                    sort_expr
+                        .expr
+                        .data_type(input_schema.as_ref())
+                        .map(|dt| SortField::new_with_options(dt, sort_expr.options))
+                })
+                .collect::<Result<Vec<_>, _>>()?;
+
+            // Deserialize the literals to columnar collections of ScalarValues
+            let mut scalar_values: Vec<Vec<ScalarValue>> = vec![vec![]; lex_ordering.len()];
+            for boundary_row in &range_partition.boundary_rows {
+                // For each serialized expr in a boundary row, convert to a Literal
+                // expression, then extract the ScalarValue from the Literal and push it
+                // into the collection of ScalarValues
+                for (col_idx, col_values) in scalar_values
+                    .iter_mut()
+                    .enumerate()
+                    .take(lex_ordering.len())
+                {
+                    let expr = self.create_expr(
+                        &boundary_row.partition_bounds[col_idx],
+                        Arc::clone(&input_schema),
+                    )?;
+                    let literal_expr =
+                        expr.as_any().downcast_ref::<Literal>().expect("Literal");
+                    col_values.push(literal_expr.value().clone());
+                }
+            }
+
+            // Convert the collection of ScalarValues to a collection of Arrow Arrays
+            let arrays: Vec<ArrayRef> = scalar_values
+                .iter()
+                .map(|scalar_vec| ScalarValue::iter_to_array(scalar_vec.iter().cloned()))
+                .collect::<Result<Vec<_>, _>>()?;
+
+            // Create a RowConverter and use it to create OwnedRows from the Arrays
+            let converter = RowConverter::new(sort_fields)?;
+            let boundary_rows = converter.convert_columns(&arrays)?;
+            // Rows are only a view into Arrow Arrays. We need to create OwnedRows with their
+            // own internal memory ownership to pass as our boundary values to the partitioner.
+            let boundary_owned_rows: Vec<OwnedRow> =
+                boundary_rows.iter().map(|row| row.owned()).collect();
+
             Ok(CometPartitioning::RangePartitioning(
                 lex_ordering,
                 range_partition.num_partitions as usize,
-                range_partition.sample_size as usize,
+                Arc::new(converter),
+                boundary_owned_rows,
             ))
         }
         PartitioningStruct::SinglePartition(_) => Ok(CometPartitioning::SinglePartition),
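
For readers unfamiliar with the arrow-rs row format, the pipeline above reduces to: deserialize the bound literals into ScalarValues per sort column, batch them into Arrow arrays, encode the arrays as comparable Rows, then detach each Row into an OwnedRow. A hedged, self-contained sketch of the same conversion for a single sort column (not Comet's code; default sort options assumed):

```rust
use arrow::array::ArrayRef;
use arrow::row::{OwnedRow, RowConverter, SortField};
use datafusion::common::ScalarValue;
use datafusion::error::Result;

/// Convert one column of deserialized bound literals into owned boundary rows.
fn boundary_rows_from_scalars(bounds: Vec<ScalarValue>) -> Result<(RowConverter, Vec<OwnedRow>)> {
    // ScalarValues -> a single Arrow array (one sort column).
    let array: ArrayRef = ScalarValue::iter_to_array(bounds)?;
    // A RowConverter with one SortField matching the column's type.
    let converter = RowConverter::new(vec![SortField::new(array.data_type().clone())])?;
    // Rows borrow from the converter's output buffer...
    let rows = converter.convert_columns(&[array])?;
    // ...so copy each Row into an OwnedRow that owns its bytes.
    let owned: Vec<OwnedRow> = rows.iter().map(|r| r.owned()).collect();
    Ok((converter, owned))
}
```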

native/core/src/execution/shuffle/comet_partitioning.rs

Lines changed: 8 additions & 4 deletions
@@ -15,26 +15,30 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use arrow::row::{OwnedRow, RowConverter};
 use datafusion::physical_expr::{LexOrdering, PhysicalExpr};
 use std::sync::Arc;
 
 #[derive(Debug, Clone)]
 pub enum CometPartitioning {
     SinglePartition,
     /// Allocate rows based on a hash of one or more expressions and the specified number of
-    /// partitions
+    /// partitions. Args are 1) the expressions to hash on, and 2) the number of partitions.
     Hash(Vec<Arc<dyn PhysicalExpr>>, usize),
     /// Allocate rows based on the lexical order of one or more expressions and the specified number of
-    /// partitions
-    RangePartitioning(LexOrdering, usize, usize),
+    /// partitions. Args are 1) the LexOrdering used to compare values and split them into partitions,
+    /// 2) the number of partitions, 3) the RowConverter used to view incoming RecordBatches as Arrow
+    /// Rows for comparison, and 4) the OwnedRows that represent the boundaries of each partition, used
+    /// with the LexOrdering to bin each value in the RecordBatch to a partition.
+    RangePartitioning(LexOrdering, usize, Arc<RowConverter>, Vec<OwnedRow>),
 }
 
 impl CometPartitioning {
     pub fn partition_count(&self) -> usize {
         use CometPartitioning::*;
         match self {
             SinglePartition => 1,
-            Hash(_, n) | RangePartitioning(_, n, _) => *n,
+            Hash(_, n) | RangePartitioning(_, n, _, _) => *n,
         }
     }
 }
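
A hedged sketch of how a partitioner could use these variant fields to bin a batch; the binary-search-over-upper-bounds semantics here is an assumption for illustration, not necessarily what Comet's shuffle writer does:

```rust
use arrow::array::ArrayRef;
use arrow::error::ArrowError;
use arrow::row::{OwnedRow, RowConverter};

/// Assign each row of the batch's sort columns to a partition by binary-searching
/// the boundary rows (assumed to be upper bounds for partitions 0..n-2; anything
/// greater falls into the last partition).
fn partition_indices(
    converter: &RowConverter,
    boundaries: &[OwnedRow],
    sort_columns: &[ArrayRef],
) -> Result<Vec<usize>, ArrowError> {
    // View the batch's sort columns with the same RowConverter used for the bounds,
    // so byte-wise Row comparison agrees with the LexOrdering.
    let rows = converter.convert_columns(sort_columns)?;
    Ok(rows
        .iter()
        .map(|row| boundaries.partition_point(|b| b.row() < row))
        .collect())
}
```

Because the row format encodes values so that byte-wise comparison matches the sort order, the boundary comparison needs no per-type logic, which is presumably why the planner ships pre-encoded OwnedRows rather than raw literals.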

native/core/src/execution/shuffle/mod.rs

Lines changed: 0 additions & 1 deletion
@@ -19,7 +19,6 @@ pub(crate) mod codec;
 mod comet_partitioning;
 mod list;
 mod map;
-mod range_partitioner;
 pub mod row;
 mod shuffle_writer;
