2 changes: 1 addition & 1 deletion common/src/main/scala/org/apache/comet/CometConf.scala
@@ -332,7 +332,7 @@ object CometConf extends ShimCometConf {
"This feature is experimental while we investigate scenarios that don't partition data " +
"correctly.")
.booleanConf
.createWithDefault(false)
.createWithDefault(true)

val COMET_EXEC_SHUFFLE_COMPRESSION_CODEC: ConfigEntry[String] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.codec")
2 changes: 1 addition & 1 deletion docs/source/user-guide/configs.md
@@ -74,7 +74,7 @@ Comet provides the following configuration settings.
| spark.comet.memoryOverhead | The amount of additional memory to be allocated per executor process for Comet, in MiB, when running Spark in on-heap mode. This config is optional. If this is not specified, it will be set to `spark.comet.memory.overhead.factor` * `spark.executor.memory`. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | |
| spark.comet.metrics.updateInterval | The interval in milliseconds to update metrics. If interval is negative, metrics will be updated upon task completion. | 3000 |
| spark.comet.native.shuffle.partitioning.hash.enabled | Whether to enable hash partitioning for Comet native shuffle. | true |
| spark.comet.native.shuffle.partitioning.range.enabled | Experimental feature to enable range partitioning for Comet native shuffle. This feature is experimental while we investigate scenarios that don't partition data correctly. | false |
| spark.comet.native.shuffle.partitioning.range.enabled | Experimental feature to enable range partitioning for Comet native shuffle. This feature is experimental while we investigate scenarios that don't partition data correctly. | true |
| spark.comet.nativeLoadRequired | Whether to require Comet native library to load successfully when Comet is enabled. If not, Comet will silently fallback to Spark when it fails to load the native lib. Otherwise, an error will be thrown and the Spark job will be aborted. | false |
| spark.comet.parquet.enable.directBuffer | Whether to use Java direct byte buffer when reading Parquet. | false |
| spark.comet.parquet.read.io.adjust.readRange.skew | In the parallel reader, if the read ranges submitted are skewed in sizes, this option will cause the reader to break up larger read ranges into smaller ranges to reduce the skew. This will result in a slightly larger number of connections opened to the file system but may give improved performance. | false |
27 changes: 19 additions & 8 deletions native/core/benches/shuffle_writer.rs
@@ -18,6 +18,7 @@
use arrow::array::builder::{Date32Builder, Decimal128Builder, Int32Builder};
use arrow::array::{builder::StringBuilder, RecordBatch};
use arrow::datatypes::{DataType, Field, Schema};
use comet::execution::shuffle::RangePartitioner;
use comet::execution::shuffle::{
CometPartitioning, CompressionCodec, ShuffleBlockWriter, ShuffleWriterExec,
};
@@ -31,6 +32,7 @@ use datafusion::{
physical_plan::{common::collect, ExecutionPlan},
prelude::SessionContext,
};
use itertools::Itertools;
use std::io::Cursor;
use std::sync::Arc;
use tokio::runtime::Runtime;
@@ -84,16 +86,25 @@ fn criterion_benchmark(c: &mut Criterion) {
);
}

let lex_ordering = LexOrdering::new(vec![PhysicalSortExpr::new_default(
col("c0", batch.schema().as_ref()).unwrap(),
)])
.unwrap();

let (bounds_rows, row_converter) = RangePartitioner::generate_bounds(
&Vec::from(batch.columns()),
&lex_ordering,
16,
batch.num_rows(),
100,
42,
)
.unwrap();
let owned_rows = bounds_rows.iter().map(|row| row.owned()).collect_vec();

for partitioning in [
CometPartitioning::Hash(vec![Arc::new(Column::new("a", 0))], 16),
CometPartitioning::RangePartitioning(
LexOrdering::new(vec![PhysicalSortExpr::new_default(
col("c0", batch.schema().as_ref()).unwrap(),
)])
.unwrap(),
16,
100,
),
CometPartitioning::RangePartitioning(lex_ordering, 16, Arc::new(row_converter), owned_rows),
] {
let compression_codec = CompressionCodec::None;
group.bench_function(
54 changes: 51 additions & 3 deletions native/core/src/execution/planner.rs
@@ -85,11 +85,12 @@ use datafusion::physical_expr::LexOrdering;

use crate::parquet::parquet_exec::init_datasource_exec;
use arrow::array::{
BinaryBuilder, BooleanArray, Date32Array, Decimal128Array, Float32Array, Float64Array,
Int16Array, Int32Array, Int64Array, Int8Array, NullArray, StringBuilder,
ArrayRef, BinaryBuilder, BooleanArray, Date32Array, Decimal128Array, Float32Array,
Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, NullArray, StringBuilder,
TimestampMicrosecondArray,
};
use arrow::buffer::BooleanBuffer;
use arrow::row::{OwnedRow, RowConverter, SortField};
use datafusion::common::utils::SingleRowListArrayBuilder;
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion::physical_plan::filter::FilterExec;
@@ -2442,16 +2443,63 @@ impl PhysicalPlanner {
))
}
PartitioningStruct::RangePartition(range_partition) => {
// Generate the lexical ordering for comparisons
let exprs: Result<Vec<PhysicalSortExpr>, ExecutionError> = range_partition
.sort_orders
.iter()
.map(|expr| self.create_sort_expr(expr, Arc::clone(&input_schema)))
.collect();
let lex_ordering = LexOrdering::new(exprs?).unwrap();
let boundary_row_len = lex_ordering.len();

// Generate the row converter for comparing incoming batches to boundary rows
let sort_fields: Vec<SortField> = lex_ordering
.iter()
.map(|sort_expr| {
let data_type = sort_expr.expr.data_type(input_schema.as_ref()).unwrap();
SortField::new_with_options(data_type, sort_expr.options)
})
.collect();

// Deserialize the literals to columnar collections of ScalarValues
let mut scalar_values: Vec<Vec<ScalarValue>> = vec![vec![]; lex_ordering.len()];
range_partition
.boundary_rows
.iter()
.for_each(|boundary_row| {
assert_eq!(boundary_row.partition_bounds.len(), boundary_row_len);
// For each serialized expr in a boundary row, convert to a Literal
// expression, then extract the ScalarValue from the Literal and push it
// into the collection of ScalarValues
boundary_row.partition_bounds.iter().enumerate().for_each(
|(col_idx, literal_expr)| {
// TODO: Is there a quicker/cleaner way to go from serialized expr
// that we know is a literal to a ScalarValue?
let expr = self
.create_expr(literal_expr, Arc::clone(&input_schema))
.unwrap();
let literal_expr = expr.as_any().downcast_ref::<Literal>().unwrap();
scalar_values[col_idx].push(literal_expr.value().clone());
},
);
});

// Convert the per-column collections of ScalarValues to Arrow arrays
let arrays: Vec<ArrayRef> = scalar_values
.iter()
.map(|scalar_vec| ScalarValue::iter_to_array(scalar_vec.iter().cloned()))
.collect::<Result<Vec<_>, _>>()?;

// Create a RowConverter and use it to create OwnedRows from the arrays
let converter = RowConverter::new(sort_fields)?;
let rows = converter.convert_columns(&arrays)?;
let owned_rows: Vec<OwnedRow> = rows.iter().map(|row| row.owned()).collect();

Ok(CometPartitioning::RangePartitioning(
lex_ordering,
range_partition.num_partitions as usize,
range_partition.sample_size as usize,
Arc::new(converter),
owned_rows,
))
}
PartitioningStruct::SinglePartition(_) => Ok(CometPartitioning::SinglePartition),
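
The new planner path above turns serialized literal bounds into arrow `OwnedRow`s via `ScalarValue::iter_to_array`, a `RowConverter`, and `convert_columns`. A minimal, self-contained sketch of that API path (the column type and boundary values here are made up for illustration):

```rust
use arrow::datatypes::DataType;
use arrow::row::{OwnedRow, RowConverter, SortField};
use datafusion::common::ScalarValue;

fn main() -> datafusion::error::Result<()> {
    // One Int32 sort column with three boundary values (hypothetical data).
    let scalars = vec![
        ScalarValue::Int32(Some(10)),
        ScalarValue::Int32(Some(20)),
        ScalarValue::Int32(Some(30)),
    ];

    // Collect the per-column ScalarValues into a single Arrow array.
    let bounds_array = ScalarValue::iter_to_array(scalars)?;

    // One SortField per sort column, then convert the boundary arrays into
    // the row format used for comparisons against incoming batches.
    let converter = RowConverter::new(vec![SortField::new(DataType::Int32)])?;
    let rows = converter.convert_columns(&[bounds_array])?;
    let owned_rows: Vec<OwnedRow> = rows.iter().map(|r| r.owned()).collect();

    assert_eq!(owned_rows.len(), 3);
    Ok(())
}
```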
5 changes: 3 additions & 2 deletions native/core/src/execution/shuffle/comet_partitioning.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use arrow::row::{OwnedRow, RowConverter};
use datafusion::physical_expr::{LexOrdering, PhysicalExpr};
use std::sync::Arc;

@@ -26,15 +27,15 @@ pub enum CometPartitioning {
Hash(Vec<Arc<dyn PhysicalExpr>>, usize),
/// Allocate rows based on the lexical order of one or more expressions and the specified number of
/// partitions
RangePartitioning(LexOrdering, usize, usize),
RangePartitioning(LexOrdering, usize, Arc<RowConverter>, Vec<OwnedRow>),
}

impl CometPartitioning {
pub fn partition_count(&self) -> usize {
use CometPartitioning::*;
match self {
SinglePartition => 1,
Hash(_, n) | RangePartitioning(_, n, _) => *n,
Hash(_, n) | RangePartitioning(_, n, _, _) => *n,
}
}
}
3 changes: 2 additions & 1 deletion native/core/src/execution/shuffle/mod.rs
@@ -19,10 +19,11 @@ pub(crate) mod codec;
mod comet_partitioning;
mod list;
mod map;
mod range_partitioner;
pub(crate) mod range_partitioner;
pub mod row;
mod shuffle_writer;

pub use codec::{read_ipc_compressed, CompressionCodec, ShuffleBlockWriter};
pub use comet_partitioning::CometPartitioning;
pub use range_partitioner::RangePartitioner;
pub use shuffle_writer::ShuffleWriterExec;
3 changes: 3 additions & 0 deletions native/core/src/execution/shuffle/range_partitioner.rs
@@ -31,6 +31,7 @@ impl RangePartitioner {
/// We use sample_size instead of k and num_rows instead of n.
/// We use indices instead of actual values in the reservoir since we'll do one take() on the
/// input arrays at the end.
#[allow(dead_code)]
fn reservoir_sample_indices(num_rows: usize, sample_size: usize, seed: u64) -> Vec<u64> {
assert!(sample_size > 0);
assert!(
@@ -76,6 +77,7 @@
/// Given input arrays and range partitioning metadata: samples the input arrays, generates
/// partition bounds, and returns Rows (for comparison against) and a RowConverter (for
/// adapting future incoming batches).
#[allow(dead_code)]
pub fn generate_bounds(
partition_arrays: &Vec<ArrayRef>,
lex_ordering: &LexOrdering,
@@ -143,6 +145,7 @@
/// values since we don't have cross-partition samples to merge.
/// We normalize the math to use ints instead of floating point by replacing 1.0 with a
/// (imagined) num_candidates * partitions range.
#[allow(dead_code)]
fn determine_bounds_for_rows(
sort_fields: Vec<SortField>,
sampled_columns: &[ArrayRef],
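
The `reservoir_sample_indices` doc comment above describes Algorithm R over row indices rather than values. A rough sketch of that idea, assuming a seeded `rand` RNG (the real implementation in `range_partitioner.rs` is not shown in this diff and may differ):

```rust
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};

/// Reservoir-sample `sample_size` row indices out of `num_rows` (Algorithm R).
/// Sampling indices instead of values lets the caller do a single take() on
/// the input arrays afterwards.
fn reservoir_sample_indices(num_rows: usize, sample_size: usize, seed: u64) -> Vec<u64> {
    assert!(sample_size > 0 && sample_size <= num_rows);
    let mut rng = StdRng::seed_from_u64(seed);
    // The reservoir starts as the first `sample_size` indices.
    let mut reservoir: Vec<u64> = (0..sample_size as u64).collect();
    // Every later index i replaces a random reservoir slot with probability
    // sample_size / (i + 1).
    for i in sample_size..num_rows {
        let j = rng.gen_range(0..=i);
        if j < sample_size {
            reservoir[j] = i as u64;
        }
    }
    reservoir
}
```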
77 changes: 39 additions & 38 deletions native/core/src/execution/shuffle/shuffle_writer.rs
@@ -21,7 +21,6 @@ use crate::execution::shuffle::range_partitioner::RangePartitioner;
use crate::execution::shuffle::{CometPartitioning, CompressionCodec, ShuffleBlockWriter};
use crate::execution::tracing::{with_trace, with_trace_async};
use arrow::compute::interleave_record_batch;
use arrow::row::{OwnedRow, RowConverter};
use async_trait::async_trait;
use datafusion::common::utils::proxy::VecAllocExt;
use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
@@ -333,10 +332,6 @@ struct MultiPartitionShuffleRepartitioner {
/// Reservation for repartitioning
reservation: MemoryReservation,
tracing_enabled: bool,
/// RangePartitioning-specific state
bounds_rows: Option<Vec<OwnedRow>>,
row_converter: Option<RowConverter>,
seed: u64,
}

#[derive(Default)]
@@ -413,10 +408,6 @@ impl MultiPartitionShuffleRepartitioner {
batch_size,
reservation,
tracing_enabled,
bounds_rows: None,
row_converter: None,
// Spark RangePartitioner seeds off of partition number.
seed: partition as u64,
})
}

@@ -546,7 +537,8 @@ impl MultiPartitionShuffleRepartitioner {
CometPartitioning::RangePartitioning(
lex_ordering,
num_output_partitions,
sample_size,
row_converter,
bounds,
) => {
let mut scratch = std::mem::take(&mut self.scratch);
let (partition_starts, partition_row_indices): (&Vec<u32>, &Vec<u32>) = {
@@ -560,35 +552,14 @@

let num_rows = arrays[0].len();

// If necessary (i.e., when first batch arrives) generate the bounds (as Rows)
// for range partitioning based on randomly reservoir sampling the batch.
if self.row_converter.is_none() {
let (bounds_rows, row_converter) = RangePartitioner::generate_bounds(
&arrays,
lex_ordering,
*num_output_partitions,
input.num_rows(),
*sample_size,
self.seed,
)?;

self.bounds_rows =
Some(bounds_rows.iter().map(|row| row.owned()).collect_vec());
self.row_converter = Some(row_converter);
}

// Generate partition ids for every row, first by converting the partition
// arrays to Rows, and then doing binary search for each Row against the
// bounds Rows.
let row_batch = self
.row_converter
.as_ref()
.unwrap()
.convert_columns(arrays.as_slice())?;
let row_batch = row_converter.convert_columns(arrays.as_slice())?;

RangePartitioner::partition_indices_for_batch(
&row_batch,
self.bounds_rows.as_ref().unwrap().as_slice(),
bounds.as_slice(),
&mut scratch.partition_ids[..num_rows],
);

@@ -1278,6 +1249,7 @@ mod test {
use super::*;
use crate::execution::shuffle::read_ipc_compressed;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::row::{RowConverter, SortField};
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::datasource::source::DataSourceExec;
use datafusion::execution::config::SessionConfig;
@@ -1404,15 +1376,44 @@
) {
let batch = create_batch(batch_size);

let lex_ordering = LexOrdering::new(vec![PhysicalSortExpr::new_default(
col("a", batch.schema().as_ref()).unwrap(),
)])
.unwrap();

let (owned_rows, row_converter) = if num_partitions == 1 {
let sort_fields: Vec<SortField> = batch
.columns()
.iter()
.zip(&lex_ordering)
.map(|(array, sort_expr)| {
SortField::new_with_options(array.data_type().clone(), sort_expr.options)
})
.collect();
(vec![], RowConverter::new(sort_fields).unwrap())
} else {
let (bounds_rows, row_converter) = RangePartitioner::generate_bounds(
&Vec::from(batch.columns()),
&lex_ordering,
num_partitions,
batch_size,
100,
42,
)
.unwrap();
(
bounds_rows.iter().map(|row| row.owned()).collect_vec(),
row_converter,
)
};

for partitioning in [
CometPartitioning::Hash(vec![Arc::new(Column::new("a", 0))], num_partitions),
CometPartitioning::RangePartitioning(
LexOrdering::new(vec![PhysicalSortExpr::new_default(
col("a", batch.schema().as_ref()).unwrap(),
)])
.unwrap(),
lex_ordering,
num_partitions,
100,
Arc::new(row_converter),
owned_rows,
),
] {
let batches = (0..num_batches).map(|_| batch.clone()).collect::<Vec<_>>();
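
The repartitioner comments above describe assigning a partition id per row by binary-searching each converted Row against the precomputed bounds. A toy sketch of that lookup over plain integers (the real code compares arrow `Row`s; the tie handling here, where a key equal to a bound stays in the lower partition, is an assumption based on Spark's RangePartitioner convention):

```rust
/// With (num_partitions - 1) ascending bounds, a row's partition is the number
/// of bounds that are strictly less than its sort key.
fn partition_for_key<K: Ord>(key: &K, bounds: &[K]) -> usize {
    // partition_point returns the length of the prefix for which the predicate
    // holds, i.e. the index of the first bound >= key.
    bounds.partition_point(|b| b < key)
}

fn main() {
    let bounds = vec![10, 20, 30]; // 4 partitions
    assert_eq!(partition_for_key(&5, &bounds), 0);
    assert_eq!(partition_for_key(&10, &bounds), 0); // equal to a bound => lower partition
    assert_eq!(partition_for_key(&25, &bounds), 2);
    assert_eq!(partition_for_key(&99, &bounds), 3);
}
```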
6 changes: 5 additions & 1 deletion native/proto/src/proto/partitioning.proto
@@ -42,8 +42,12 @@ message HashPartition {
message SinglePartition {
}

message BoundaryRow {
repeated spark.spark_expression.Expr partition_bounds = 1;
}

message RangePartition {
repeated spark.spark_expression.Expr sort_orders = 1;
int32 num_partitions = 2;
int32 sample_size = 3;
repeated BoundaryRow boundary_rows = 4;
}