From 709d6e936bbd831043502e5d7ab74824ecba9800 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Thu, 28 Aug 2025 10:35:53 -0400 Subject: [PATCH 1/9] Use Spark's RangePartitioning to compute boundary rows and serialize for native shuffle to consume. Added new test to represent #1906. --- .../scala/org/apache/comet/CometConf.scala | 2 +- docs/source/user-guide/configs.md | 2 +- native/core/benches/shuffle_writer.rs | 31 ++++++-- native/core/src/execution/planner.rs | 54 ++++++++++++- .../execution/shuffle/comet_partitioning.rs | 5 +- native/core/src/execution/shuffle/mod.rs | 2 +- .../src/execution/shuffle/shuffle_writer.rs | 77 ++++++++++--------- native/proto/src/proto/partitioning.proto | 6 +- .../shuffle/CometNativeShuffleWriter.scala | 67 +++++++++------- .../shuffle/CometShuffleDependency.scala | 4 +- .../shuffle/CometShuffleExchangeExec.scala | 60 ++++++++++++++- .../shuffle/CometShuffleManager.scala | 3 +- .../org/apache/comet/CometFuzzTestSuite.scala | 16 +++- .../comet/exec/CometNativeShuffleSuite.scala | 54 ++++++++++++- 14 files changed, 294 insertions(+), 89 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 367af5f199..8f6a833f4b 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -332,7 +332,7 @@ object CometConf extends ShimCometConf { "This feature is experimental while we investigate scenarios that don't partition data " + "correctly.") .booleanConf - .createWithDefault(false) + .createWithDefault(true) val COMET_EXEC_SHUFFLE_COMPRESSION_CODEC: ConfigEntry[String] = conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.codec") diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 434c1934fb..fe35d7fdbd 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -74,7 +74,7 @@ Comet provides the following configuration settings. | spark.comet.memoryOverhead | The amount of additional memory to be allocated per executor process for Comet, in MiB, when running Spark in on-heap mode. This config is optional. If this is not specified, it will be set to `spark.comet.memory.overhead.factor` * `spark.executor.memory`. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | | | spark.comet.metrics.updateInterval | The interval in milliseconds to update metrics. If interval is negative, metrics will be updated upon task completion. | 3000 | | spark.comet.native.shuffle.partitioning.hash.enabled | Whether to enable hash partitioning for Comet native shuffle. | true | -| spark.comet.native.shuffle.partitioning.range.enabled | Experimental feature to enable range partitioning for Comet native shuffle. This feature is experimental while we investigate scenarios that don't partition data correctly. | false | +| spark.comet.native.shuffle.partitioning.range.enabled | Experimental feature to enable range partitioning for Comet native shuffle. This feature is experimental while we investigate scenarios that don't partition data correctly. | true | | spark.comet.nativeLoadRequired | Whether to require Comet native library to load successfully when Comet is enabled. If not, Comet will silently fallback to Spark when it fails to load the native lib. Otherwise, an error will be thrown and the Spark job will be aborted. 
| false | | spark.comet.parquet.enable.directBuffer | Whether to use Java direct byte buffer when reading Parquet. | false | | spark.comet.parquet.read.io.adjust.readRange.skew | In the parallel reader, if the read ranges submitted are skewed in sizes, this option will cause the reader to break up larger read ranges into smaller ranges to reduce the skew. This will result in a slightly larger number of connections opened to the file system but may give improved performance. | false | diff --git a/native/core/benches/shuffle_writer.rs b/native/core/benches/shuffle_writer.rs index 52638d92a9..2795254ac7 100644 --- a/native/core/benches/shuffle_writer.rs +++ b/native/core/benches/shuffle_writer.rs @@ -18,6 +18,7 @@ use arrow::array::builder::{Date32Builder, Decimal128Builder, Int32Builder}; use arrow::array::{builder::StringBuilder, RecordBatch}; use arrow::datatypes::{DataType, Field, Schema}; +use comet::execution::shuffle::range_partitioner::RangePartitioner; use comet::execution::shuffle::{ CometPartitioning, CompressionCodec, ShuffleBlockWriter, ShuffleWriterExec, }; @@ -84,16 +85,30 @@ fn criterion_benchmark(c: &mut Criterion) { ); } - for partitioning in [ - CometPartitioning::Hash(vec![Arc::new(Column::new("a", 0))], 16), - CometPartitioning::RangePartitioning( - LexOrdering::new(vec![PhysicalSortExpr::new_default( - col("c0", batch.schema().as_ref()).unwrap(), - )]) - .unwrap(), + let lex_ordering = LexOrdering::new(vec![PhysicalSortExpr::new_default( + col("a", batch.schema().as_ref()).unwrap(), + )]) + .unwrap(); + + let (owned_rows, row_converter) = { + let (bounds_rows, row_converter) = RangePartitioner::generate_bounds( + &Vec::from(batch.columns()), + &lex_ordering, 16, + batch.num_rows(), 100, - ), + 42, + ) + .unwrap(); + ( + bounds_rows.iter().map(|row| row.owned()).collect_vec(), + row_converter, + ) + }; + + for partitioning in [ + CometPartitioning::Hash(vec![Arc::new(Column::new("a", 0))], 16), + CometPartitioning::RangePartitioning(lex_ordering, 16, Arc::new(row_converter), owned_rows), ] { let compression_codec = CompressionCodec::None; group.bench_function( diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 0c3d345c8e..b265a0c9dc 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -85,11 +85,12 @@ use datafusion::physical_expr::LexOrdering; use crate::parquet::parquet_exec::init_datasource_exec; use arrow::array::{ - BinaryBuilder, BooleanArray, Date32Array, Decimal128Array, Float32Array, Float64Array, - Int16Array, Int32Array, Int64Array, Int8Array, NullArray, StringBuilder, + ArrayRef, BinaryBuilder, BooleanArray, Date32Array, Decimal128Array, Float32Array, + Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, NullArray, StringBuilder, TimestampMicrosecondArray, }; use arrow::buffer::BooleanBuffer; +use arrow::row::{OwnedRow, RowConverter, SortField}; use datafusion::common::utils::SingleRowListArrayBuilder; use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec; use datafusion::physical_plan::filter::FilterExec; @@ -2442,16 +2443,63 @@ impl PhysicalPlanner { )) } PartitioningStruct::RangePartition(range_partition) => { + // Generate the lexical ordering for comparisons let exprs: Result, ExecutionError> = range_partition .sort_orders .iter() .map(|expr| self.create_sort_expr(expr, Arc::clone(&input_schema))) .collect(); let lex_ordering = LexOrdering::new(exprs?).unwrap(); + let boundary_row_len = lex_ordering.len(); + + // Generate the row converter for 
comparing incoming batches to boundary rows
+                let sort_fields: Vec<SortField> = lex_ordering
+                    .iter()
+                    .map(|sort_expr| {
+                        let data_type = sort_expr.expr.data_type(input_schema.as_ref()).unwrap();
+                        SortField::new_with_options(data_type, sort_expr.options)
+                    })
+                    .collect();
+
+                // Deserialize the literals to columnar collections of ScalarValues
+                let mut scalar_values: Vec<Vec<ScalarValue>> = vec![vec![]; lex_ordering.len()];
+                range_partition
+                    .boundary_rows
+                    .iter()
+                    .for_each(|boundary_row| {
+                        assert_eq!(boundary_row.partition_bounds.len(), boundary_row_len);
+                        // For each serialized expr in a boundary row, convert to a Literal
+                        // expression, then extract the ScalarValue from the Literal and push it
+                        // into the collection of ScalarValues
+                        boundary_row.partition_bounds.iter().enumerate().for_each(
+                            |(col_idx, literal_expr)| {
+                                // TODO: Is there a quicker/cleaner way to go from serialized expr
+                                // that we know is a literal to a ScalarValue?
+                                let expr = self
+                                    .create_expr(literal_expr, Arc::clone(&input_schema))
+                                    .unwrap();
+                                let literal_expr = expr.as_any().downcast_ref::<Literal>().unwrap();
+                                scalar_values[col_idx].push(literal_expr.value().clone());
+                            },
+                        );
+                    });
+
+                // Convert the collection of ScalarValues to a collection of Arrow Arrays
+                let arrays: Vec<ArrayRef> = scalar_values
+                    .iter()
+                    .map(|scalar_vec| ScalarValue::iter_to_array(scalar_vec.iter().cloned()))
+                    .collect::<Result<Vec<_>, _>>()?;
+
+                // Create a RowConverter and use it to create OwnedRows from the Arrays
+                let converter = RowConverter::new(sort_fields)?;
+                let rows = converter.convert_columns(&arrays)?;
+                let owned_rows: Vec<OwnedRow> = rows.iter().map(|row| row.owned()).collect();
+
                 Ok(CometPartitioning::RangePartitioning(
                     lex_ordering,
                     range_partition.num_partitions as usize,
-                    range_partition.sample_size as usize,
+                    Arc::new(converter),
+                    owned_rows,
                 ))
             }
             PartitioningStruct::SinglePartition(_) => Ok(CometPartitioning::SinglePartition),
diff --git a/native/core/src/execution/shuffle/comet_partitioning.rs b/native/core/src/execution/shuffle/comet_partitioning.rs
index 9c33da8e93..90f9ef9327 100644
--- a/native/core/src/execution/shuffle/comet_partitioning.rs
+++ b/native/core/src/execution/shuffle/comet_partitioning.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use arrow::row::{OwnedRow, RowConverter};
 use datafusion::physical_expr::{LexOrdering, PhysicalExpr};
 use std::sync::Arc;
 
@@ -26,7 +27,7 @@ pub enum CometPartitioning {
     Hash(Vec<Arc<dyn PhysicalExpr>>, usize),
     /// Allocate rows based on the lexical order of one of more expressions and the specified number of
     /// partitions
-    RangePartitioning(LexOrdering, usize, usize),
+    RangePartitioning(LexOrdering, usize, Arc<RowConverter>, Vec<OwnedRow>),
 }
 
 impl CometPartitioning {
@@ -34,7 +35,7 @@ impl CometPartitioning {
         use CometPartitioning::*;
         match self {
             SinglePartition => 1,
-            Hash(_, n) | RangePartitioning(_, n, _) => *n,
+            Hash(_, n) | RangePartitioning(_, n, _, _) => *n,
         }
     }
 }
diff --git a/native/core/src/execution/shuffle/mod.rs b/native/core/src/execution/shuffle/mod.rs
index c3cb610afa..ee770b1b85 100644
--- a/native/core/src/execution/shuffle/mod.rs
+++ b/native/core/src/execution/shuffle/mod.rs
@@ -19,7 +19,7 @@ pub(crate) mod codec;
 mod comet_partitioning;
 mod list;
 mod map;
-mod range_partitioner;
+pub mod range_partitioner;
 pub mod row;
 mod shuffle_writer;
 
diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs
index fe303618c3..0c6adb1f8b 100644
--- a/native/core/src/execution/shuffle/shuffle_writer.rs
+++ b/native/core/src/execution/shuffle/shuffle_writer.rs
@@ -21,7 +21,6 @@ use crate::execution::shuffle::range_partitioner::RangePartitioner;
 use crate::execution::shuffle::{CometPartitioning, CompressionCodec, ShuffleBlockWriter};
 use crate::execution::tracing::{with_trace, with_trace_async};
 use arrow::compute::interleave_record_batch;
-use arrow::row::{OwnedRow, RowConverter};
 use async_trait::async_trait;
 use datafusion::common::utils::proxy::VecAllocExt;
 use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
@@ -333,10 +332,6 @@ struct MultiPartitionShuffleRepartitioner {
     /// Reservation for repartitioning
     reservation: MemoryReservation,
     tracing_enabled: bool,
-    /// RangePartitioning-specific state
-    bounds_rows: Option<Vec<OwnedRow>>,
-    row_converter: Option<RowConverter>,
-    seed: u64,
 }
 
 #[derive(Default)]
@@ -413,10 +408,6 @@ impl MultiPartitionShuffleRepartitioner {
             batch_size,
             reservation,
             tracing_enabled,
-            bounds_rows: None,
-            row_converter: None,
-            // Spark RangePartitioner seeds off of partition number.
-            seed: partition as u64,
         })
     }
 
@@ -546,7 +537,8 @@ impl MultiPartitionShuffleRepartitioner {
             CometPartitioning::RangePartitioning(
                 lex_ordering,
                 num_output_partitions,
-                sample_size,
+                row_converter,
+                bounds,
             ) => {
                 let mut scratch = std::mem::take(&mut self.scratch);
                 let (partition_starts, partition_row_indices): (&Vec<u32>, &Vec<u32>) = {
@@ -560,35 +552,14 @@ impl MultiPartitionShuffleRepartitioner {
 
                     let num_rows = arrays[0].len();
 
-                    // If necessary (i.e., when first batch arrives) generate the bounds (as Rows)
-                    // for range partitioning based on randomly reservoir sampling the batch.
-                    if self.row_converter.is_none() {
-                        let (bounds_rows, row_converter) = RangePartitioner::generate_bounds(
-                            &arrays,
-                            lex_ordering,
-                            *num_output_partitions,
-                            input.num_rows(),
-                            *sample_size,
-                            self.seed,
-                        )?;
-
-                        self.bounds_rows =
-                            Some(bounds_rows.iter().map(|row| row.owned()).collect_vec());
-                        self.row_converter = Some(row_converter);
-                    }
-
                     // Generate partition ids for every row, first by converting the partition
                     // arrays to Rows, and then doing binary search for each Row against the
                     // bounds Rows.
-                    let row_batch = self
-                        .row_converter
-                        .as_ref()
-                        .unwrap()
-                        .convert_columns(arrays.as_slice())?;
+                    let row_batch = row_converter.convert_columns(arrays.as_slice())?;
 
                     RangePartitioner::partition_indices_for_batch(
                         &row_batch,
-                        self.bounds_rows.as_ref().unwrap().as_slice(),
+                        bounds.as_slice(),
                         &mut scratch.partition_ids[..num_rows],
                     );
 
@@ -1278,6 +1249,7 @@ mod test {
     use super::*;
     use crate::execution::shuffle::read_ipc_compressed;
     use arrow::datatypes::{DataType, Field, Schema};
+    use arrow::row::{RowConverter, SortField};
    use datafusion::datasource::memory::MemorySourceConfig;
    use datafusion::datasource::source::DataSourceExec;
    use datafusion::execution::config::SessionConfig;
@@ -1404,15 +1376,44 @@ mod test {
     ) {
         let batch = create_batch(batch_size);
 
+        let lex_ordering = LexOrdering::new(vec![PhysicalSortExpr::new_default(
+            col("a", batch.schema().as_ref()).unwrap(),
+        )])
+        .unwrap();
+
+        let (owned_rows, row_converter) = if num_partitions == 1 {
+            let sort_fields: Vec<SortField> = batch
+                .columns()
+                .iter()
+                .zip(&lex_ordering)
+                .map(|(array, sort_expr)| {
+                    SortField::new_with_options(array.data_type().clone(), sort_expr.options)
+                })
+                .collect();
+            (vec![], RowConverter::new(sort_fields).unwrap())
+        } else {
+            let (bounds_rows, row_converter) = RangePartitioner::generate_bounds(
+                &Vec::from(batch.columns()),
+                &lex_ordering,
+                num_partitions,
+                batch_size,
+                100,
+                42,
+            )
+            .unwrap();
+            (
+                bounds_rows.iter().map(|row| row.owned()).collect_vec(),
+                row_converter,
+            )
+        };
+
         for partitioning in [
             CometPartitioning::Hash(vec![Arc::new(Column::new("a", 0))], num_partitions),
             CometPartitioning::RangePartitioning(
-                LexOrdering::new(vec![PhysicalSortExpr::new_default(
-                    col("a", batch.schema().as_ref()).unwrap(),
-                )])
-                .unwrap(),
+                lex_ordering,
                 num_partitions,
-                100,
+                Arc::new(row_converter),
+                owned_rows,
             ),
         ] {
             let batches = (0..num_batches).map(|_| batch.clone()).collect::<Vec<_>>();
diff --git a/native/proto/src/proto/partitioning.proto b/native/proto/src/proto/partitioning.proto
index ea0e586fb1..e11d7a384b 100644
--- a/native/proto/src/proto/partitioning.proto
+++ b/native/proto/src/proto/partitioning.proto
@@ -42,8 +42,12 @@ message HashPartition {
 message SinglePartition {
 }
 
+message BoundaryRow {
+  repeated spark.spark_expression.Expr partition_bounds = 1;
+}
+
 message RangePartition {
   repeated spark.spark_expression.Expr sort_orders = 1;
   int32 num_partitions = 2;
-  int32 sample_size = 3;
+  repeated BoundaryRow boundary_rows = 4;
 }
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala
index 5d772be403..58018223f7 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala
@@ -28,11 +28,11 @@ import org.apache.spark.{SparkEnv, TaskContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.MapStatus
 import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleWriteMetricsReporter, ShuffleWriter}
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Literal}
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning, SinglePartition}
 import org.apache.spark.sql.comet.{CometExec,
CometMetricNode} import org.apache.spark.sql.execution.metric.SQLMetric -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.comet.CometConf @@ -51,7 +51,8 @@ class CometNativeShuffleWriter[K, V]( shuffleId: Int, mapId: Long, context: TaskContext, - metricsReporter: ShuffleWriteMetricsReporter) + metricsReporter: ShuffleWriteMetricsReporter, + rangePartitionBounds: Option[Seq[InternalRow]] = None) extends ShuffleWriter[K, V] with Logging { @@ -194,35 +195,43 @@ class CometNativeShuffleWriter[K, V]( val partitioning = PartitioningOuterClass.RangePartition.newBuilder() partitioning.setNumPartitions(outputPartitioning.numPartitions) - val sampleSize = { - // taken from org.apache.spark.RangePartitioner#rangeBounds - // This is the sample size we need to have roughly balanced output partitions, - // capped at 1M. - // Cast to double to avoid overflowing ints or longs - val sampleSize = math.min( - SQLConf.get - .getConf(SQLConf.RANGE_EXCHANGE_SAMPLE_SIZE_PER_PARTITION) - .toDouble * outputPartitioning.numPartitions, - 1e6) - // Assume the input partitions are roughly balanced and over-sample a little bit. - // Comet: we don't divide by numPartitions since each DF plan handles one partition. - math.ceil(3.0 * sampleSize).toInt - } - if (sampleSize > 8192) { - logWarning( - s"RangePartitioning sampleSize of s$sampleSize exceeds Comet RecordBatch size.") - } - partitioning.setSampleSize(sampleSize) - - val orderingExprs = rangePartitioning.ordering - .flatMap(e => QueryPlanSerde.exprToProto(e, outputAttributes)) - if (orderingExprs.length != rangePartitioning.ordering.length) { - throw new UnsupportedOperationException( - s"Partitioning $rangePartitioning is not supported.") + { + // Serialize the ordering expressions for comparisons + val orderingExprs = rangePartitioning.ordering + .flatMap(e => QueryPlanSerde.exprToProto(e, outputAttributes)) + if (orderingExprs.length != rangePartitioning.ordering.length) { + throw new UnsupportedOperationException( + s"Partitioning $rangePartitioning is not supported.") + } + partitioning.addAllSortOrders(orderingExprs.asJava) } - partitioning.addAllSortOrders(orderingExprs.asJava) + // Convert Spark's sequence of InternalRows that represent partitioning boundaries to sequences of Literals, + // where each outer entry represents a boundary row, and each internal entry is a value in that row. 
In other
+      // words, these are stored in row major order, not column major
+      val boundarySchema = rangePartitioning.ordering.flatMap(e => Some(e.dataType))
+      val boundaryExprs: Seq[Seq[Literal]] =
+        rangePartitionBounds.get.map((row: InternalRow) =>
+          // For every InternalRow, map its values to Literals to a collection of Literals
+          row.toSeq(boundarySchema).zip(boundarySchema).map { case (value, valueType) =>
+            Literal(value, valueType)
+          })
+
+      {
+        // Convert the sequences of Literals to a collection of serialized BoundaryRows
+        val boundaryRows: Seq[PartitioningOuterClass.BoundaryRow] = boundaryExprs
+          .map((rowLiterals: Seq[Literal]) => {
+            // Serialize each sequence of Literals as a BoundaryRow
+            val rowBuilder = PartitioningOuterClass.BoundaryRow.newBuilder()
+            val serializedExprs =
+              rowLiterals.map(litValue =>
+                QueryPlanSerde.exprToProto(litValue, outputAttributes).get)
+            rowBuilder.addAllPartitionBounds(serializedExprs.asJava)
+            rowBuilder.build()
+          })
+        partitioning.addAllBoundaryRows(boundaryRows.asJava)
+      }
 
       val partitioningBuilder = PartitioningOuterClass.Partitioning.newBuilder()
       shuffleWriterBuilder.setPartitioning(
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleDependency.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleDependency.scala
index ff35b10eb6..2b74e5a168 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleDependency.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleDependency.scala
@@ -25,6 +25,7 @@ import org.apache.spark.{Aggregator, Partitioner, ShuffleDependency, SparkEnv}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.ShuffleWriteProcessor
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution.metric.SQLMetric
@@ -47,7 +48,8 @@ class CometShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
     val outputPartitioning: Option[Partitioning] = None,
     val outputAttributes: Seq[Attribute] = Seq.empty,
     val shuffleWriteMetrics: Map[String, SQLMetric] = Map.empty,
-    val numParts: Int = 0)
+    val numParts: Int = 0,
+    val rangePartitionBounds: Option[Seq[InternalRow]] = None)
     extends ShuffleDependency[K, V, C](
       _rdd,
       partitioner,
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
index df67e7b7f2..54941b72a7 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
@@ -21,6 +21,7 @@ package org.apache.spark.sql.comet.execution.shuffle
 
 import java.util.function.Supplier
 
+import scala.collection.JavaConverters.asScalaIteratorConverter
 import scala.concurrent.Future
 
 import org.apache.spark._
@@ -211,6 +212,54 @@ case class CometShuffleExchangeExec(
 }
 
 object CometShuffleExchangeExec extends ShimCometShuffleExchangeExec {
+
+  /**
+   * Computes range partition bounds by sampling across all partitions. This ensures all executors
+   * use the same partition boundaries for range partitioning.
+ */ + def computeRangePartitionBounds( + rdd: RDD[ColumnarBatch], + rangePartitioning: RangePartitioning, + outputAttributes: Seq[Attribute]): Seq[InternalRow] = { + + // Create sampling RDD similar to existing JVM shuffle logic + val rddForSampling = rdd.mapPartitionsInternal { iter => + val projection = + UnsafeProjection.create(rangePartitioning.ordering.map(_.child), outputAttributes) + val mutablePair = new MutablePair[InternalRow, Null]() + + // Convert ColumnarBatch to rows and project sorting columns + iter.flatMap { batch => + val rowIter = batch.rowIterator().asScala + rowIter.map { row => + // Copy the mutable keys for accurate sampling + mutablePair.update(projection(row).copy(), null) + } + } + } + + // Construct ordering on extracted sort key + val orderingAttributes = rangePartitioning.ordering.zipWithIndex.map { case (ord, i) => + ord.copy(child = BoundReference(i, ord.dataType, ord.nullable)) + } + implicit val ordering = new LazilyGeneratedOrdering(orderingAttributes) + + // Use Spark's RangePartitioner to compute bounds from global samples + val rangePartitioner = new RangePartitioner( + rangePartitioning.numPartitions, + rddForSampling, + ascending = true, + samplePointsPerPartitionHint = SQLConf.get.rangeExchangeSampleSizePerPartition) + + // Use reflection to access the private rangeBounds field + val rangePartitionerClass = rangePartitioner.getClass + val rangeBoundsField = rangePartitionerClass.getDeclaredField("rangeBounds") + rangeBoundsField.setAccessible(true) + val rangeBounds = + rangeBoundsField.get(rangePartitioner).asInstanceOf[Array[InternalRow]].toSeq + + rangeBounds + } def prepareShuffleDependency( rdd: RDD[ColumnarBatch], outputAttributes: Seq[Attribute], @@ -218,6 +267,14 @@ object CometShuffleExchangeExec extends ShimCometShuffleExchangeExec { serializer: Serializer, metrics: Map[String, SQLMetric]): ShuffleDependency[Int, ColumnarBatch, ColumnarBatch] = { val numParts = rdd.getNumPartitions + + // Compute range partition bounds for range partitioning + val rangePartitionBounds: Option[Seq[InternalRow]] = outputPartitioning match { + case rangePartitioning: RangePartitioning => + Some(computeRangePartitionBounds(rdd, rangePartitioning, outputAttributes)) + case _ => None + } + val dependency = new CometShuffleDependency[Int, ColumnarBatch, ColumnarBatch]( rdd.map( (0, _) @@ -233,7 +290,8 @@ object CometShuffleExchangeExec extends ShimCometShuffleExchangeExec { outputPartitioning = Some(outputPartitioning), outputAttributes = outputAttributes, shuffleWriteMetrics = metrics, - numParts = numParts) + numParts = numParts, + rangePartitionBounds = rangePartitionBounds) dependency } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala index 1142c6af17..ebc481fc85 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala @@ -209,7 +209,8 @@ class CometShuffleManager(conf: SparkConf) extends ShuffleManager with Logging { dep.shuffleId, mapId, context, - metrics) + metrics, + dep.rangePartitionBounds) case bypassMergeSortHandle: CometBypassMergeSortShuffleHandle[K @unchecked, V @unchecked] => new CometBypassMergeSortShuffleWriter( env.blockManager, diff --git a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala 
b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala index ed250e141c..d6daf779ab 100644 --- a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala @@ -253,7 +253,20 @@ class CometFuzzTestSuite extends CometTestBase with AdaptiveSparkPlanHelper { df2.collect() if (usingDataSourceExec) { val cometShuffles = collectCometShuffleExchanges(df2.queryExecution.executedPlan) - assert(1 == cometShuffles.length) + val expectedNumCometShuffles = CometConf.COMET_NATIVE_SCAN_IMPL.get() match { + case CometConf.SCAN_NATIVE_COMET => + // native_comet does not support reading complex types + 0 + case CometConf.SCAN_NATIVE_ICEBERG_COMPAT | CometConf.SCAN_NATIVE_DATAFUSION => + CometConf.COMET_SHUFFLE_MODE.get() match { + case "jvm" => + 1 + case "native" => + // native shuffle does not support complex types as partitioning keys + 2 + } + } + assert(cometShuffles.length == expectedNumCometShuffles) } } @@ -380,6 +393,7 @@ class CometFuzzTestSuite extends CometTestBase with AdaptiveSparkPlanHelper { CometConf.SCAN_NATIVE_ICEBERG_COMPAT).foreach { scanImpl => super.test(testName + s" ($scanImpl, $shuffleMode shuffle)", testTags: _*) { withSQLConf( + CometConf.COMET_EXEC_SHUFFLE_WITH_RANGE_PARTITIONING_ENABLED.key -> "true", CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanImpl, CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "true", CometConf.COMET_SHUFFLE_MODE.key -> shuffleMode) { diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala index 5e2abd7c33..624375846e 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala @@ -26,7 +26,7 @@ import org.scalatest.Tag import org.apache.hadoop.fs.Path import org.apache.spark.SparkEnv -import org.apache.spark.sql.{CometTestBase, DataFrame} +import org.apache.spark.sql.{CometTestBase, DataFrame, Row} import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.functions.col @@ -40,6 +40,7 @@ class CometNativeShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper withSQLConf( CometConf.COMET_EXEC_ENABLED.key -> "true", CometConf.COMET_SHUFFLE_MODE.key -> "native", + CometConf.COMET_EXEC_SHUFFLE_WITH_RANGE_PARTITIONING_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { testFun } @@ -248,6 +249,57 @@ class CometNativeShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper } } + // This adapts the PySpark example in https://github.com/apache/datafusion-comet/issues/1906 to + // test for incorrect partition values after native RangePartitioning + test("fix: range partitioning #1906") { + withSQLConf(CometConf.COMET_EXEC_SHUFFLE_WITH_RANGE_PARTITIONING_ENABLED.key -> "true") { + withParquetTable((0 until 100000).map(i => (i, i + 1)), "tbl") { + val df = sql("SELECT * from tbl") + + // Repartition with two sort columns + val df_range_partitioned = df.repartitionByRange(10, df.col("_1"), df.col("_2")) + + val partition_bounds = df_range_partitioned.rdd + .mapPartitionsWithIndex((idx: Int, iterator: Iterator[Row]) => { + // Find the min and max value in each partition + var min: Option[Int] = None + var max: Option[Int] = None + iterator.foreach((row: Row) => { + val row_val = row.get(0).asInstanceOf[Int] + if (min.isEmpty || 
row_val < min.get) { + min = Some(row_val) + } + if (max.isEmpty || row_val > max.get) { + max = Some(row_val) + } + }) + Iterator.single((idx, min, max)) + }) + .collect() + + // Check min and max values in each partition + for (i <- partition_bounds.indices.init) { + val currentPartition = partition_bounds(i) + val nextPartition = partition_bounds(i + 1) + + if (currentPartition._2.isDefined && currentPartition._3.isDefined) { + val currentMin = currentPartition._2.get + val currentMax = currentPartition._3.get + assert(currentMin <= currentMax) + } + + if (currentPartition._3.isDefined && nextPartition._2.isDefined) { + val currentMax = currentPartition._3.get + val nextMin = nextPartition._2.get + assert(currentMax <= nextMin) + } + } + + } + } + + } + /** * Checks that `df` produces the same answer as Spark does, and has the `expectedNum` Comet * exchange operators. When `checkNativeOperators` is true, this also checks that all operators From 4ee3d8e214f1eb613eaa1f9ee5ae432aa850fdad Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Thu, 28 Aug 2025 11:04:38 -0400 Subject: [PATCH 2/9] Fix warnings and benchmark compilation. --- native/core/benches/shuffle_writer.rs | 3 ++- native/core/src/execution/shuffle/mod.rs | 3 ++- native/core/src/execution/shuffle/range_partitioner.rs | 3 +++ .../comet/execution/shuffle/CometNativeShuffleWriter.scala | 7 ++++--- 4 files changed, 11 insertions(+), 5 deletions(-) diff --git a/native/core/benches/shuffle_writer.rs b/native/core/benches/shuffle_writer.rs index 2795254ac7..f4d025b9f8 100644 --- a/native/core/benches/shuffle_writer.rs +++ b/native/core/benches/shuffle_writer.rs @@ -18,7 +18,7 @@ use arrow::array::builder::{Date32Builder, Decimal128Builder, Int32Builder}; use arrow::array::{builder::StringBuilder, RecordBatch}; use arrow::datatypes::{DataType, Field, Schema}; -use comet::execution::shuffle::range_partitioner::RangePartitioner; +use comet::execution::shuffle::RangePartitioner; use comet::execution::shuffle::{ CometPartitioning, CompressionCodec, ShuffleBlockWriter, ShuffleWriterExec, }; @@ -32,6 +32,7 @@ use datafusion::{ physical_plan::{common::collect, ExecutionPlan}, prelude::SessionContext, }; +use itertools::Itertools; use std::io::Cursor; use std::sync::Arc; use tokio::runtime::Runtime; diff --git a/native/core/src/execution/shuffle/mod.rs b/native/core/src/execution/shuffle/mod.rs index ee770b1b85..71869066fe 100644 --- a/native/core/src/execution/shuffle/mod.rs +++ b/native/core/src/execution/shuffle/mod.rs @@ -19,10 +19,11 @@ pub(crate) mod codec; mod comet_partitioning; mod list; mod map; -pub mod range_partitioner; +pub(crate) mod range_partitioner; pub mod row; mod shuffle_writer; pub use codec::{read_ipc_compressed, CompressionCodec, ShuffleBlockWriter}; pub use comet_partitioning::CometPartitioning; +pub use range_partitioner::RangePartitioner; pub use shuffle_writer::ShuffleWriterExec; diff --git a/native/core/src/execution/shuffle/range_partitioner.rs b/native/core/src/execution/shuffle/range_partitioner.rs index e14cf61482..5dabb45635 100644 --- a/native/core/src/execution/shuffle/range_partitioner.rs +++ b/native/core/src/execution/shuffle/range_partitioner.rs @@ -31,6 +31,7 @@ impl RangePartitioner { /// We use sample_size instead of k and num_rows instead of n. /// We use indices instead of actual values in the reservoir since we'll do one take() on the /// input arrays at the end. 
+    #[allow(dead_code)]
     fn reservoir_sample_indices(num_rows: usize, sample_size: usize, seed: u64) -> Vec<u64> {
         assert!(sample_size > 0);
         assert!(
@@ -76,6 +77,7 @@ impl RangePartitioner {
     /// Given input arrays and range partitioning metadata: samples the input arrays, generates
     /// partition bounds, and returns Rows (for comparison against) and a RowConverter (for
     /// adapting future incoming batches).
+    #[allow(dead_code)]
     pub fn generate_bounds(
         partition_arrays: &Vec<ArrayRef>,
         lex_ordering: &LexOrdering,
@@ -143,6 +145,7 @@ impl RangePartitioner {
     /// values since we don't have cross-partition samples to merge.
     /// We normalize the math to use ints instead of floating point by replacing 1.0 with a
     /// (imagined) num_candidates * partitions range.
+    #[allow(dead_code)]
     fn determine_bounds_for_rows(
         sort_fields: Vec<SortField>,
         sampled_columns: &[ArrayRef],
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala
index 58018223f7..290b38f880 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala
@@ -207,9 +207,10 @@ class CometNativeShuffleWriter[K, V](
         partitioning.addAllSortOrders(orderingExprs.asJava)
       }
 
-      // Convert Spark's sequence of InternalRows that represent partitioning boundaries to sequences of Literals,
-      // where each outer entry represents a boundary row, and each internal entry is a value in that row. In other
-      // words, these are stored in row major order, not column major
+      // Convert Spark's sequence of InternalRows that represent partitioning boundaries to
+      // sequences of Literals, where each outer entry represents a boundary row, and each
+      // internal entry is a value in that row. In other words, these are stored in row major
+      // order, not column major
       val boundarySchema = rangePartitioning.ordering.flatMap(e => Some(e.dataType))
       val boundaryExprs: Seq[Seq[Literal]] =
         rangePartitionBounds.get.map((row: InternalRow) =>

From 6f34e35337d5bd5a8a24fe7e0b8cf021abd4e8c1 Mon Sep 17 00:00:00 2001
From: Matt Butrovich
Date: Thu, 28 Aug 2025 11:34:51 -0400
Subject: [PATCH 3/9] Fix benchmark bug.
---
 native/core/benches/shuffle_writer.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/native/core/benches/shuffle_writer.rs b/native/core/benches/shuffle_writer.rs
index f4d025b9f8..85da4d42d4 100644
--- a/native/core/benches/shuffle_writer.rs
+++ b/native/core/benches/shuffle_writer.rs
@@ -87,7 +87,7 @@ fn criterion_benchmark(c: &mut Criterion) {
     }
 
     let lex_ordering = LexOrdering::new(vec![PhysicalSortExpr::new_default(
-        col("a", batch.schema().as_ref()).unwrap(),
+        col("c0", batch.schema().as_ref()).unwrap(),
     )])
     .unwrap();
 
From 332e76afb9a158a508af0b4547a3bad3a12947b1 Mon Sep 17 00:00:00 2001
From: Matt Butrovich
Date: Thu, 28 Aug 2025 11:40:51 -0400
Subject: [PATCH 4/9] Minor refactor.
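
The refactor hoists the bounds generation out of the benchmark loop, since the
bounds are now an input to the partitioning rather than something the writer
derives per batch. The per-row work left in the hot path is a binary search of
each row-encoded key against the boundary rows. A minimal self-contained
sketch of that lookup (illustrative only, not the Comet source; it assumes
Spark RangePartitioner's rule that a key equal to a bound belongs to that
bound's partition):

    use arrow::array::{ArrayRef, Int32Array};
    use arrow::datatypes::DataType;
    use arrow::row::{OwnedRow, Row, RowConverter, SortField};
    use std::sync::Arc;

    // Partition id = number of bounds strictly less than the key.
    fn partition_id(key: Row<'_>, bounds: &[OwnedRow]) -> usize {
        bounds.partition_point(|b| b.row() < key)
    }

    fn main() -> Result<(), arrow::error::ArrowError> {
        let converter = RowConverter::new(vec![SortField::new(DataType::Int32)])?;
        // Bounds 10 and 20 carve the key space into three partitions.
        let bounds: Vec<OwnedRow> = converter
            .convert_columns(&[Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef])?
            .iter()
            .map(|r| r.owned())
            .collect();
        let keys = converter
            .convert_columns(&[Arc::new(Int32Array::from(vec![5, 10, 15, 25])) as ArrayRef])?;
        let ids: Vec<usize> = keys.iter().map(|k| partition_id(k, &bounds)).collect();
        assert_eq!(ids, vec![0, 0, 1, 2]);
        Ok(())
    }

Both the bounds and the keys must come from the same RowConverter so that the
byte-wise Row comparison agrees with the SortOptions of the ordering.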
--- native/core/benches/shuffle_writer.rs | 25 ++++++++++--------------- 1 file changed, 10 insertions(+), 15 deletions(-) diff --git a/native/core/benches/shuffle_writer.rs b/native/core/benches/shuffle_writer.rs index 85da4d42d4..13dba9c06c 100644 --- a/native/core/benches/shuffle_writer.rs +++ b/native/core/benches/shuffle_writer.rs @@ -91,21 +91,16 @@ fn criterion_benchmark(c: &mut Criterion) { )]) .unwrap(); - let (owned_rows, row_converter) = { - let (bounds_rows, row_converter) = RangePartitioner::generate_bounds( - &Vec::from(batch.columns()), - &lex_ordering, - 16, - batch.num_rows(), - 100, - 42, - ) - .unwrap(); - ( - bounds_rows.iter().map(|row| row.owned()).collect_vec(), - row_converter, - ) - }; + let (bounds_rows, row_converter) = RangePartitioner::generate_bounds( + &Vec::from(batch.columns()), + &lex_ordering, + 16, + batch.num_rows(), + 100, + 42, + ) + .unwrap(); + let owned_rows = bounds_rows.iter().map(|row| row.owned()).collect_vec(); for partitioning in [ CometPartitioning::Hash(vec![Arc::new(Column::new("a", 0))], 16), From 0eb1134a20881a951bba025e4683d0e491089002 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Thu, 28 Aug 2025 11:59:30 -0400 Subject: [PATCH 5/9] Cleanup to make it more clear what code came from Spark. --- .../shuffle/CometShuffleExchangeExec.scala | 59 +++++++++++-------- 1 file changed, 33 insertions(+), 26 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala index 54941b72a7..fe576b186e 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala @@ -36,8 +36,7 @@ import org.apache.spark.sql.catalyst.plans.logical.Statistics import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.comet.{CometMetricNode, CometPlan} import org.apache.spark.sql.execution._ -import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, ShuffleExchangeLike, ShuffleOrigin} -import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec +import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, ShuffleExchangeExec, ShuffleExchangeLike, ShuffleOrigin} import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics, SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.vectorized.ColumnarBatch @@ -222,34 +221,38 @@ object CometShuffleExchangeExec extends ShimCometShuffleExchangeExec { rangePartitioning: RangePartitioning, outputAttributes: Seq[Attribute]): Seq[InternalRow] = { - // Create sampling RDD similar to existing JVM shuffle logic - val rddForSampling = rdd.mapPartitionsInternal { iter => - val projection = - UnsafeProjection.create(rangePartitioning.ordering.map(_.child), outputAttributes) - val mutablePair = new MutablePair[InternalRow, Null]() - - // Convert ColumnarBatch to rows and project sorting columns - iter.flatMap { batch => - val rowIter = batch.rowIterator().asScala - rowIter.map { row => - // Copy the mutable keys for accurate sampling - mutablePair.update(projection(row).copy(), null) + // The code block below is mostly brought over from + // ShuffleExchangeExec::prepareShuffleDependency and modified for columnar batches + val rangePartitioner = { + // Extract only fields used for sorting to avoid 
collecting large fields that does not + // affect sorting result when deciding partition bounds in RangePartitioner + val rddForSampling = rdd.mapPartitionsInternal { iter => + val projection = + UnsafeProjection.create(rangePartitioning.ordering.map(_.child), outputAttributes) + val mutablePair = new MutablePair[InternalRow, Null]() + + // Internally, RangePartitioner runs a job on the RDD that samples keys to compute + // partition bounds. To get accurate samples, we need to copy the mutable keys. + iter.flatMap { batch => + val rowIter = batch.rowIterator().asScala + rowIter.map { row => + mutablePair.update(projection(row).copy(), null) + } } } - } - // Construct ordering on extracted sort key - val orderingAttributes = rangePartitioning.ordering.zipWithIndex.map { case (ord, i) => - ord.copy(child = BoundReference(i, ord.dataType, ord.nullable)) + // Construct ordering on extracted sort key. + val orderingAttributes = rangePartitioning.ordering.zipWithIndex.map { case (ord, i) => + ord.copy(child = BoundReference(i, ord.dataType, ord.nullable)) + } + implicit val ordering = new LazilyGeneratedOrdering(orderingAttributes) + // Use Spark's RangePartitioner to compute bounds from global samples + new RangePartitioner( + rangePartitioning.numPartitions, + rddForSampling, + ascending = true, + samplePointsPerPartitionHint = SQLConf.get.rangeExchangeSampleSizePerPartition) } - implicit val ordering = new LazilyGeneratedOrdering(orderingAttributes) - - // Use Spark's RangePartitioner to compute bounds from global samples - val rangePartitioner = new RangePartitioner( - rangePartitioning.numPartitions, - rddForSampling, - ascending = true, - samplePointsPerPartitionHint = SQLConf.get.rangeExchangeSampleSizePerPartition) // Use reflection to access the private rangeBounds field val rangePartitionerClass = rangePartitioner.getClass @@ -260,6 +263,7 @@ object CometShuffleExchangeExec extends ShimCometShuffleExchangeExec { rangeBounds } + def prepareShuffleDependency( rdd: RDD[ColumnarBatch], outputAttributes: Seq[Attribute], @@ -284,6 +288,7 @@ object CometShuffleExchangeExec extends ShimCometShuffleExchangeExec { shuffleType = CometNativeShuffle, partitioner = new Partitioner { override def numPartitions: Int = outputPartitioning.numPartitions + override def getPartition(key: Any): Int = key.asInstanceOf[Int] }, decodeTime = metrics("decode_time"), @@ -381,6 +386,7 @@ object CometShuffleExchangeExec extends ShimCometShuffleExchangeExec { case _ => throw new IllegalStateException(s"Exchange not implemented for $newPartitioning") // TODO: Handle BroadcastPartitioning. } + def getPartitionKeyExtractor(): InternalRow => Any = newPartitioning match { case RoundRobinPartitioning(numPartitions) => // Distributes elements evenly across output partitions, starting from a random partition. @@ -433,6 +439,7 @@ object CometShuffleExchangeExec extends ShimCometShuffleExchangeExec { // limited range. val prefixComputer = new UnsafeExternalRowSorter.PrefixComputer { private val result = new UnsafeExternalRowSorter.PrefixComputer.Prefix + override def computePrefix( row: InternalRow): UnsafeExternalRowSorter.PrefixComputer.Prefix = { // The hashcode generated from the binary form of a [[UnsafeRow]] should not be null. From bb67f73119f09c5e09692e1368644380126724db Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Thu, 28 Aug 2025 12:11:48 -0400 Subject: [PATCH 6/9] Fix errant comment. 
--- spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala index d6daf779ab..31d9c6cd94 100644 --- a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala @@ -262,7 +262,6 @@ class CometFuzzTestSuite extends CometTestBase with AdaptiveSparkPlanHelper { case "jvm" => 1 case "native" => - // native shuffle does not support complex types as partitioning keys 2 } } From abd895869f9360060db1082d232eb1f626e08844 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Thu, 28 Aug 2025 13:50:19 -0400 Subject: [PATCH 7/9] Override partitioning scheme at serialization when num_partitions is 1. --- .../shuffle/CometNativeShuffleWriter.scala | 22 ++++++++++++++----- 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala index 290b38f880..96754e3e64 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala @@ -141,6 +141,16 @@ class CometNativeShuffleWriter[K, V]( MapStatus.apply(SparkEnv.get.blockManager.shuffleServerId, partitionLengths, mapId) } + // Spark sometimes generates partitioning schemes other than SinglePartition with + // numPartitions == 1, typically near the output of a query. In this case Comet just + // serializes a SinglePartition scheme to native. 
+  private def isSinglePartitioning(p: Partitioning): Boolean = p match {
+    case SinglePartition => true
+    case rp: RangePartitioning => rp.numPartitions == 1
+    case hp: HashPartitioning => hp.numPartitions == 1
+    case _ => false
+  }
+
   private def getNativePlan(dataFile: String, indexFile: String): Operator = {
     val scanBuilder = OperatorOuterClass.Scan.newBuilder().setSource("ShuffleWriterInput")
     val opBuilder = OperatorOuterClass.Operator.newBuilder()
@@ -171,6 +181,12 @@
         CometConf.COMET_EXEC_SHUFFLE_COMPRESSION_ZSTD_LEVEL.get)
 
     outputPartitioning match {
+      case p if isSinglePartitioning(p) =>
+        val partitioning = PartitioningOuterClass.SinglePartition.newBuilder()
+
+        val partitioningBuilder = PartitioningOuterClass.Partitioning.newBuilder()
+        shuffleWriterBuilder.setPartitioning(
+          partitioningBuilder.setSinglePartition(partitioning).build())
       case _: HashPartitioning =>
         val hashPartitioning = outputPartitioning.asInstanceOf[HashPartitioning]
 
@@ -237,12 +253,6 @@
         val partitioningBuilder = PartitioningOuterClass.Partitioning.newBuilder()
         shuffleWriterBuilder.setPartitioning(
           partitioningBuilder.setRangePartition(partitioning).build())
-      case SinglePartition =>
-        val partitioning = PartitioningOuterClass.SinglePartition.newBuilder()
-
-        val partitioningBuilder = PartitioningOuterClass.Partitioning.newBuilder()
-        shuffleWriterBuilder.setPartitioning(
-          partitioningBuilder.setSinglePartition(partitioning).build())
 
       case _ =>
         throw new UnsupportedOperationException(

From 967d1a10e6f9ade005702ff58194faa63611048b Mon Sep 17 00:00:00 2001
From: Matt Butrovich
Date: Thu, 28 Aug 2025 15:01:16 -0400
Subject: [PATCH 8/9] Override partitioning scheme at serialization when
 computed bounds result in 1 partition.
---
 .../shuffle/CometNativeShuffleWriter.scala | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala
index 96754e3e64..b11f9fead9 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala
@@ -141,12 +141,18 @@ class CometNativeShuffleWriter[K, V](
     MapStatus.apply(SparkEnv.get.blockManager.shuffleServerId, partitionLengths, mapId)
   }
 
-  // Spark sometimes generates partitioning schemes other than SinglePartition with
-  // numPartitions == 1, typically near the output of a query. In this case Comet just
-  // serializes a SinglePartition scheme to native.
   private def isSinglePartitioning(p: Partitioning): Boolean = p match {
     case SinglePartition => true
-    case rp: RangePartitioning => rp.numPartitions == 1
+    case rp: RangePartitioning =>
+      // Spark sometimes generates RangePartitioning schemes with numPartitions == 1,
+      // or the computed bounds result in a single target partition.
+      // In this case Comet just serializes a SinglePartition scheme to native.
+ if ((rp.numPartitions == 1) || rangePartitionBounds.isEmpty || + rangePartitionBounds.get.isEmpty) { + true + } else { + false + } case hp: HashPartitioning => hp.numPartitions == 1 case _ => false } From 522ef802c4980975db41a8482f73ffa4c77d8594 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Fri, 29 Aug 2025 11:11:44 -0400 Subject: [PATCH 9/9] Remove string and binary range partitioning types until we sort out how to handle dictionary encoding. --- .../org/apache/comet/rules/CometExecRule.scala | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala index 0dcd3828a5..63396f63b4 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala @@ -759,7 +759,7 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] { * operations like `groupByKey`, `reduceByKey` or `join`. Native code does not support hashing * complex types, see hash_funcs/utils.rs */ - def supportedPartitioningDataType(dt: DataType): Boolean = dt match { + def supportedHashPartitioningDataType(dt: DataType): Boolean = dt match { case _: BooleanType | _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: FloatType | _: DoubleType | _: StringType | _: BinaryType | _: TimestampType | _: TimestampNTZType | _: DecimalType | _: DateType => @@ -768,6 +768,15 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] { false } + def supportedRangePartitioningDataType(dt: DataType): Boolean = dt match { + case _: BooleanType | _: ByteType | _: ShortType | _: IntegerType | _: LongType | + _: FloatType | _: DoubleType | _: TimestampType | _: TimestampNTZType | _: DecimalType | + _: DateType => + true + case _ => + false + } + /** * Determine which data types are supported as data columns in native shuffle. * @@ -832,7 +841,7 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] { } } for (dt <- expressions.map(_.dataType).distinct) { - if (!supportedPartitioningDataType(dt)) { + if (!supportedHashPartitioningDataType(dt)) { withInfo(s, s"unsupported hash partitioning data type for native shuffle: $dt") supported = false } @@ -858,7 +867,7 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] { } } for (dt <- orderings.map(_.dataType).distinct) { - if (!supportedPartitioningDataType(dt)) { + if (!supportedRangePartitioningDataType(dt)) { withInfo(s, s"unsupported range partitioning data type for native shuffle: $dt") supported = false }
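
Note on the end-to-end flow across this series: the JVM side serializes each
boundary row as a list of Literal exprs (a BoundaryRow), and the native planner
transposes those back into one ScalarValue column per sort key, materializes
Arrow arrays, and row-encodes them once. A condensed sketch of that native-side
round trip (illustrative only; the types and values are made up, but the
arrow/DataFusion calls mirror what planner.rs does):

    use arrow::array::ArrayRef;
    use arrow::datatypes::DataType;
    use arrow::row::{OwnedRow, RowConverter, SortField};
    use datafusion::common::ScalarValue;
    use datafusion::error::Result;

    // Rebuild comparable boundary rows from per-column ScalarValues (the shape
    // obtained after transposing the deserialized BoundaryRow literals).
    fn bounds_to_rows(
        columns: Vec<Vec<ScalarValue>>,
        fields: Vec<SortField>,
    ) -> Result<(RowConverter, Vec<OwnedRow>)> {
        // One Arrow array per sort column, holding every boundary value.
        let arrays: Vec<ArrayRef> = columns
            .into_iter()
            .map(|col| ScalarValue::iter_to_array(col))
            .collect::<Result<_>>()?;
        let converter = RowConverter::new(fields)?;
        let owned = converter
            .convert_columns(&arrays)?
            .iter()
            .map(|r| r.owned())
            .collect();
        Ok((converter, owned))
    }

    fn main() -> Result<()> {
        let (_converter, bounds) = bounds_to_rows(
            vec![vec![ScalarValue::Int64(Some(100)), ScalarValue::Int64(Some(200))]],
            vec![SortField::new(DataType::Int64)],
        )?;
        // Incoming batches are converted with the same converter, and each row
        // is binary-searched against `bounds`.
        assert_eq!(bounds.len(), 2);
        Ok(())
    }

Because the converter travels with the bounds inside
CometPartitioning::RangePartitioning, every batch a task sees is encoded
consistently with the precomputed boundaries.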