Merged (changes from 18 commits)
1 change: 1 addition & 0 deletions .github/workflows/pr_build_linux.yml
@@ -132,6 +132,7 @@ jobs:
org.apache.comet.exec.CometAggregateSuite
org.apache.comet.exec.CometExec3_4PlusSuite
org.apache.comet.exec.CometExecSuite
org.apache.comet.exec.CometGenerateExecSuite
org.apache.comet.exec.CometWindowExecSuite
org.apache.comet.exec.CometJoinSuite
org.apache.comet.CometNativeSuite
1 change: 1 addition & 0 deletions .github/workflows/pr_build_macos.yml
@@ -97,6 +97,7 @@ jobs:
org.apache.comet.exec.CometAggregateSuite
org.apache.comet.exec.CometExec3_4PlusSuite
org.apache.comet.exec.CometExecSuite
org.apache.comet.exec.CometGenerateExecSuite
org.apache.comet.exec.CometWindowExecSuite
org.apache.comet.exec.CometJoinSuite
org.apache.comet.CometNativeSuite
2 changes: 2 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -270,6 +270,8 @@ object CometConf extends ShimCometConf {
createExecEnabledConfig("union", defaultValue = true)
val COMET_EXEC_EXPAND_ENABLED: ConfigEntry[Boolean] =
createExecEnabledConfig("expand", defaultValue = true)
val COMET_EXEC_EXPLODE_ENABLED: ConfigEntry[Boolean] =
createExecEnabledConfig("explode", defaultValue = true)
val COMET_EXEC_WINDOW_ENABLED: ConfigEntry[Boolean] =
createExecEnabledConfig("window", defaultValue = true)
val COMET_EXEC_TAKE_ORDERED_AND_PROJECT_ENABLED: ConfigEntry[Boolean] =
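For reviewers trying this out: following the existing `createExecEnabledConfig` pattern, the new flag should be controllable per session via `spark.comet.exec.explode.enabled`. A minimal sketch only; the session setup, DataFrame, and column names below are illustrative and not part of this PR:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{array, col, explode}

val spark = SparkSession.builder().appName("comet-explode-demo").getOrCreate()

// Enabled by default; set to "false" to keep Spark's own GenerateExec.
spark.conf.set("spark.comet.exec.explode.enabled", "true")

// Illustrative data: an id column plus an array column to explode.
val df = spark.range(3).withColumn("xs", array(col("id"), col("id") * 2))
df.select(col("id"), explode(col("xs")).as("x")).show()
```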
1 change: 1 addition & 0 deletions docs/source/user-guide/latest/configs.md
@@ -162,6 +162,7 @@ These settings can be used to determine which parts of the plan are accelerated
| `spark.comet.exec.coalesce.enabled` | Whether to enable coalesce by default. | true |
| `spark.comet.exec.collectLimit.enabled` | Whether to enable collectLimit by default. | true |
| `spark.comet.exec.expand.enabled` | Whether to enable expand by default. | true |
| `spark.comet.exec.explode.enabled` | Whether to enable explode by default. | true |
| `spark.comet.exec.filter.enabled` | Whether to enable filter by default. | true |
| `spark.comet.exec.globalLimit.enabled` | Whether to enable globalLimit by default. | true |
| `spark.comet.exec.hashJoin.enabled` | Whether to enable hashJoin by default. | true |
1 change: 1 addition & 0 deletions docs/source/user-guide/latest/operators.md
@@ -30,6 +30,7 @@ not supported by Comet will fall back to regular Spark execution.
| ExpandExec | Yes | |
| FileSourceScanExec | Yes | Supports Parquet files. See the [Comet Compatibility Guide] for more information. |
| FilterExec | Yes | |
| GenerateExec | Yes | Supports `explode` generator only. |
| GlobalLimitExec | Yes | |
| HashAggregateExec | Yes | |
| InsertIntoHadoopFsRelationCommand | No | Experimental support for native Parquet writes. Disabled by default. |
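For context, a sketch of the query shape the new `GenerateExec` row refers to: `explode` over an array column plans to a `GenerateExec`, which Comet can now replace natively, while other generators (and `explode_outer`, per the serde changes later in this diff) still fall back. The table and column names here are invented:

```scala
// Plans to GenerateExec with an `explode` generator over an array column.
// `events` and `tags` are hypothetical; any array column behaves the same way.
spark.sql(
  """
    |SELECT id, explode(tags) AS tag
    |FROM events
    |""".stripMargin).show()
```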
103 changes: 103 additions & 0 deletions native/core/src/execution/planner.rs
@@ -96,9 +96,11 @@ use arrow::array::{
use arrow::buffer::{BooleanBuffer, NullBuffer, OffsetBuffer};
use arrow::row::{OwnedRow, RowConverter, SortField};
use datafusion::common::utils::SingleRowListArrayBuilder;
use datafusion::common::UnnestOptions;
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::limit::GlobalLimitExec;
use datafusion::physical_plan::unnest::{ListUnnest, UnnestExec};
use datafusion_comet_proto::spark_expression::ListLiteral;
use datafusion_comet_proto::spark_operator::SparkFilePartition;
use datafusion_comet_proto::{
@@ -1528,6 +1530,107 @@ impl PhysicalPlanner {
Arc::new(SparkPlan::new(spark_plan.plan_id, expand, vec![child])),
))
}
OpStruct::Explode(explode) => {
assert_eq!(children.len(), 1);
let (scans, child) = self.create_plan(&children[0], inputs, partition_count)?;

// Create the expression for the array to explode
let child_expr = if let Some(child_expr) = &explode.child {
self.create_expr(child_expr, child.schema())?
} else {
return Err(ExecutionError::GeneralError(
"Explode operator requires a child expression".to_string(),
));
};

// Create projection expressions for other columns
Review comment (Contributor): other columns? 🤔
Reply (Member Author): yes, as in SELECT a, b, c, explode(d) FROM ...

let projections: Vec<Arc<dyn PhysicalExpr>> = explode
.project_list
.iter()
.map(|expr| self.create_expr(expr, child.schema()))
.collect::<Result<Vec<_>, _>>()?;

// For UnnestExec, we need to add a projection to put the columns in the right order:
// 1. First add all projection columns
// 2. Then add the array column to be exploded
// Then UnnestExec will unnest the last column

let mut project_exprs: Vec<(Arc<dyn PhysicalExpr>, String)> = projections
.iter()
.enumerate()
.map(|(idx, expr)| (Arc::clone(expr), format!("col_{idx}")))
.collect();

// Add the array column as the last column
let array_col_name = format!("col_{}", projections.len());
Review comment (Contributor): can this name cause a conflict or issue if original dataset has col_* cols?
Reply (Member Author): I removed this now and preserve original names

project_exprs.push((Arc::clone(&child_expr), array_col_name.clone()));

// Create a projection to arrange columns as needed
let project_exec = Arc::new(ProjectionExec::try_new(
project_exprs,
Arc::clone(&child.native_plan),
)?);

// Get the input schema from the projection
let project_schema = project_exec.schema();

// Build the output schema for UnnestExec
// The output schema replaces the list column with its element type
let mut output_fields: Vec<Field> = Vec::new();

// Add all projection columns (non-array columns)
for i in 0..projections.len() {
output_fields.push(project_schema.field(i).clone());
}

// Add the unnested array element field
// Extract the element type from the list/array type
let array_field = project_schema.field(projections.len());
let element_type = match array_field.data_type() {
DataType::List(field) => field.data_type().clone(),
dt => {
return Err(ExecutionError::GeneralError(format!(
"Expected List type for explode, got {:?}",
dt
)))
}
};

// The output column has the same name as the input array column
// but with the element type instead of the list type
output_fields.push(Field::new(
array_field.name(),
element_type,
true, // Element is nullable after unnesting
Review comment (Contributor): 👍
));

let output_schema = Arc::new(Schema::new(output_fields));

// Use UnnestExec to explode the last column (the array column)
// ListUnnest specifies which column to unnest and the depth (1 for single level)
let list_unnest = ListUnnest {
index_in_input_schema: projections.len(), // Index of the array column to unnest
depth: 1, // Unnest one level (explode single array)
};

let unnest_options = UnnestOptions {
preserve_nulls: explode.outer,
recursions: vec![],
};

let unnest_exec = Arc::new(UnnestExec::new(
project_exec,
vec![list_unnest],
vec![], // No struct columns to unnest
output_schema,
unnest_options,
));

Ok((
scans,
Arc::new(SparkPlan::new(spark_plan.plan_id, unnest_exec, vec![child])),
))
}
OpStruct::SortMergeJoin(join) => {
let (join_params, scans) = self.parse_join_parameters(
inputs,
10 changes: 10 additions & 0 deletions native/proto/src/proto/operator.proto
@@ -50,6 +50,7 @@ message Operator {
NativeScan native_scan = 111;
IcebergScan iceberg_scan = 112;
ParquetWriter parquet_writer = 113;
Explode explode = 114;
}
}

@@ -253,6 +254,15 @@ message Expand {
int32 num_expr_per_project = 3;
}

message Explode {
// The array expression to explode into multiple rows
spark.spark_expression.Expr child = 1;
// Whether this is explode_outer (produces null row for empty/null arrays)
bool outer = 2;
// Expressions for other columns to project alongside the exploded values
repeated spark.spark_expression.Expr project_list = 3;
}

message HashJoin {
repeated spark.spark_expression.Expr left_join_keys = 1;
repeated spark.spark_expression.Expr right_join_keys = 2;
@@ -62,6 +62,7 @@ object CometExecRule {
classOf[LocalLimitExec] -> CometLocalLimitExec,
classOf[GlobalLimitExec] -> CometGlobalLimitExec,
classOf[ExpandExec] -> CometExpandExec,
classOf[GenerateExec] -> CometExplodeExec,
classOf[HashAggregateExec] -> CometHashAggregateExec,
classOf[ObjectHashAggregateExec] -> CometObjectHashAggregateExec,
classOf[BroadcastHashJoinExec] -> CometBroadcastHashJoinExec,
125 changes: 122 additions & 3 deletions spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
@@ -20,6 +20,7 @@
package org.apache.spark.sql.comet

import java.io.ByteArrayOutputStream
import java.util.Locale

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
@@ -29,7 +30,7 @@ import org.apache.spark.TaskContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeSet, Expression, ExpressionSet, NamedExpression, SortOrder}
import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeSet, Expression, ExpressionSet, Generator, NamedExpression, SortOrder}
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateMode, Final, Partial}
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide}
import org.apache.spark.sql.catalyst.plans._
@@ -43,7 +44,7 @@ import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, HashJoin, ShuffledHashJoinExec, SortMergeJoinExec}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, ShortType, StringType, TimestampNTZType}
import org.apache.spark.sql.types.{ArrayType, BooleanType, ByteType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, ShortType, StringType, TimestampNTZType}
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.SerializableConfiguration
import org.apache.spark.util.io.ChunkedByteBuffer
@@ -53,7 +54,7 @@ import com.google.common.base.Objects
import org.apache.comet.{CometConf, CometExecIterator, CometRuntimeException, ConfigEntry}
import org.apache.comet.CometSparkSessionExtensions.{isCometShuffleEnabled, withInfo}
import org.apache.comet.parquet.CometParquetUtils
import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass}
import org.apache.comet.serde.{CometOperatorSerde, Compatible, Incompatible, OperatorOuterClass, SupportLevel, Unsupported}
import org.apache.comet.serde.OperatorOuterClass.{AggregateMode => CometAggregateMode, Operator}
import org.apache.comet.serde.QueryPlanSerde.{aggExprToProto, exprToProto, supportedSortType}
import org.apache.comet.serde.operator.CometSink
@@ -882,6 +883,124 @@ case class CometExpandExec(
override lazy val metrics: Map[String, SQLMetric] = Map.empty
}

object CometExplodeExec extends CometOperatorSerde[GenerateExec] {

override def enabledConfig: Option[ConfigEntry[Boolean]] = Some(
CometConf.COMET_EXEC_EXPLODE_ENABLED)

override def getSupportLevel(op: GenerateExec): SupportLevel = {
if (!op.generator.deterministic) {
return Unsupported(Some("Only deterministic generators are supported"))
}
if (op.generator.children.length != 1) {
return Unsupported(Some("generators with multiple inputs are not supported"))
}
if (op.generator.nodeName.toLowerCase(Locale.ROOT) != "explode") {
return Unsupported(Some(s"Unsupported generator: ${op.generator.nodeName}"))
}
if (op.outer) {
// DataFusion UnnestExec has different semantics from Spark for this case
// https://github.com/apache/datafusion/issues/19053
return Incompatible(Some("Empty arrays are not preserved as null outputs when outer=true"))
}
op.generator.children.head.dataType match {
case _: ArrayType =>
Compatible()
case _: MapType =>
// TODO add support for map types
// https://github.com/apache/datafusion-comet/issues/2837
Unsupported(Some("Comet only supports explode/explode_outer for arrays, not maps"))
case other =>
Unsupported(Some(s"Unsupported data type: $other"))
}
}

override def convert(
op: GenerateExec,
builder: Operator.Builder,
childOp: OperatorOuterClass.Operator*): Option[OperatorOuterClass.Operator] = {
val childExpr = op.generator.children.head
val childExprProto = exprToProto(childExpr, op.child.output)

if (childExprProto.isEmpty) {
withInfo(op, childExpr)
return None
}

// Convert the projection expressions (columns to carry forward)
// These are the non-generator output columns
val generatorOutput = op.generatorOutput.toSet
val projectExprs = op.output.filterNot(generatorOutput.contains).map { attr =>
exprToProto(attr, op.child.output)
}

if (projectExprs.exists(_.isEmpty) || childOp.isEmpty) {
withInfo(op, op.output: _*)
return None
}

val explodeBuilder = OperatorOuterClass.Explode
.newBuilder()
.setChild(childExprProto.get)
.setOuter(op.outer)
.addAllProjectList(projectExprs.map(_.get).asJava)

Some(builder.setExplode(explodeBuilder).build())
}

override def createExec(nativeOp: Operator, op: GenerateExec): CometNativeExec = {
CometExplodeExec(
nativeOp,
op,
op.output,
op.generator,
op.generatorOutput,
op.child,
SerializedPlan(None))
}
}

case class CometExplodeExec(
override val nativeOp: Operator,
override val originalPlan: SparkPlan,
override val output: Seq[Attribute],
generator: Generator,
generatorOutput: Seq[Attribute],
child: SparkPlan,
override val serializedPlanOpt: SerializedPlan)
extends CometUnaryExec {
override def outputPartitioning: Partitioning = child.outputPartitioning

override def producedAttributes: AttributeSet = AttributeSet(generatorOutput)

override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
this.copy(child = newChild)

override def stringArgs: Iterator[Any] = Iterator(generator, generatorOutput, output, child)

override def equals(obj: Any): Boolean = {
obj match {
case other: CometExplodeExec =>
this.output == other.output &&
this.generator == other.generator &&
this.generatorOutput == other.generatorOutput &&
this.child == other.child &&
this.serializedPlanOpt == other.serializedPlanOpt
case _ =>
false
}
}

override def hashCode(): Int = Objects.hashCode(output, generator, generatorOutput, child)

override lazy val metrics: Map[String, SQLMetric] =
CometMetricNode.baselineMetrics(sparkContext) ++
Map(
"input_batches" -> SQLMetrics.createMetric(sparkContext, "number of input batches"),
"input_rows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
"output_batches" -> SQLMetrics.createMetric(sparkContext, "number of output batches"))
}

object CometUnionExec extends CometSink[UnionExec] {

override def enabledConfig: Option[ConfigEntry[Boolean]] = Some(
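To summarise the support matrix implemented by `getSupportLevel` above, a rough sketch of which shapes can run natively and which fall back; the DataFrames and column names are illustrative, and actual acceleration still depends on the usual Comet enablement checks:

```scala
import org.apache.spark.sql.functions.{col, explode, explode_outer}

val arrays = spark.range(5).selectExpr("id", "array(id, id + 1) AS xs")
val maps   = spark.range(5).selectExpr("id", "map('k', id) AS m")

arrays.select(col("id"), explode(col("xs")))       // Compatible: can run as CometExplodeExec
arrays.select(col("id"), explode_outer(col("xs"))) // Incompatible: falls back by default
                                                   // (DataFusion UnnestExec outer semantics differ)
maps.select(col("id"), explode(col("m")))          // Unsupported: map input, see issue #2837
```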
4 changes: 4 additions & 0 deletions spark/src/test/resources/tpcds-micro-benchmarks/explode.sql
@@ -0,0 +1,4 @@
SELECT i_item_sk, explode(array(i_brand_id, i_class_id, i_category_id, i_manufact_id, i_manager_id))
FROM item
ORDER BY i_item_sk
LIMIT 1000

Review comment (Contributor): should we also have explode_outer?
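On the review question above: if an `explode_outer` variant is added later, it could mirror this query, though under the current support matrix it would fall back to Spark unless incompatible operators are explicitly allowed. A sketch only, not part of this PR:

```scala
// Hypothetical explode_outer companion to the benchmark above (suggestion only).
spark.sql(
  """
    |SELECT i_item_sk,
    |       explode_outer(array(i_brand_id, i_class_id, i_category_id, i_manufact_id, i_manager_id))
    |FROM item
    |ORDER BY i_item_sk
    |LIMIT 1000
    |""".stripMargin)
```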