
Commit e73fff0

feat: monotonically_increasing_id and spark_partition_id implementation (#2037)
1 parent 1c111d5 commit e73fff0

File tree: 9 files changed, +305 -28 lines changed
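For context before the per-file diffs: this commit adds native Comet execution for Spark's `spark_partition_id()` and `monotonically_increasing_id()`. A minimal usage sketch in Scala (assuming a `SparkSession` named `spark` is in scope):

```scala
import org.apache.spark.sql.functions.{monotonically_increasing_id, spark_partition_id}

// Eight rows in two partitions: each row gets its partition number and a
// 64-bit ID that is unique across the DataFrame and increasing per partition.
val df = spark.range(0, 8, 1, 2)
  .withColumn("pid", spark_partition_id())
  .withColumn("id", monotonically_increasing_id())
df.show()
```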

dev/diffs/4.0.0.diff

Lines changed: 13 additions & 3 deletions
@@ -1,5 +1,5 @@
 diff --git a/pom.xml b/pom.xml
-index 443d46a4302..3b8483173f1 100644
+index a4b1b2c3c9f..63ec4784625 100644
 --- a/pom.xml
 +++ b/pom.xml
 @@ -148,6 +148,8 @@
@@ -1732,14 +1732,14 @@ index aed11badb71..ab7e9456e26 100644
     spark.range(1).foreach { _ =>
       columnarToRowExec.canonicalized
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
-index a3cfdc5a240..f4afc393ba0 100644
+index a3cfdc5a240..1b08a1f42ee 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
 @@ -22,6 +22,7 @@ import org.apache.spark.rdd.MapPartitionsWithEvaluatorRDD
  import org.apache.spark.sql.{Dataset, QueryTest, Row, SaveMode}
  import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
  import org.apache.spark.sql.catalyst.expressions.codegen.{ByteCodeStats, CodeAndComment, CodeGenerator}
-+import org.apache.spark.sql.comet.{CometHashJoinExec, CometSortExec, CometSortMergeJoinExec}
++import org.apache.spark.sql.comet.{CometColumnarToRowExec, CometHashJoinExec, CometSortExec, CometSortMergeJoinExec}
  import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite
  import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec}
  import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
@@ -1952,6 +1952,16 @@ index a3cfdc5a240..f4afc393ba0 100644
     val projection = Seq.tabulate(columnNum)(i => s"c$i + c$i as newC$i")
     val df = spark.read.parquet(path).selectExpr(projection: _*)

+@@ -815,6 +852,9 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
+     assert(distinctWithId.queryExecution.executedPlan.exists {
+       case WholeStageCodegenExec(
+             ProjectExec(_, BroadcastHashJoinExec(_, _, _, _, _, _: HashAggregateExec, _, _))) => true
++      case WholeStageCodegenExec(
++            ProjectExec(_, BroadcastHashJoinExec(_, _, _, _, _, _: CometColumnarToRowExec, _, _))) =>
++        true
+       case _ => false
+     })
+     checkAnswer(distinctWithId, Seq(Row(1, 0), Row(1, 0)))
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
 index 272be70f9fe..06957694002 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala

docs/spark_expressions_support.md

Lines changed: 2 additions & 2 deletions
@@ -349,11 +349,11 @@
 - [ ] input_file_block_length
 - [ ] input_file_block_start
 - [ ] input_file_name
-- [ ] monotonically_increasing_id
+- [x] monotonically_increasing_id
 - [ ] raise_error
 - [x] rand
 - [x] randn
-- [ ] spark_partition_id
+- [x] spark_partition_id
 - [ ] typeof
 - [x] user
 - [ ] uuid

native/core/src/execution/planner.rs

Lines changed: 7 additions & 0 deletions
@@ -100,6 +100,7 @@ use datafusion_comet_proto::{
     },
     spark_partitioning::{partitioning::PartitioningStruct, Partitioning as SparkPartitioning},
 };
+use datafusion_comet_spark_expr::monotonically_increasing_id::MonotonicallyIncreasingId;
 use datafusion_comet_spark_expr::{
     ArrayInsert, Avg, AvgDecimal, Cast, CheckOverflow, Correlation, Covariance, CreateNamedStruct,
     GetArrayStructFields, GetStructField, IfExpr, ListExtract, NormalizeNaNAndZero, RLike,
@@ -831,6 +832,12 @@ impl PhysicalPlanner {
                 let seed = expr.seed.wrapping_add(self.partition.into());
                 Ok(Arc::new(RandnExpr::new(seed)))
             }
+            ExprStruct::SparkPartitionId(_) => Ok(Arc::new(DataFusionLiteral::new(
+                ScalarValue::Int32(Some(self.partition)),
+            ))),
+            ExprStruct::MonotonicallyIncreasingId(_) => Ok(Arc::new(
+                MonotonicallyIncreasingId::from_partition_id(self.partition),
+            )),
             expr => Err(GeneralError(format!("Not implemented: {expr:?}"))),
         }
     }
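Both new arms rely on the planner being instantiated per partition: `SparkPartitionId` collapses into a plain `Int32` literal holding `self.partition`, while `MonotonicallyIncreasingId` seeds its counter at `partition << 33`. A short Scala sketch of that offset arithmetic (the helper name is illustrative, not part of the codebase):

```scala
// The partition id occupies the upper 31 bits; the lower 33 bits are left
// for the per-partition row counter, matching Spark's documented ID layout.
def initialOffset(partition: Int): Long = partition.toLong << 33

initialOffset(0) // 0L
initialOffset(1) // 8589934592L
initialOffset(2) // 17179869184L
```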

native/proto/src/proto/expr.proto

Lines changed: 5 additions & 0 deletions
@@ -82,6 +82,8 @@ message Expr {
     ToPrettyString to_pretty_string = 60;
     Rand rand = 61;
     Rand randn = 62;
+    EmptyExpr spark_partition_id = 63;
+    EmptyExpr monotonically_increasing_id = 64;
   }
 }

@@ -250,6 +252,9 @@ message UnaryExpr {
   Expr child = 1;
 }

+message EmptyExpr {
+}
+
 // Bound to a particular vector array in input batch.
 message BoundReference {
   int32 index = 1;

native/spark-expr/src/nondetermenistic_funcs/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -16,6 +16,7 @@
 // under the License.

 pub mod internal;
+pub mod monotonically_increasing_id;
 pub mod rand;
 pub mod randn;
native/spark-expr/src/nondetermenistic_funcs/monotonically_increasing_id.rs (new file)

Lines changed: 162 additions & 0 deletions

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::array::{Int64Array, RecordBatch};
use arrow::datatypes::{DataType, Schema};
use datafusion::common::Result;
use datafusion::logical_expr::ColumnarValue;
use datafusion::physical_expr::PhysicalExpr;
use std::any::Any;
use std::fmt::{Debug, Display, Formatter};
use std::hash::{Hash, Hasher};
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;

#[derive(Debug)]
pub struct MonotonicallyIncreasingId {
    initial_offset: i64,
    current_offset: AtomicI64,
}

impl MonotonicallyIncreasingId {
    pub fn from_offset(offset: i64) -> Self {
        Self {
            initial_offset: offset,
            current_offset: AtomicI64::new(offset),
        }
    }

    pub fn from_partition_id(partition: i32) -> Self {
        Self::from_offset((partition as i64) << 33)
    }
}

impl Display for MonotonicallyIncreasingId {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "monotonically_increasing_id()")
    }
}

impl PartialEq for MonotonicallyIncreasingId {
    fn eq(&self, other: &Self) -> bool {
        self.initial_offset == other.initial_offset
    }
}

impl Eq for MonotonicallyIncreasingId {}

impl Hash for MonotonicallyIncreasingId {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.initial_offset.hash(state);
    }
}

impl PhysicalExpr for MonotonicallyIncreasingId {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
        let start = self
            .current_offset
            .fetch_add(batch.num_rows() as i64, Ordering::Relaxed);
        let end = start + batch.num_rows() as i64;
        let array_ref = Arc::new(Int64Array::from_iter_values(start..end));
        Ok(ColumnarValue::Array(array_ref))
    }

    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn PhysicalExpr>>,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        Ok(self)
    }

    fn fmt_sql(&self, _: &mut Formatter<'_>) -> std::fmt::Result {
        unimplemented!()
    }

    fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
        Ok(DataType::Int64)
    }

    fn nullable(&self, _input_schema: &Schema) -> Result<bool> {
        Ok(false)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{Array, Int64Array};
    use arrow::compute::concat;
    use arrow::{array::StringArray, datatypes::*};
    use datafusion::common::cast::as_int64_array;

    #[test]
    fn test_monotonically_increasing_id_single_batch() -> Result<()> {
        let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
        let data = StringArray::from(vec![Some("foo"), None, None, Some("bar"), None]);
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(data)])?;
        let mid_expr = MonotonicallyIncreasingId::from_offset(0);
        let result = mid_expr.evaluate(&batch)?.into_array(batch.num_rows())?;
        let result = as_int64_array(&result)?;
        let expected = &Int64Array::from_iter_values(0..batch.num_rows() as i64);
        assert_eq!(result, expected);
        Ok(())
    }

    #[test]
    fn test_monotonically_increasing_id_multi_batch() -> Result<()> {
        let first_batch_schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
        let first_batch_data = Int64Array::from(vec![Some(42), None]);
        let second_batch_schema = first_batch_schema.clone();
        let second_batch_data = Int64Array::from(vec![None, Some(-42), None]);
        let starting_offset: i64 = 100;
        let mid_expr = MonotonicallyIncreasingId::from_offset(starting_offset);
        let first_batch = RecordBatch::try_new(
            Arc::new(first_batch_schema),
            vec![Arc::new(first_batch_data)],
        )?;
        let first_batch_result = mid_expr
            .evaluate(&first_batch)?
            .into_array(first_batch.num_rows())?;
        let second_batch = RecordBatch::try_new(
            Arc::new(second_batch_schema),
            vec![Arc::new(second_batch_data)],
        )?;
        let second_batch_result = mid_expr
            .evaluate(&second_batch)?
            .into_array(second_batch.num_rows())?;
        let result_arrays: Vec<&dyn Array> = vec![
            as_int64_array(&first_batch_result)?,
            as_int64_array(&second_batch_result)?,
        ];
        let result_arrays = &concat(&result_arrays)?;
        let final_result = as_int64_array(result_arrays)?;
        let range_start = starting_offset;
        let range_end =
            starting_offset + first_batch.num_rows() as i64 + second_batch.num_rows() as i64;
        let expected = &Int64Array::from_iter_values(range_start..range_end);
        assert_eq!(final_result, expected);
        Ok(())
    }
}
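The `fetch_add` in `evaluate` reserves a contiguous block of IDs for each batch, so successive batches within a partition continue exactly where the previous one stopped, which is the property the multi-batch test asserts. Observed from the DataFrame side, the values come out like this (a sketch; the row-to-partition split assumes `spark.range`'s even distribution):

```scala
import org.apache.spark.sql.functions.monotonically_increasing_id

// Six rows over two partitions: partition 0 emits 0, 1, 2 and partition 1
// emits 8589934592, 8589934593, 8589934594 (that is, (1L << 33) + 0, 1, 2).
val ids = spark.range(0, 6, 1, 2)
  .select(monotonically_increasing_id().as("id"))
ids.show()
```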

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

Lines changed: 5 additions & 23 deletions
@@ -125,7 +125,11 @@ object QueryPlanSerde extends Logging with CometExprShim {
     classOf[MapKeys] -> CometMapKeys,
     classOf[MapValues] -> CometMapValues,
     classOf[MapFromArrays] -> CometMapFromArrays,
-    classOf[GetMapValue] -> CometMapExtract)
+    classOf[GetMapValue] -> CometMapExtract,
+    classOf[Rand] -> CometRand,
+    classOf[Randn] -> CometRandn,
+    classOf[SparkPartitionID] -> CometSparkPartitionId,
+    classOf[MonotonicallyIncreasingID] -> CometMonotonicallyIncreasingId)

   def emitWarning(reason: String): Unit = {
     logWarning(s"Comet native execution is disabled due to: $reason")
@@ -1729,28 +1733,6 @@ object QueryPlanSerde extends Logging with CometExprShim {
         convert(CometArrayCompact)
       case _: ArrayExcept =>
         convert(CometArrayExcept)
-      case Rand(child, _) =>
-        val seed = child match {
-          case Literal(seed: Long, _) => Some(seed)
-          case Literal(null, _) => Some(0L)
-          case _ => None
-        }
-        seed.map(seed =>
-          ExprOuterClass.Expr
-            .newBuilder()
-            .setRand(ExprOuterClass.Rand.newBuilder().setSeed(seed))
-            .build())
-      case Randn(child, _) =>
-        val seed = child match {
-          case Literal(seed: Long, _) => Some(seed)
-          case Literal(null, _) => Some(0L)
-          case _ => None
-        }
-        seed.map(seed =>
-          ExprOuterClass.Expr
-            .newBuilder()
-            .setRandn(ExprOuterClass.Rand.newBuilder().setSeed(seed))
-            .build())
       case expr =>
         QueryPlanSerde.exprSerdeMap.get(expr.getClass) match {
           case Some(handler) => convert(handler)
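The deleted `case Rand(...)` and `case Randn(...)` arms are replaced by entries in `exprSerdeMap`, so all four expressions now flow through the class-keyed fallback arm shown above. A self-contained sketch of that dispatch pattern (all names below are simplified stand-ins, not Comet's real API):

```scala
// Hypothetical model of the exprSerdeMap lookup: handlers are registered per
// expression class; classes without an entry simply fall back to Spark.
sealed trait Expr
final case class Rand(seed: Long) extends Expr
final case class PartitionId() extends Expr

trait Serde { def convert(e: Expr): Option[String] }
object RandSerde extends Serde { def convert(e: Expr) = Some("rand") }
object PartitionIdSerde extends Serde { def convert(e: Expr) = Some("spark_partition_id") }

val serdeMap: Map[Class[_ <: Expr], Serde] =
  Map(classOf[Rand] -> RandSerde, classOf[PartitionId] -> PartitionIdSerde)

// Mirrors `QueryPlanSerde.exprSerdeMap.get(expr.getClass)`.
def toNative(e: Expr): Option[String] = serdeMap.get(e.getClass).flatMap(_.convert(e))

toNative(Rand(42L))     // Some("rand")
toNative(PartitionId()) // Some("spark_partition_id")
```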
Lines changed: 90 additions & 0 deletions
(new file, package org.apache.comet.serde)

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.comet.serde

import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Literal, MonotonicallyIncreasingID, Rand, Randn, SparkPartitionID}

object CometSparkPartitionId extends CometExpressionSerde {
  override def convert(
      expr: Expression,
      _inputs: Seq[Attribute],
      _binding: Boolean): Option[ExprOuterClass.Expr] = {
    assert(expr.isInstanceOf[SparkPartitionID])
    Some(
      ExprOuterClass.Expr
        .newBuilder()
        .setSparkPartitionId(ExprOuterClass.EmptyExpr.newBuilder())
        .build())
  }
}

object CometMonotonicallyIncreasingId extends CometExpressionSerde {
  override def convert(
      expr: Expression,
      _inputs: Seq[Attribute],
      _binding: Boolean): Option[ExprOuterClass.Expr] = {
    assert(expr.isInstanceOf[MonotonicallyIncreasingID])
    Some(
      ExprOuterClass.Expr
        .newBuilder()
        .setMonotonicallyIncreasingId(ExprOuterClass.EmptyExpr.newBuilder())
        .build())
  }
}

sealed abstract class CometRandCommonSerde extends CometExpressionSerde {
  protected def extractSeedFromExpr(expr: Expression): Option[Long] = {
    expr match {
      case Literal(seed: Long, _) => Some(seed)
      case Literal(null, _) => Some(0L)
      case _ => None
    }
  }
}

object CometRand extends CometRandCommonSerde {
  override def convert(
      expr: Expression,
      inputs: Seq[Attribute],
      binding: Boolean): Option[ExprOuterClass.Expr] = {
    val Rand(child, _) = expr
    extractSeedFromExpr(child).map { seed =>
      ExprOuterClass.Expr
        .newBuilder()
        .setRand(ExprOuterClass.Rand.newBuilder().setSeed(seed))
        .build()
    }
  }
}

object CometRandn extends CometRandCommonSerde {
  override def convert(
      expr: Expression,
      inputs: Seq[Attribute],
      binding: Boolean): Option[ExprOuterClass.Expr] = {
    val Randn(child, _) = expr
    extractSeedFromExpr(child).map { seed =>
      ExprOuterClass.Expr
        .newBuilder()
        .setRandn(ExprOuterClass.Rand.newBuilder().setSeed(seed))
        .build()
    }
  }
}
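`extractSeedFromExpr` accepts only literal seeds (a null literal is treated as seed 0); any other child makes `convert` return `None`, leaving the expression to run in Spark rather than natively. In practice the common call sites qualify, because Spark bakes the seed into a literal at analysis time:

```scala
import org.apache.spark.sql.functions.{rand, randn}

spark.range(10).select(rand(42)) // explicit literal seed: convertible
spark.range(10).select(randn())  // Spark picks a random literal seed: convertible
```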
