9 changes: 9 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -657,6 +657,15 @@ object CometConf extends ShimCometConf {
.longConf
.createWithDefault(3000L)

val COMET_ENABLE_GROUPING_ON_MAP_TYPE: ConfigEntry[Boolean] =
conf("spark.comet.enableGroupingOnMapType")
.doc(
"An experimental feature with limited capabilities to enable grouping on Spark Map type." +
"Requires Spark 4.0 and beyond along with support for scan on Map type." +
s"Set this config to true to enable grouping on map type. $COMPAT_GUIDE.")
.booleanConf
.createWithDefault(false)

/** Create a config to enable a specific operator */
private def createExecEnabledConfig(
exec: String,
1 change: 1 addition & 0 deletions docs/source/user-guide/configs.md
@@ -37,6 +37,7 @@ Comet provides the following configuration settings.
| spark.comet.convert.parquet.enabled | When enabled, data from Spark (non-native) Parquet v1 and v2 scans will be converted to Arrow format. Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. | false |
| spark.comet.debug.enabled | Whether to enable debug mode for Comet. When enabled, Comet will do additional checks for debugging purpose. For example, validating array when importing arrays from JVM at native side. Note that these checks may be expensive in performance and should only be enabled for debugging purpose. | false |
| spark.comet.dppFallback.enabled | Whether to fall back to Spark for queries that use DPP. | true |
| spark.comet.enableGroupingOnMapType | An experimental feature with limited capabilities to enable grouping on the Spark Map type. Requires Spark 4.0 or later, along with support for scan on the Map type. Set this config to true to enable grouping on the Map type. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
| spark.comet.enabled | Whether to enable Comet extension for Spark. When this is turned on, Spark will use Comet to read Parquet data source. Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. By default, this config is the value of the env var `ENABLE_COMET` if set, or true otherwise. | true |
| spark.comet.exceptionOnDatetimeRebase | Whether to throw exception when seeing dates/timestamps from the legacy hybrid (Julian + Gregorian) calendar. Since Spark 3, dates/timestamps were written according to the Proleptic Gregorian calendar. When this is true, Comet will throw exceptions when seeing these dates/timestamps that were written by Spark version before 3.0. If this is false, these dates/timestamps will be read as if they were written to the Proleptic Gregorian calendar and will not be rebased. | false |
| spark.comet.exec.aggregate.enabled | Whether to enable aggregate by default. | true |
40 changes: 35 additions & 5 deletions native/core/src/execution/planner.rs
@@ -1207,7 +1207,21 @@ impl PhysicalPlanner {
.map(|r| (r, format!("col_{idx}")))
})
.collect();
let group_by = PhysicalGroupBy::new_single(group_exprs?);

let mut map_converter =
crate::execution::utils::HashAggregateMapConverter::default();

// DataFusion does not currently support grouping on the Map type, so pass the
// `group_exprs` through `maybe_wrap_map_type_in_grouping_exprs`, which canonicalizes
// any Map type to a List of Structs for grouping.
let maybe_wrapped_group_exprs = map_converter
.maybe_wrap_map_type_in_grouping_exprs(
&self.session_ctx.state(),
group_exprs?,
child.schema(),
)?;

let group_by = PhysicalGroupBy::new_single(maybe_wrapped_group_exprs);
let schema = child.schema();

let mode = if agg.mode == 0 {
@@ -1234,20 +1248,36 @@
Arc::clone(&schema),
)?,
);

// To maintain schema consistency, the `AggregateExec` output is passed through
// `maybe_project_map_type_with_aggregation`, which adds a projection that converts
// any canonicalized Map back to its original Map type. Not doing so would result
// in a schema mismatch between Spark and DataFusion.
let maybe_aggregate_with_project = map_converter
.maybe_project_map_type_with_aggregation(
&self.session_ctx.state(),
agg,
aggregate,
)?;

let result_exprs: PhyExprResult = agg
.result_exprs
.iter()
.enumerate()
.map(|(idx, expr)| {
self.create_expr(expr, aggregate.schema())
self.create_expr(expr, maybe_aggregate_with_project.schema())
.map(|r| (r, format!("col_{idx}")))
})
.collect();

if agg.result_exprs.is_empty() {
Ok((
scans,
Arc::new(SparkPlan::new(spark_plan.plan_id, aggregate, vec![child])),
Arc::new(SparkPlan::new(
spark_plan.plan_id,
maybe_aggregate_with_project,
vec![child],
)),
))
} else {
// For final aggregation, DF's hash aggregate exec doesn't support Spark's
@@ -1259,15 +1289,15 @@
// Spark side.
let projection = Arc::new(ProjectionExec::try_new(
result_exprs?,
Arc::clone(&aggregate),
Arc::clone(&maybe_aggregate_with_project),
)?);
Ok((
scans,
Arc::new(SparkPlan::new_with_additional(
spark_plan.plan_id,
projection,
vec![child],
vec![aggregate],
vec![maybe_aggregate_with_project],
)),
))
}
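The canonicalization relies on Arrow's physical layout: a Map column is stored as a List of key/value Structs plus a `sorted` flag, so the conversion can be done without copying buffers. A minimal sketch of that idea is below; it is not the `map_to_list` kernel added by this PR (its body lives in `map_funcs.rs`, outside this diff), and the helper name `map_as_list` is hypothetical.

```rust
use std::sync::Arc;

use arrow::array::{Array, ListArray, MapArray};
use arrow::datatypes::DataType;

/// Reinterpret a MapArray as a ListArray of its entry structs. No buffers are
/// copied: the Map's entry field, offsets, entries, and validity are reused.
fn map_as_list(map: &MapArray) -> ListArray {
    let entries_field = match map.data_type() {
        DataType::Map(field, _) => Arc::clone(field),
        other => panic!("expected a Map type, got {other:?}"),
    };
    ListArray::new(
        entries_field,                   // same entry field the Map carries
        map.offsets().clone(),           // same per-row offsets
        Arc::new(map.entries().clone()), // same key/value StructArray
        map.nulls().cloned(),            // same validity bitmap
    )
}
```

This mirrors the `list_type = DataType::List(Arc::clone(field_ref))` construction in `maybe_wrap_map_type_in_grouping_exprs` below: the grouping expression keeps the exact entry field, so only the outer type tag changes.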
140 changes: 140 additions & 0 deletions native/core/src/execution/utils.rs
@@ -18,11 +18,21 @@
/// Utils for array vector, etc.
use crate::errors::ExpressionError;
use crate::execution::operators::ExecutionError;
use arrow::datatypes::{DataType, Field, SchemaRef};
use arrow::{
array::ArrayData,
error::ArrowError,
ffi::{from_ffi, FFI_ArrowArray, FFI_ArrowSchema},
};
use datafusion::execution::FunctionRegistry;
use datafusion::physical_expr::expressions::Column;
use datafusion::physical_expr::{PhysicalExpr, ScalarFunctionExpr};
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_comet_proto::spark_operator::HashAggregate;
use datafusion_comet_spark_expr::create_comet_physical_fun;
use std::collections::HashMap;
use std::sync::Arc;

impl From<ArrowError> for ExecutionError {
fn from(error: ArrowError) -> ExecutionError {
@@ -127,3 +137,133 @@ pub fn bytes_to_i128(slice: &[u8]) -> i128 {

i128::from_le_bytes(bytes)
}

type GroupingExprs = Vec<(Arc<dyn PhysicalExpr>, String)>;
type GroupingExprResult = Result<GroupingExprs, ExecutionError>;

/// Provides utilities to support grouping on Map type in HashAggregate.
pub struct HashAggregateMapConverter {
// Maps index of a grouping expression to its original Map type. This is used to convert a
// grouping expression return type back to Map type after aggregation.
expr_index_to_map_type: HashMap<usize, DataType>,
}

impl Default for HashAggregateMapConverter {
    fn default() -> Self {
        Self {
            expr_index_to_map_type: HashMap::new(),
        }
    }
}

impl HashAggregateMapConverter {

/// Iterates through the grouping expressions and wraps any Map-typed expression in the
/// `map_to_list` scalar function.
pub fn maybe_wrap_map_type_in_grouping_exprs(
&mut self,
fn_registry: &dyn FunctionRegistry,
grouping_exprs: GroupingExprs,
child_schema: SchemaRef,
) -> GroupingExprResult {
grouping_exprs
.into_iter()
.enumerate()
.map(|(idx, (physical_expr, expr_name))| {
let expr_data_type = physical_expr.data_type(&child_schema)?;

if let DataType::Map(field_ref, _) = &expr_data_type {
let list_type = DataType::List(Arc::clone(field_ref));

// Update the map with the grouping expression index and its original Map type.
self.expr_index_to_map_type
.insert(idx, expr_data_type.clone());

// Create `map_to_list` expression to wrap the original grouping expression.
let map_to_list_func = create_comet_physical_fun(
"map_to_list",
list_type.clone(),
fn_registry,
None,
)?;
let map_to_list_expr = ScalarFunctionExpr::new(
"map_to_list",
map_to_list_func,
vec![physical_expr],
Arc::new(Field::new("map_to_list", list_type, true)),
);

// Return the scalar function expression.
Ok((
Arc::new(map_to_list_expr) as Arc<dyn PhysicalExpr>,
expr_name,
))
} else {
Ok((physical_expr, expr_name))
}
})
.collect()
}

/// Iterates over the aggregate schema, finds the Map-typed grouping expressions, and wraps
/// them in the `map_from_list` scalar function to convert them back to the Map type. Returns
/// a new `ProjectionExec` stacked on top of the original aggregate execution plan, or the
/// original plan unchanged if no grouping expression had the Map type.
pub fn maybe_project_map_type_with_aggregation(
&self,
fn_registry: &dyn FunctionRegistry,
hash_agg: &HashAggregate,
aggregate: Arc<dyn ExecutionPlan>,
) -> Result<Arc<dyn ExecutionPlan>, ExecutionError> {
// If there was no grouping expression with Map type, return the original aggregate plan.
if self.expr_index_to_map_type.is_empty() {
return Ok(aggregate);
}

// Collect the projection expressions into this vector.
let mut projection_exprs = Vec::new();

let num_grouping_cols = hash_agg.grouping_exprs.len();
let agg_schema = aggregate.schema();

// Iterate through the aggregate schema. The aggregate schema contains both grouping
// expressions and aggregate expressions. The grouping expressions are at the beginning of
// the schema.
for (field_idx, field) in agg_schema.fields().iter().enumerate() {
let opt_map_type = self.expr_index_to_map_type.get(&field_idx);

// If the current field is not a grouping expression or the grouping expression does not
// have Map type, then project the current field as it is.
if field_idx >= num_grouping_cols || opt_map_type.is_none() {
let col_expr =
Arc::new(Column::new(field.name(), field_idx)) as Arc<dyn PhysicalExpr>;
projection_exprs.push((col_expr, field.name().to_string()));
continue;
}

let map_type = opt_map_type.unwrap();

// Create `map_from_list` expression to convert the List type back to Map type. This
// expression was previously wrapped with `map_to_list` during grouping.
let map_from_list_func =
create_comet_physical_fun("map_from_list", map_type.clone(), fn_registry, None)?;
let col_expr = Arc::new(Column::new(field.name(), field_idx));
let map_from_list_expr = Arc::new(ScalarFunctionExpr::new(
"map_from_list",
map_from_list_func,
vec![col_expr],
Arc::new(Field::new(
field.name(),
map_type.clone(),
field.is_nullable(),
)),
)) as Arc<dyn PhysicalExpr>;

// Add the `map_from_list` expression to the projection expressions.
projection_exprs.push((map_from_list_expr, field.name().to_string()));
}

// Return a new ProjectionExec on top of the original aggregate plan.
Ok(Arc::new(ProjectionExec::try_new(
projection_exprs,
Arc::clone(&aggregate),
)?) as Arc<dyn ExecutionPlan>)
}
}
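Going the other way, `map_from_list` must reattach the original Map type to the grouped data without disturbing it, which is why the projection above reuses `map_type.clone()` and the field's nullability. The self-contained sketch below (hypothetical helper `list_as_map`, not the PR's kernel) rebuilds a `MapArray` from the canonical list form and checks that the round trip is lossless:

```rust
use std::sync::Arc;

use arrow::array::{
    Array, AsArray, Int32Builder, ListArray, MapArray, MapBuilder, StringBuilder,
};
use arrow::datatypes::DataType;

/// Rebuild a MapArray from the canonical List<Struct<key, value>> form,
/// reusing the list's offsets, entry structs, and validity bitmap.
fn list_as_map(list: &ListArray, ordered: bool) -> MapArray {
    let entries_field = match list.data_type() {
        DataType::List(field) => Arc::clone(field),
        other => panic!("expected a List type, got {other:?}"),
    };
    MapArray::new(
        entries_field,
        list.offsets().clone(),
        list.values().as_struct().clone(),
        list.nulls().cloned(),
        ordered,
    )
}

fn main() {
    // Build a two-row map column: {"a": 1, "b": 2} and {"c": 3}.
    let mut builder = MapBuilder::new(None, StringBuilder::new(), Int32Builder::new());
    builder.keys().append_value("a");
    builder.values().append_value(1);
    builder.keys().append_value("b");
    builder.values().append_value(2);
    builder.append(true).unwrap();
    builder.keys().append_value("c");
    builder.values().append_value(3);
    builder.append(true).unwrap();
    let map = builder.finish();

    // Canonicalize to List<Struct> without copying buffers, then convert back.
    let entries_field = match map.data_type() {
        DataType::Map(field, _) => Arc::clone(field),
        _ => unreachable!(),
    };
    let list = ListArray::new(
        entries_field,
        map.offsets().clone(),
        Arc::new(map.entries().clone()),
        map.nulls().cloned(),
    );
    let round_tripped = list_as_map(&list, false);
    assert_eq!(map.to_data(), round_tripped.to_data());
}
```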
13 changes: 13 additions & 0 deletions native/spark-expr/src/comet_scalar_funcs.rs
@@ -16,6 +16,7 @@
// under the License.

use crate::hash_funcs::*;
use crate::map_funcs::{map_from_list, map_to_list, spark_map_sort};
use crate::math_funcs::checked_arithmetic::{checked_add, checked_div, checked_mul, checked_sub};
use crate::math_funcs::modulo_expr::spark_modulo;
use crate::{
@@ -157,6 +158,18 @@ pub fn create_comet_physical_fun(
let fail_on_error = fail_on_error.unwrap_or(false);
make_comet_scalar_udf!("spark_modulo", func, without data_type, fail_on_error)
}
"map_sort" => {
let func = Arc::new(spark_map_sort);
make_comet_scalar_udf!("spark_map_sort", func, without data_type)
}
"map_to_list" => {
let func = Arc::new(map_to_list);
make_comet_scalar_udf!("map_to_list", func, without data_type)
}
"map_from_list" => {
let func = Arc::new(map_from_list);
make_comet_scalar_udf!("map_from_list", func, without data_type)
}
_ => registry.udf(fun_name).map_err(|e| {
DataFusionError::Execution(format!(
"Function {fun_name} not found in the registry: {e}",
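These registry arms are what the planner's `create_comet_physical_fun("map_to_list", list_type, ...)` calls resolve against: the caller-supplied `data_type` becomes the UDF's fixed return type, and the `without data_type` form means the kernel itself never receives it. As a rough illustration only, the sketch below shows a plain-DataFusion equivalent of such a registration. The stub body is a placeholder, and it assumes a recent DataFusion where `create_udf` takes the return `DataType` by value; it is not the actual `make_comet_scalar_udf!` expansion.

```rust
use std::sync::Arc;

use arrow::datatypes::DataType;
use datafusion::error::Result;
use datafusion::logical_expr::{create_udf, ColumnarValue, ScalarUDF, Volatility};

// Placeholder kernel: the real `map_to_list` lives in map_funcs.rs (outside
// this diff) and converts a Map column to its List<Struct<key, value>> form.
fn map_to_list_stub(args: &[ColumnarValue]) -> Result<ColumnarValue> {
    Ok(args[0].clone())
}

/// Build a `map_to_list` UDF whose return type was computed at planning time
/// from the Map's entry field, mirroring how the planner threads `list_type`
/// into `create_comet_physical_fun`.
fn make_map_to_list_udf(list_type: DataType) -> ScalarUDF {
    create_udf(
        "map_to_list",
        // Comet builds the physical expression directly, so no logical
        // planning input types are declared in this sketch.
        vec![],
        list_type,
        Volatility::Immutable,
        Arc::new(map_to_list_stub),
    )
}
```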
1 change: 1 addition & 0 deletions native/spark-expr/src/lib.rs
@@ -55,6 +55,7 @@ pub use bloom_filter::{BloomFilterAgg, BloomFilterMightContain};

mod conditional_funcs;
mod conversion_funcs;
mod map_funcs;
mod math_funcs;
mod nondetermenistic_funcs;
