diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 367af5f199..8f6a833f4b 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -332,7 +332,7 @@ object CometConf extends ShimCometConf { "This feature is experimental while we investigate scenarios that don't partition data " + "correctly.") .booleanConf - .createWithDefault(false) + .createWithDefault(true) val COMET_EXEC_SHUFFLE_COMPRESSION_CODEC: ConfigEntry[String] = conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.codec") diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 434c1934fb..fe35d7fdbd 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -74,7 +74,7 @@ Comet provides the following configuration settings. | spark.comet.memoryOverhead | The amount of additional memory to be allocated per executor process for Comet, in MiB, when running Spark in on-heap mode. This config is optional. If this is not specified, it will be set to `spark.comet.memory.overhead.factor` * `spark.executor.memory`. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | | | spark.comet.metrics.updateInterval | The interval in milliseconds to update metrics. If interval is negative, metrics will be updated upon task completion. | 3000 | | spark.comet.native.shuffle.partitioning.hash.enabled | Whether to enable hash partitioning for Comet native shuffle. | true | -| spark.comet.native.shuffle.partitioning.range.enabled | Experimental feature to enable range partitioning for Comet native shuffle. This feature is experimental while we investigate scenarios that don't partition data correctly. | false | +| spark.comet.native.shuffle.partitioning.range.enabled | Experimental feature to enable range partitioning for Comet native shuffle. This feature is experimental while we investigate scenarios that don't partition data correctly. | true | | spark.comet.nativeLoadRequired | Whether to require Comet native library to load successfully when Comet is enabled. If not, Comet will silently fallback to Spark when it fails to load the native lib. Otherwise, an error will be thrown and the Spark job will be aborted. | false | | spark.comet.parquet.enable.directBuffer | Whether to use Java direct byte buffer when reading Parquet. | false | | spark.comet.parquet.read.io.adjust.readRange.skew | In the parallel reader, if the read ranges submitted are skewed in sizes, this option will cause the reader to break up larger read ranges into smaller ranges to reduce the skew. This will result in a slightly larger number of connections opened to the file system but may give improved performance. 
| false | diff --git a/native/core/benches/shuffle_writer.rs b/native/core/benches/shuffle_writer.rs index 52638d92a9..13dba9c06c 100644 --- a/native/core/benches/shuffle_writer.rs +++ b/native/core/benches/shuffle_writer.rs @@ -18,6 +18,7 @@ use arrow::array::builder::{Date32Builder, Decimal128Builder, Int32Builder}; use arrow::array::{builder::StringBuilder, RecordBatch}; use arrow::datatypes::{DataType, Field, Schema}; +use comet::execution::shuffle::RangePartitioner; use comet::execution::shuffle::{ CometPartitioning, CompressionCodec, ShuffleBlockWriter, ShuffleWriterExec, }; @@ -31,6 +32,7 @@ use datafusion::{ physical_plan::{common::collect, ExecutionPlan}, prelude::SessionContext, }; +use itertools::Itertools; use std::io::Cursor; use std::sync::Arc; use tokio::runtime::Runtime; @@ -84,16 +86,25 @@ fn criterion_benchmark(c: &mut Criterion) { ); } + let lex_ordering = LexOrdering::new(vec![PhysicalSortExpr::new_default( + col("c0", batch.schema().as_ref()).unwrap(), + )]) + .unwrap(); + + let (bounds_rows, row_converter) = RangePartitioner::generate_bounds( + &Vec::from(batch.columns()), + &lex_ordering, + 16, + batch.num_rows(), + 100, + 42, + ) + .unwrap(); + let owned_rows = bounds_rows.iter().map(|row| row.owned()).collect_vec(); + for partitioning in [ CometPartitioning::Hash(vec![Arc::new(Column::new("a", 0))], 16), - CometPartitioning::RangePartitioning( - LexOrdering::new(vec![PhysicalSortExpr::new_default( - col("c0", batch.schema().as_ref()).unwrap(), - )]) - .unwrap(), - 16, - 100, - ), + CometPartitioning::RangePartitioning(lex_ordering, 16, Arc::new(row_converter), owned_rows), ] { let compression_codec = CompressionCodec::None; group.bench_function( diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 0c3d345c8e..b265a0c9dc 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -85,11 +85,12 @@ use datafusion::physical_expr::LexOrdering; use crate::parquet::parquet_exec::init_datasource_exec; use arrow::array::{ - BinaryBuilder, BooleanArray, Date32Array, Decimal128Array, Float32Array, Float64Array, - Int16Array, Int32Array, Int64Array, Int8Array, NullArray, StringBuilder, + ArrayRef, BinaryBuilder, BooleanArray, Date32Array, Decimal128Array, Float32Array, + Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, NullArray, StringBuilder, TimestampMicrosecondArray, }; use arrow::buffer::BooleanBuffer; +use arrow::row::{OwnedRow, RowConverter, SortField}; use datafusion::common::utils::SingleRowListArrayBuilder; use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec; use datafusion::physical_plan::filter::FilterExec; @@ -2442,16 +2443,63 @@ impl PhysicalPlanner { )) } PartitioningStruct::RangePartition(range_partition) => { + // Generate the lexical ordering for comparisons let exprs: Result<Vec<PhysicalSortExpr>, ExecutionError> = range_partition .sort_orders .iter() .map(|expr| self.create_sort_expr(expr, Arc::clone(&input_schema))) .collect(); let lex_ordering = LexOrdering::new(exprs?).unwrap(); + let boundary_row_len = lex_ordering.len(); + + // Generate the row converter for comparing incoming batches to boundary rows + let sort_fields: Vec<SortField> = lex_ordering + .iter() + .map(|sort_expr| { + let data_type = sort_expr.expr.data_type(input_schema.as_ref()).unwrap(); + SortField::new_with_options(data_type, sort_expr.options) + }) + .collect(); + + // Deserialize the literals to columnar collections of ScalarValues + let mut scalar_values: Vec<Vec<ScalarValue>> = vec![vec![]; lex_ordering.len()]; +
range_partition + .boundary_rows + .iter() + .for_each(|boundary_row| { + assert_eq!(boundary_row.partition_bounds.len(), boundary_row_len); + // For each serialized expr in a boundary row, convert to a Literal + // expression, then extract the ScalarValue from the Literal and push it + // into the collection of ScalarValues + boundary_row.partition_bounds.iter().enumerate().for_each( + |(col_idx, literal_expr)| { + // TODO: Is there a quicker/cleaner way to go from serialized expr + // that we know is a literal to a ScalarValue? + let expr = self + .create_expr(literal_expr, Arc::clone(&input_schema)) + .unwrap(); + let literal_expr = expr.as_any().downcast_ref::<Literal>().unwrap(); + scalar_values[col_idx].push(literal_expr.value().clone()); + }, + ); + }); + + // Convert the collection of ScalarValues to a collection of Arrow Arrays + let arrays: Vec<ArrayRef> = scalar_values + .iter() + .map(|scalar_vec| ScalarValue::iter_to_array(scalar_vec.iter().cloned())) + .collect::<Result<Vec<_>, _>>()?; + + // Create a RowConverter and use it to create OwnedRows from the Arrays + let converter = RowConverter::new(sort_fields)?; + let rows = converter.convert_columns(&arrays)?; + let owned_rows: Vec<OwnedRow> = rows.iter().map(|row| row.owned()).collect(); + Ok(CometPartitioning::RangePartitioning( lex_ordering, range_partition.num_partitions as usize, - range_partition.sample_size as usize, + Arc::new(converter), + owned_rows, )) } PartitioningStruct::SinglePartition(_) => Ok(CometPartitioning::SinglePartition), diff --git a/native/core/src/execution/shuffle/comet_partitioning.rs b/native/core/src/execution/shuffle/comet_partitioning.rs index 9c33da8e93..90f9ef9327 100644 --- a/native/core/src/execution/shuffle/comet_partitioning.rs +++ b/native/core/src/execution/shuffle/comet_partitioning.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use arrow::row::{OwnedRow, RowConverter}; use datafusion::physical_expr::{LexOrdering, PhysicalExpr}; use std::sync::Arc; @@ -26,7 +27,7 @@ pub enum CometPartitioning { Hash(Vec<Arc<dyn PhysicalExpr>>, usize), /// Allocate rows based on the lexical order of one of more expressions and the specified number of /// partitions - RangePartitioning(LexOrdering, usize, usize), + RangePartitioning(LexOrdering, usize, Arc<RowConverter>, Vec<OwnedRow>), } impl CometPartitioning { @@ -34,7 +35,7 @@ use CometPartitioning::*; match self { SinglePartition => 1, - Hash(_, n) | RangePartitioning(_, n, _) => *n, + Hash(_, n) | RangePartitioning(_, n, _, _) => *n, } } } diff --git a/native/core/src/execution/shuffle/mod.rs b/native/core/src/execution/shuffle/mod.rs index c3cb610afa..71869066fe 100644 --- a/native/core/src/execution/shuffle/mod.rs +++ b/native/core/src/execution/shuffle/mod.rs @@ -19,10 +19,11 @@ pub(crate) mod codec; mod comet_partitioning; mod list; mod map; -mod range_partitioner; +pub(crate) mod range_partitioner; pub mod row; mod shuffle_writer; pub use codec::{read_ipc_compressed, CompressionCodec, ShuffleBlockWriter}; pub use comet_partitioning::CometPartitioning; +pub use range_partitioner::RangePartitioner; pub use shuffle_writer::ShuffleWriterExec; diff --git a/native/core/src/execution/shuffle/range_partitioner.rs b/native/core/src/execution/shuffle/range_partitioner.rs index e14cf61482..5dabb45635 100644 --- a/native/core/src/execution/shuffle/range_partitioner.rs +++ b/native/core/src/execution/shuffle/range_partitioner.rs @@ -31,6 +31,7 @@ impl RangePartitioner { /// We use sample_size instead of k and num_rows instead of n.
/// We use indices instead of actual values in the reservoir since we'll do one take() on the /// input arrays at the end. + #[allow(dead_code)] fn reservoir_sample_indices(num_rows: usize, sample_size: usize, seed: u64) -> Vec { assert!(sample_size > 0); assert!( @@ -76,6 +77,7 @@ impl RangePartitioner { /// Given input arrays and range partitioning metadata: samples the input arrays, generates /// partition bounds, and returns Rows (for comparison against) and a RowConverter (for /// adapting future incoming batches). + #[allow(dead_code)] pub fn generate_bounds( partition_arrays: &Vec<ArrayRef>, lex_ordering: &LexOrdering, @@ -143,6 +145,7 @@ impl RangePartitioner { /// values since we don't have cross-partition samples to merge. /// We normalize the math to use ints instead of floating point by replacing 1.0 with a /// (imagined) num_candidates * partitions range. + #[allow(dead_code)] fn determine_bounds_for_rows( sort_fields: Vec<SortField>, sampled_columns: &[ArrayRef], diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs index fe303618c3..0c6adb1f8b 100644 --- a/native/core/src/execution/shuffle/shuffle_writer.rs +++ b/native/core/src/execution/shuffle/shuffle_writer.rs @@ -21,7 +21,6 @@ use crate::execution::shuffle::range_partitioner::RangePartitioner; use crate::execution::shuffle::{CometPartitioning, CompressionCodec, ShuffleBlockWriter}; use crate::execution::tracing::{with_trace, with_trace_async}; use arrow::compute::interleave_record_batch; -use arrow::row::{OwnedRow, RowConverter}; use async_trait::async_trait; use datafusion::common::utils::proxy::VecAllocExt; use datafusion::physical_expr::{EquivalenceProperties, Partitioning}; @@ -333,10 +332,6 @@ struct MultiPartitionShuffleRepartitioner { /// Reservation for repartitioning reservation: MemoryReservation, tracing_enabled: bool, - /// RangePartitioning-specific state - bounds_rows: Option<Vec<OwnedRow>>, - row_converter: Option<RowConverter>, - seed: u64, } #[derive(Default)] @@ -413,10 +408,6 @@ impl MultiPartitionShuffleRepartitioner { batch_size, reservation, tracing_enabled, - bounds_rows: None, - row_converter: None, - // Spark RangePartitioner seeds off of partition number. - seed: partition as u64, }) } @@ -546,7 +537,8 @@ impl MultiPartitionShuffleRepartitioner { CometPartitioning::RangePartitioning( lex_ordering, num_output_partitions, - sample_size, + row_converter, + bounds, ) => { let mut scratch = std::mem::take(&mut self.scratch); let (partition_starts, partition_row_indices): (&Vec, &Vec) = { @@ -560,35 +552,14 @@ impl MultiPartitionShuffleRepartitioner { let num_rows = arrays[0].len(); - // If necessary (i.e., when first batch arrives) generate the bounds (as Rows) - // for range partitioning based on randomly reservoir sampling the batch. - if self.row_converter.is_none() { - let (bounds_rows, row_converter) = RangePartitioner::generate_bounds( - &arrays, - lex_ordering, - *num_output_partitions, - input.num_rows(), - *sample_size, - self.seed, - )?; - - self.bounds_rows = - Some(bounds_rows.iter().map(|row| row.owned()).collect_vec()); - self.row_converter = Some(row_converter); - } - // Generate partition ids for every row, first by converting the partition // arrays to Rows, and then doing binary search for each Row against the // bounds Rows.
- let row_batch = self - .row_converter - .as_ref() - .unwrap() - .convert_columns(arrays.as_slice())?; + let row_batch = row_converter.convert_columns(arrays.as_slice())?; RangePartitioner::partition_indices_for_batch( &row_batch, - self.bounds_rows.as_ref().unwrap().as_slice(), + bounds.as_slice(), &mut scratch.partition_ids[..num_rows], ); @@ -1278,6 +1249,7 @@ mod test { use super::*; use crate::execution::shuffle::read_ipc_compressed; use arrow::datatypes::{DataType, Field, Schema}; + use arrow::row::{RowConverter, SortField}; use datafusion::datasource::memory::MemorySourceConfig; use datafusion::datasource::source::DataSourceExec; use datafusion::execution::config::SessionConfig; @@ -1404,15 +1376,44 @@ ) { let batch = create_batch(batch_size); + let lex_ordering = LexOrdering::new(vec![PhysicalSortExpr::new_default( + col("a", batch.schema().as_ref()).unwrap(), + )]) + .unwrap(); + + let (owned_rows, row_converter) = if num_partitions == 1 { + let sort_fields: Vec<SortField> = batch + .columns() + .iter() + .zip(&lex_ordering) + .map(|(array, sort_expr)| { + SortField::new_with_options(array.data_type().clone(), sort_expr.options) + }) + .collect(); + (vec![], RowConverter::new(sort_fields).unwrap()) + } else { + let (bounds_rows, row_converter) = RangePartitioner::generate_bounds( + &Vec::from(batch.columns()), + &lex_ordering, + num_partitions, + batch_size, + 100, + 42, + ) + .unwrap(); + ( + bounds_rows.iter().map(|row| row.owned()).collect_vec(), + row_converter, + ) + }; + for partitioning in [ CometPartitioning::Hash(vec![Arc::new(Column::new("a", 0))], num_partitions), CometPartitioning::RangePartitioning( - LexOrdering::new(vec![PhysicalSortExpr::new_default( - col("a", batch.schema().as_ref()).unwrap(), - )]) - .unwrap(), + lex_ordering, num_partitions, - 100, + Arc::new(row_converter), + owned_rows, ), ] { let batches = (0..num_batches).map(|_| batch.clone()).collect::<Vec<_>>(); diff --git a/native/proto/src/proto/partitioning.proto b/native/proto/src/proto/partitioning.proto index ea0e586fb1..e11d7a384b 100644 --- a/native/proto/src/proto/partitioning.proto +++ b/native/proto/src/proto/partitioning.proto @@ -42,8 +42,12 @@ message HashPartition { message SinglePartition { } +message BoundaryRow { + repeated spark.spark_expression.Expr partition_bounds = 1; +} + message RangePartition { repeated spark.spark_expression.Expr sort_orders = 1; int32 num_partitions = 2; - int32 sample_size = 3; + repeated BoundaryRow boundary_rows = 4; } diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala index 0dcd3828a5..63396f63b4 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala @@ -759,7 +759,7 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] { * operations like `groupByKey`, `reduceByKey` or `join`.
Native code does not support hashing * complex types, see hash_funcs/utils.rs */ - def supportedPartitioningDataType(dt: DataType): Boolean = dt match { + def supportedHashPartitioningDataType(dt: DataType): Boolean = dt match { case _: BooleanType | _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: FloatType | _: DoubleType | _: StringType | _: BinaryType | _: TimestampType | _: TimestampNTZType | _: DecimalType | _: DateType => @@ -768,6 +768,15 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] { false } + def supportedRangePartitioningDataType(dt: DataType): Boolean = dt match { + case _: BooleanType | _: ByteType | _: ShortType | _: IntegerType | _: LongType | + _: FloatType | _: DoubleType | _: TimestampType | _: TimestampNTZType | _: DecimalType | + _: DateType => + true + case _ => + false + } + /** * Determine which data types are supported as data columns in native shuffle. * @@ -832,7 +841,7 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] { } } for (dt <- expressions.map(_.dataType).distinct) { - if (!supportedPartitioningDataType(dt)) { + if (!supportedHashPartitioningDataType(dt)) { withInfo(s, s"unsupported hash partitioning data type for native shuffle: $dt") supported = false } @@ -858,7 +867,7 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] { } } for (dt <- orderings.map(_.dataType).distinct) { - if (!supportedPartitioningDataType(dt)) { + if (!supportedRangePartitioningDataType(dt)) { withInfo(s, s"unsupported range partitioning data type for native shuffle: $dt") supported = false } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala index 5d772be403..b11f9fead9 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala @@ -28,11 +28,11 @@ import org.apache.spark.{SparkEnv, TaskContext} import org.apache.spark.internal.Logging import org.apache.spark.scheduler.MapStatus import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleWriteMetricsReporter, ShuffleWriter} -import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Literal} import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning, SinglePartition} import org.apache.spark.sql.comet.{CometExec, CometMetricNode} import org.apache.spark.sql.execution.metric.SQLMetric -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.comet.CometConf @@ -51,7 +51,8 @@ class CometNativeShuffleWriter[K, V]( shuffleId: Int, mapId: Long, context: TaskContext, - metricsReporter: ShuffleWriteMetricsReporter) + metricsReporter: ShuffleWriteMetricsReporter, + rangePartitionBounds: Option[Seq[InternalRow]] = None) extends ShuffleWriter[K, V] with Logging { @@ -140,6 +141,22 @@ class CometNativeShuffleWriter[K, V]( MapStatus.apply(SparkEnv.get.blockManager.shuffleServerId, partitionLengths, mapId) } + private def isSinglePartitioning(p: Partitioning): Boolean = p match { + case SinglePartition => true + case rp: RangePartitioning => + // Spark sometimes generates RangePartitioning schemes with numPartitions == 1, + // or 
the computed bounds result in a single target partition. + // In this case Comet just serializes a SinglePartition scheme to native. + if ((rp.numPartitions == 1) || rangePartitionBounds.isEmpty || + rangePartitionBounds.get.isEmpty) { + true + } else { + false + } + case hp: HashPartitioning => hp.numPartitions == 1 + case _ => false + } + private def getNativePlan(dataFile: String, indexFile: String): Operator = { val scanBuilder = OperatorOuterClass.Scan.newBuilder().setSource("ShuffleWriterInput") val opBuilder = OperatorOuterClass.Operator.newBuilder() @@ -170,6 +187,12 @@ CometConf.COMET_EXEC_SHUFFLE_COMPRESSION_ZSTD_LEVEL.get) outputPartitioning match { + case p if isSinglePartitioning(p) => + val partitioning = PartitioningOuterClass.SinglePartition.newBuilder() + + val partitioningBuilder = PartitioningOuterClass.Partitioning.newBuilder() + shuffleWriterBuilder.setPartitioning( + partitioningBuilder.setSinglePartition(partitioning).build()) case _: HashPartitioning => val hashPartitioning = outputPartitioning.asInstanceOf[HashPartitioning] @@ -194,45 +217,48 @@ val partitioning = PartitioningOuterClass.RangePartition.newBuilder() partitioning.setNumPartitions(outputPartitioning.numPartitions) - val sampleSize = { - // taken from org.apache.spark.RangePartitioner#rangeBounds - // This is the sample size we need to have roughly balanced output partitions, - // capped at 1M. - // Cast to double to avoid overflowing ints or longs - val sampleSize = math.min( - SQLConf.get - .getConf(SQLConf.RANGE_EXCHANGE_SAMPLE_SIZE_PER_PARTITION) - .toDouble * outputPartitioning.numPartitions, - 1e6) - // Assume the input partitions are roughly balanced and over-sample a little bit. - // Comet: we don't divide by numPartitions since each DF plan handles one partition. - math.ceil(3.0 * sampleSize).toInt - } - if (sampleSize > 8192) { - logWarning( - s"RangePartitioning sampleSize of s$sampleSize exceeds Comet RecordBatch size.") - } - partitioning.setSampleSize(sampleSize) - - val orderingExprs = rangePartitioning.ordering - .flatMap(e => QueryPlanSerde.exprToProto(e, outputAttributes)) - if (orderingExprs.length != rangePartitioning.ordering.length) { - throw new UnsupportedOperationException( - s"Partitioning $rangePartitioning is not supported.") + { + // Serialize the ordering expressions for comparisons + val orderingExprs = rangePartitioning.ordering + .flatMap(e => QueryPlanSerde.exprToProto(e, outputAttributes)) + if (orderingExprs.length != rangePartitioning.ordering.length) { + throw new UnsupportedOperationException( + s"Partitioning $rangePartitioning is not supported.") + } + partitioning.addAllSortOrders(orderingExprs.asJava) } - partitioning.addAllSortOrders(orderingExprs.asJava) + // Convert Spark's sequence of InternalRows that represent partitioning boundaries to + // sequences of Literals, where each outer entry represents a boundary row, and each + // internal entry is a value in that row.
In other words, these are stored in row major + // order, not column major + val boundarySchema = rangePartitioning.ordering.flatMap(e => Some(e.dataType)) + val boundaryExprs: Seq[Seq[Literal]] = + rangePartitionBounds.get.map((row: InternalRow) => + // For every InternalRow, map its values to Literals to a collection of Literals + row.toSeq(boundarySchema).zip(boundarySchema).map { case (value, valueType) => + Literal(value, valueType) + }) + + { + // Convert the sequences of Literals to a collection of serialized BoundaryRows + val boundaryRows: Seq[PartitioningOuterClass.BoundaryRow] = boundaryExprs + .map((rowLiterals: Seq[Literal]) => { + // Serialize each sequence of Literals as a BoundaryRow + val rowBuilder = PartitioningOuterClass.BoundaryRow.newBuilder(); + val serializedExprs = + rowLiterals.map(lit_value => + QueryPlanSerde.exprToProto(lit_value, outputAttributes).get) + rowBuilder.addAllPartitionBounds(serializedExprs.asJava) + rowBuilder.build() + }) + partitioning.addAllBoundaryRows(boundaryRows.asJava) + } val partitioningBuilder = PartitioningOuterClass.Partitioning.newBuilder() shuffleWriterBuilder.setPartitioning( partitioningBuilder.setRangePartition(partitioning).build()) - case SinglePartition => - val partitioning = PartitioningOuterClass.SinglePartition.newBuilder() - - val partitioningBuilder = PartitioningOuterClass.Partitioning.newBuilder() - shuffleWriterBuilder.setPartitioning( - partitioningBuilder.setSinglePartition(partitioning).build()) case _ => throw new UnsupportedOperationException( diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleDependency.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleDependency.scala index ff35b10eb6..2b74e5a168 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleDependency.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleDependency.scala @@ -25,6 +25,7 @@ import org.apache.spark.{Aggregator, Partitioner, ShuffleDependency, SparkEnv} import org.apache.spark.rdd.RDD import org.apache.spark.serializer.Serializer import org.apache.spark.shuffle.ShuffleWriteProcessor +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.execution.metric.SQLMetric @@ -47,7 +48,8 @@ class CometShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag]( val outputPartitioning: Option[Partitioning] = None, val outputAttributes: Seq[Attribute] = Seq.empty, val shuffleWriteMetrics: Map[String, SQLMetric] = Map.empty, - val numParts: Int = 0) + val numParts: Int = 0, + val rangePartitionBounds: Option[Seq[InternalRow]] = None) extends ShuffleDependency[K, V, C]( _rdd, partitioner, diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala index df67e7b7f2..fe576b186e 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala @@ -21,6 +21,7 @@ package org.apache.spark.sql.comet.execution.shuffle import java.util.function.Supplier +import scala.collection.JavaConverters.asScalaIteratorConverter import scala.concurrent.Future import org.apache.spark._ @@ -35,8
+36,7 @@ import org.apache.spark.sql.catalyst.plans.logical.Statistics import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.comet.{CometMetricNode, CometPlan} import org.apache.spark.sql.execution._ -import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, ShuffleExchangeLike, ShuffleOrigin} -import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec +import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, ShuffleExchangeExec, ShuffleExchangeLike, ShuffleOrigin} import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics, SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.vectorized.ColumnarBatch @@ -211,6 +211,59 @@ case class CometShuffleExchangeExec( } object CometShuffleExchangeExec extends ShimCometShuffleExchangeExec { + + /** + * Computes range partition bounds by sampling across all partitions. This ensures all executors + * use the same partition boundaries for range partitioning. + */ + def computeRangePartitionBounds( + rdd: RDD[ColumnarBatch], + rangePartitioning: RangePartitioning, + outputAttributes: Seq[Attribute]): Seq[InternalRow] = { + + // The code block below is mostly brought over from + // ShuffleExchangeExec::prepareShuffleDependency and modified for columnar batches + val rangePartitioner = { + // Extract only fields used for sorting to avoid collecting large fields that does not + // affect sorting result when deciding partition bounds in RangePartitioner + val rddForSampling = rdd.mapPartitionsInternal { iter => + val projection = + UnsafeProjection.create(rangePartitioning.ordering.map(_.child), outputAttributes) + val mutablePair = new MutablePair[InternalRow, Null]() + + // Internally, RangePartitioner runs a job on the RDD that samples keys to compute + // partition bounds. To get accurate samples, we need to copy the mutable keys. + iter.flatMap { batch => + val rowIter = batch.rowIterator().asScala + rowIter.map { row => + mutablePair.update(projection(row).copy(), null) + } + } + } + + // Construct ordering on extracted sort key. 
+ val orderingAttributes = rangePartitioning.ordering.zipWithIndex.map { case (ord, i) => + ord.copy(child = BoundReference(i, ord.dataType, ord.nullable)) + } + implicit val ordering = new LazilyGeneratedOrdering(orderingAttributes) + // Use Spark's RangePartitioner to compute bounds from global samples + new RangePartitioner( + rangePartitioning.numPartitions, + rddForSampling, + ascending = true, + samplePointsPerPartitionHint = SQLConf.get.rangeExchangeSampleSizePerPartition) + } + + // Use reflection to access the private rangeBounds field + val rangePartitionerClass = rangePartitioner.getClass + val rangeBoundsField = rangePartitionerClass.getDeclaredField("rangeBounds") + rangeBoundsField.setAccessible(true) + val rangeBounds = + rangeBoundsField.get(rangePartitioner).asInstanceOf[Array[InternalRow]].toSeq + + rangeBounds + } + def prepareShuffleDependency( rdd: RDD[ColumnarBatch], outputAttributes: Seq[Attribute], @@ -218,6 +271,14 @@ object CometShuffleExchangeExec extends ShimCometShuffleExchangeExec { serializer: Serializer, metrics: Map[String, SQLMetric]): ShuffleDependency[Int, ColumnarBatch, ColumnarBatch] = { val numParts = rdd.getNumPartitions + + // Compute range partition bounds for range partitioning + val rangePartitionBounds: Option[Seq[InternalRow]] = outputPartitioning match { + case rangePartitioning: RangePartitioning => + Some(computeRangePartitionBounds(rdd, rangePartitioning, outputAttributes)) + case _ => None + } + val dependency = new CometShuffleDependency[Int, ColumnarBatch, ColumnarBatch]( rdd.map( (0, _) @@ -227,13 +288,15 @@ object CometShuffleExchangeExec extends ShimCometShuffleExchangeExec { shuffleType = CometNativeShuffle, partitioner = new Partitioner { override def numPartitions: Int = outputPartitioning.numPartitions + override def getPartition(key: Any): Int = key.asInstanceOf[Int] }, decodeTime = metrics("decode_time"), outputPartitioning = Some(outputPartitioning), outputAttributes = outputAttributes, shuffleWriteMetrics = metrics, - numParts = numParts) + numParts = numParts, + rangePartitionBounds = rangePartitionBounds) dependency } @@ -323,6 +386,7 @@ object CometShuffleExchangeExec extends ShimCometShuffleExchangeExec { case _ => throw new IllegalStateException(s"Exchange not implemented for $newPartitioning") // TODO: Handle BroadcastPartitioning. } + def getPartitionKeyExtractor(): InternalRow => Any = newPartitioning match { case RoundRobinPartitioning(numPartitions) => // Distributes elements evenly across output partitions, starting from a random partition. @@ -375,6 +439,7 @@ object CometShuffleExchangeExec extends ShimCometShuffleExchangeExec { // limited range. val prefixComputer = new UnsafeExternalRowSorter.PrefixComputer { private val result = new UnsafeExternalRowSorter.PrefixComputer.Prefix + override def computePrefix( row: InternalRow): UnsafeExternalRowSorter.PrefixComputer.Prefix = { // The hashcode generated from the binary form of a [[UnsafeRow]] should not be null. 
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala index 1142c6af17..ebc481fc85 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala @@ -209,7 +209,8 @@ class CometShuffleManager(conf: SparkConf) extends ShuffleManager with Logging { dep.shuffleId, mapId, context, - metrics) + metrics, + dep.rangePartitionBounds) case bypassMergeSortHandle: CometBypassMergeSortShuffleHandle[K @unchecked, V @unchecked] => new CometBypassMergeSortShuffleWriter( env.blockManager, diff --git a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala index ed250e141c..31d9c6cd94 100644 --- a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala @@ -253,7 +253,19 @@ class CometFuzzTestSuite extends CometTestBase with AdaptiveSparkPlanHelper { df2.collect() if (usingDataSourceExec) { val cometShuffles = collectCometShuffleExchanges(df2.queryExecution.executedPlan) - assert(1 == cometShuffles.length) + val expectedNumCometShuffles = CometConf.COMET_NATIVE_SCAN_IMPL.get() match { + case CometConf.SCAN_NATIVE_COMET => + // native_comet does not support reading complex types + 0 + case CometConf.SCAN_NATIVE_ICEBERG_COMPAT | CometConf.SCAN_NATIVE_DATAFUSION => + CometConf.COMET_SHUFFLE_MODE.get() match { + case "jvm" => + 1 + case "native" => + 2 + } + } + assert(cometShuffles.length == expectedNumCometShuffles) } } @@ -380,6 +392,7 @@ class CometFuzzTestSuite extends CometTestBase with AdaptiveSparkPlanHelper { CometConf.SCAN_NATIVE_ICEBERG_COMPAT).foreach { scanImpl => super.test(testName + s" ($scanImpl, $shuffleMode shuffle)", testTags: _*) { withSQLConf( + CometConf.COMET_EXEC_SHUFFLE_WITH_RANGE_PARTITIONING_ENABLED.key -> "true", CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanImpl, CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "true", CometConf.COMET_SHUFFLE_MODE.key -> shuffleMode) { diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala index 5e2abd7c33..624375846e 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala @@ -26,7 +26,7 @@ import org.scalatest.Tag import org.apache.hadoop.fs.Path import org.apache.spark.SparkEnv -import org.apache.spark.sql.{CometTestBase, DataFrame} +import org.apache.spark.sql.{CometTestBase, DataFrame, Row} import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.functions.col @@ -40,6 +40,7 @@ class CometNativeShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper withSQLConf( CometConf.COMET_EXEC_ENABLED.key -> "true", CometConf.COMET_SHUFFLE_MODE.key -> "native", + CometConf.COMET_EXEC_SHUFFLE_WITH_RANGE_PARTITIONING_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { testFun } @@ -248,6 +249,57 @@ class CometNativeShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper } } + // This adapts the PySpark example in https://github.com/apache/datafusion-comet/issues/1906 
to + // test for incorrect partition values after native RangePartitioning + test("fix: range partitioning #1906") { + withSQLConf(CometConf.COMET_EXEC_SHUFFLE_WITH_RANGE_PARTITIONING_ENABLED.key -> "true") { + withParquetTable((0 until 100000).map(i => (i, i + 1)), "tbl") { + val df = sql("SELECT * from tbl") + + // Repartition with two sort columns + val df_range_partitioned = df.repartitionByRange(10, df.col("_1"), df.col("_2")) + + val partition_bounds = df_range_partitioned.rdd + .mapPartitionsWithIndex((idx: Int, iterator: Iterator[Row]) => { + // Find the min and max value in each partition + var min: Option[Int] = None + var max: Option[Int] = None + iterator.foreach((row: Row) => { + val row_val = row.get(0).asInstanceOf[Int] + if (min.isEmpty || row_val < min.get) { + min = Some(row_val) + } + if (max.isEmpty || row_val > max.get) { + max = Some(row_val) + } + }) + Iterator.single((idx, min, max)) + }) + .collect() + + // Check min and max values in each partition + for (i <- partition_bounds.indices.init) { + val currentPartition = partition_bounds(i) + val nextPartition = partition_bounds(i + 1) + + if (currentPartition._2.isDefined && currentPartition._3.isDefined) { + val currentMin = currentPartition._2.get + val currentMax = currentPartition._3.get + assert(currentMin <= currentMax) + } + + if (currentPartition._3.isDefined && nextPartition._2.isDefined) { + val currentMax = currentPartition._3.get + val nextMin = nextPartition._2.get + assert(currentMax <= nextMin) + } + } + + } + } + + } + /** * Checks that `df` produces the same answer as Spark does, and has the `expectedNum` Comet * exchange operators. When `checkNativeOperators` is true, this also checks that all operators