
Commit f8ed109

feat: limit with offset support (#2070)
* feat: positive offset support for queries with limit
* fix stability suite and spark tests
* fix stability suite and spark tests
* rolled back accidentally changed logic in a test
* replace assert with explicit Err return
* fix clippy warnings
* Update native/core/src/execution/planner.rs
  Co-authored-by: Oleks V <[email protected]>
* Update spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala
  Co-authored-by: Oleks V <[email protected]>
* Update native/core/src/execution/planner.rs
  Co-authored-by: Oleks V <[email protected]>
* fix compile errors and format warnings
* zero offset and limit test

---------

Co-authored-by: Oleks V <[email protected]>
1 parent 7976b94 commit f8ed109
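For context, the change targets queries that combine a limit with a positive offset. A minimal Scala illustration (not part of the commit; the SQL OFFSET clause requires Spark 3.4 or later, and the chosen plan depends on the Comet configuration):

    // With Comet enabled, a limit with a non-zero offset can now stay on the native
    // path (CometGlobalLimitExec / CometCollectLimitExec) instead of falling back to Spark.
    val df = spark.sql("SELECT id FROM range(100) ORDER BY id LIMIT 10 OFFSET 5")
    df.collect() // ids 5 through 14 under standard LIMIT/OFFSET semantics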

162 files changed (+292 / -201 lines)


dev/diffs/4.0.0.diff

Lines changed: 17 additions & 6 deletions
@@ -1,5 +1,5 @@
 diff --git a/pom.xml b/pom.xml
-index a4b1b2c3c9f..63ec4784625 100644
+index 443d46a4302..63ec4784625 100644
 --- a/pom.xml
 +++ b/pom.xml
 @@ -148,6 +148,8 @@
@@ -1523,19 +1523,30 @@ index 418ca3430bb..eb8267192f8 100644
      withTempPath { path =>
        val dir = path.getCanonicalPath
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/InsertSortForLimitAndOffsetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/InsertSortForLimitAndOffsetSuite.scala
-index d1b11a74cf3..8ea0129b3af 100644
+index d1b11a74cf3..08087c80201 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/InsertSortForLimitAndOffsetSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/InsertSortForLimitAndOffsetSuite.scala
-@@ -17,7 +17,7 @@
+@@ -17,8 +17,9 @@
 
  package org.apache.spark.sql.execution
 
 -import org.apache.spark.sql.{Dataset, QueryTest}
 +import org.apache.spark.sql.{Dataset, IgnoreComet, QueryTest}
  import org.apache.spark.sql.IntegratedUDFTestUtils._
++import org.apache.spark.sql.comet.CometCollectLimitExec
  import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
  import org.apache.spark.sql.functions.rand
-@@ -77,7 +77,9 @@ class InsertSortForLimitAndOffsetSuite extends QueryTest
+ import org.apache.spark.sql.internal.SQLConf
+@@ -39,7 +40,7 @@ class InsertSortForLimitAndOffsetSuite extends QueryTest
+
+  private def assertHasCollectLimitExec(plan: SparkPlan): Unit = {
+    assert(find(plan) {
+-      case _: CollectLimitExec => true
++      case _: CollectLimitExec | _: CometCollectLimitExec => true
+      case _ => false
+    }.isDefined)
+  }
+@@ -77,7 +78,9 @@ class InsertSortForLimitAndOffsetSuite extends QueryTest
     assert(!hasLocalSort(physicalPlan))
   }
 
@@ -1546,7 +1557,7 @@ index d1b11a74cf3..8ea0129b3af 100644
     withSQLConf(SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD.key -> "1") {
       val df = spark.range(10).orderBy($"id" % 8).limit(2)
       df.collect()
-@@ -88,7 +90,9 @@ class InsertSortForLimitAndOffsetSuite extends QueryTest
+@@ -88,7 +91,9 @@ class InsertSortForLimitAndOffsetSuite extends QueryTest
     }
   }
 
@@ -1557,7 +1568,7 @@ index d1b11a74cf3..8ea0129b3af 100644
     withSQLConf(
       SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD.key -> "1",
       // To trigger the bug, we have to disable the coalescing optimization. Otherwise we use only
-@@ -117,7 +121,9 @@ class InsertSortForLimitAndOffsetSuite extends QueryTest
+@@ -117,7 +122,9 @@ class InsertSortForLimitAndOffsetSuite extends QueryTest
     assert(!hasLocalSort(physicalPlan))
   }
 
native/core/src/execution/planner.rs

Lines changed: 30 additions & 8 deletions
@@ -94,6 +94,7 @@ use arrow::buffer::BooleanBuffer;
 use datafusion::common::utils::SingleRowListArrayBuilder;
 use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
 use datafusion::physical_plan::filter::FilterExec as DataFusionFilterExec;
+use datafusion::physical_plan::limit::GlobalLimitExec;
 use datafusion_comet_proto::spark_operator::SparkFilePartition;
 use datafusion_comet_proto::{
     spark_expression::{
@@ -1283,12 +1284,30 @@ impl PhysicalPlanner {
             OpStruct::Limit(limit) => {
                 assert_eq!(children.len(), 1);
                 let num = limit.limit;
+                let offset: i32 = limit.offset;
+                if num != -1 && offset > num {
+                    return Err(GeneralError(format!(
+                        "Invalid limit/offset combination: [{num}. {offset}]"
+                    )));
+                }
                 let (scans, child) = self.create_plan(&children[0], inputs, partition_count)?;
-
-                let limit = Arc::new(LocalLimitExec::new(
-                    Arc::clone(&child.native_plan),
-                    num as usize,
-                ));
+                let limit: Arc<dyn ExecutionPlan> = if offset == 0 {
+                    Arc::new(LocalLimitExec::new(
+                        Arc::clone(&child.native_plan),
+                        num as usize,
+                    ))
+                } else {
+                    let fetch = if num == -1 {
+                        None
+                    } else {
+                        Some((num - offset) as usize)
+                    };
+                    Arc::new(GlobalLimitExec::new(
+                        Arc::clone(&child.native_plan),
+                        offset as usize,
+                        fetch,
+                    ))
+                };
                 Ok((
                     scans,
                     Arc::new(SparkPlan::new(spark_plan.plan_id, limit, vec![child])),
@@ -1305,23 +1324,26 @@ impl PhysicalPlanner {
                     .collect();

                 let fetch = sort.fetch.map(|num| num as usize);
-
                 // SortExec caches batches so we need to make a copy of incoming batches. Also,
                 // SortExec fails in some cases if we do not unpack dictionary-encoded arrays, and
                 // it would be more efficient if we could avoid that.
                 // https://github.com/apache/datafusion-comet/issues/963
                 let child_copied = Self::wrap_in_copy_exec(Arc::clone(&child.native_plan));

-                let sort = Arc::new(
+                let mut sort_exec: Arc<dyn ExecutionPlan> = Arc::new(
                     SortExec::new(LexOrdering::new(exprs?).unwrap(), Arc::clone(&child_copied))
                         .with_fetch(fetch),
                 );

+                if let Some(skip) = sort.skip.filter(|&n| n > 0).map(|n| n as usize) {
+                    sort_exec = Arc::new(GlobalLimitExec::new(sort_exec, skip, None));
+                }
+
                 Ok((
                     scans,
                     Arc::new(SparkPlan::new(
                         spark_plan.plan_id,
-                        sort,
+                        sort_exec,
                         vec![Arc::clone(&child)],
                     )),
                 ))
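The protobuf Limit message carries Spark's (limit, offset) pair, where limit counts rows including the skipped prefix (hence the offset > limit check above) and -1 denotes "no limit", while DataFusion's GlobalLimitExec takes (skip, fetch) with fetch counted after the skip. A minimal Scala sketch of the same conversion; the helper name is hypothetical and not part of the commit:

    // Hypothetical helper mirroring the planner's arithmetic; limit == -1 means "no limit".
    def toSkipFetch(limit: Int, offset: Int): (Int, Option[Int]) = {
      require(limit == -1 || offset <= limit, s"Invalid limit/offset combination: [$limit, $offset]")
      (offset, if (limit == -1) None else Some(limit - offset))
    }

    toSkipFetch(15, 5) // (5, Some(10)): skip 5 rows, then return at most 10
    toSkipFetch(-1, 5) // (5, None): skip 5 rows, no upper bound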

native/proto/src/proto/operator.proto

Lines changed: 1 addition & 0 deletions
@@ -117,6 +117,7 @@ message Filter {
 message Sort {
   repeated spark.spark_expression.Expr sort_orders = 1;
   optional int32 fetch = 3;
+  optional int32 skip = 4;
 }

 message HashAggregate {

spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala

Lines changed: 3 additions & 5 deletions
@@ -201,10 +201,10 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
       case op: LocalLimitExec =>
         newPlanWithProto(op, CometLocalLimitExec(_, op, op.limit, op.child, SerializedPlan(None)))

-      case op: GlobalLimitExec if op.offset == 0 =>
+      case op: GlobalLimitExec =>
         newPlanWithProto(
           op,
-          CometGlobalLimitExec(_, op, op.limit, op.child, SerializedPlan(None)))
+          CometGlobalLimitExec(_, op, op.limit, op.offset, op.child, SerializedPlan(None)))

       case op: CollectLimitExec =>
         val fallbackReasons = new ListBuffer[String]()
@@ -214,9 +214,6 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
         if (!isCometShuffleEnabled(conf)) {
           fallbackReasons += "Comet shuffle is not enabled"
         }
-        if (op.offset != 0) {
-          fallbackReasons += "CollectLimit with non-zero offset is not supported"
-        }
         if (fallbackReasons.nonEmpty) {
           withInfos(op, fallbackReasons.toSet)
         } else {
@@ -382,6 +379,7 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
             s,
             s.output,
             s.limit,
+            s.offset,
             s.sortOrder,
             s.projectList,
             s.child)

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

Lines changed: 2 additions & 5 deletions
@@ -1840,13 +1840,10 @@ object QueryPlanSerde extends Logging with CometExprShim {

       case globalLimitExec: GlobalLimitExec
           if CometConf.COMET_EXEC_GLOBAL_LIMIT_ENABLED.get(conf) =>
-        // TODO: We don't support negative limit for now.
-        if (childOp.nonEmpty && globalLimitExec.limit >= 0) {
+        if (childOp.nonEmpty) {
           val limitBuilder = OperatorOuterClass.Limit.newBuilder()

-          // TODO: Spark 3.3 might have negative limit (-1) for Offset usage.
-          // When we upgrade to Spark 3.3., we need to address it here.
-          limitBuilder.setLimit(globalLimitExec.limit)
+          limitBuilder.setLimit(globalLimitExec.limit).setOffset(globalLimitExec.offset)

           Some(builder.setLimit(limitBuilder).build())
         } else {

spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala

Lines changed: 5 additions & 4 deletions
@@ -36,8 +36,6 @@ import com.google.common.base.Objects
  *
  * Similar to `CometTakeOrderedAndProjectExec`, it contains two native executions seperated by a
  * Comet shuffle.
- *
- * TODO: support offset semantics
  */
 case class CometCollectLimitExec(
     override val originalPlan: SparkPlan,
@@ -64,7 +62,10 @@ case class CometCollectLimitExec(
     new UnsafeRowSerializer(child.output.size, longMetric("dataSize"))

   override def executeCollect(): Array[InternalRow] = {
-    ColumnarToRowExec(child).executeTake(limit)
+    val rows =
+      if (limit >= 0) ColumnarToRowExec(child).executeTake(limit)
+      else ColumnarToRowExec(child).executeCollect()
+    if (offset > 0) rows.drop(offset) else rows
   }

   protected override def doExecuteColumnar(): RDD[ColumnarBatch] = {
@@ -91,7 +92,7 @@ case class CometCollectLimitExec(

       new CometShuffledBatchRDD(dep, readMetrics)
     }
-    CometExecUtils.getNativeLimitRDD(singlePartitionRDD, output, limit)
+    CometExecUtils.getNativeLimitRDD(singlePartitionRDD, output, limit, offset)
   }
 }

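The executeCollect change takes up to limit rows on the driver (or everything when limit is negative) and then drops the first offset rows, so, as in the planner above, the physical limit counts rows including the skipped prefix. A small worked example with illustrative values:

    // limit = 15, offset = 5: executeTake(15) returns rows 0..14,
    // then drop(5) leaves rows 5..14, i.e. limit - offset = 10 rows.
    val taken = (0 until 15).toArray // stand-in for ColumnarToRowExec(child).executeTake(15)
    val result = taken.drop(5)       // Array(5, 6, ..., 14)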
spark/src/main/scala/org/apache/spark/sql/comet/CometExecUtils.scala

Lines changed: 14 additions & 6 deletions
@@ -50,10 +50,11 @@ object CometExecUtils {
   def getNativeLimitRDD(
       childPlan: RDD[ColumnarBatch],
       outputAttribute: Seq[Attribute],
-      limit: Int): RDD[ColumnarBatch] = {
+      limit: Int,
+      offset: Int = 0): RDD[ColumnarBatch] = {
     val numParts = childPlan.getNumPartitions
     childPlan.mapPartitionsWithIndexInternal { case (idx, iter) =>
-      val limitOp = CometExecUtils.getLimitNativePlan(outputAttribute, limit).get
+      val limitOp = CometExecUtils.getLimitNativePlan(outputAttribute, limit, offset).get
       CometExec.getCometIterator(Seq(iter), outputAttribute.length, limitOp, numParts, idx)
     }
   }
@@ -66,8 +67,9 @@ object CometExecUtils {
       outputAttributes: Seq[Attribute],
       sortOrder: Seq[SortOrder],
       child: SparkPlan,
-      limit: Int): Option[Operator] = {
-    getTopKNativePlan(outputAttributes, sortOrder, child, limit).flatMap { topK =>
+      limit: Int,
+      offset: Int = 0): Option[Operator] = {
+    getTopKNativePlan(outputAttributes, sortOrder, child, limit, offset).flatMap { topK =>
       val exprs = projectList.map(exprToProto(_, child.output))

       if (exprs.forall(_.isDefined)) {
@@ -87,7 +89,10 @@ object CometExecUtils {
    * Prepare Limit native plan for Comet operators which take the first `limit` elements of each
    * child partition
    */
-  def getLimitNativePlan(outputAttributes: Seq[Attribute], limit: Int): Option[Operator] = {
+  def getLimitNativePlan(
+      outputAttributes: Seq[Attribute],
+      limit: Int,
+      offset: Int = 0): Option[Operator] = {
     val scanBuilder = OperatorOuterClass.Scan.newBuilder().setSource("LimitInput")
     val scanOpBuilder = OperatorOuterClass.Operator.newBuilder()

@@ -100,6 +105,7 @@ object CometExecUtils {

     val limitBuilder = OperatorOuterClass.Limit.newBuilder()
     limitBuilder.setLimit(limit)
+    limitBuilder.setOffset(offset)

     val limitOpBuilder = OperatorOuterClass.Operator
       .newBuilder()
@@ -117,7 +123,8 @@ object CometExecUtils {
       outputAttributes: Seq[Attribute],
       sortOrder: Seq[SortOrder],
       child: SparkPlan,
-      limit: Int): Option[Operator] = {
+      limit: Int,
+      offset: Int = 0): Option[Operator] = {
     val scanBuilder = OperatorOuterClass.Scan.newBuilder().setSource("TopKInput")
     val scanOpBuilder = OperatorOuterClass.Operator.newBuilder()

@@ -134,6 +141,7 @@ object CometExecUtils {
     val sortBuilder = OperatorOuterClass.Sort.newBuilder()
     sortBuilder.addAllSortOrders(sortOrders.map(_.get).asJava)
     sortBuilder.setFetch(limit)
+    sortBuilder.setSkip(offset)

     val sortOpBuilder = OperatorOuterClass.Operator
       .newBuilder()
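The new offset parameters default to 0, so existing callers of these helpers keep their behaviour; only the Limit and Sort protobuf builders gain the extra field. A hedged fragment of what getLimitNativePlan now emits (builder calls as in the diff; the surrounding scan/child wiring is omitted and the limit/offset values come from the caller):

    // Fragment only, not runnable on its own: the Limit message now carries both fields.
    val limitBuilder = OperatorOuterClass.Limit.newBuilder()
    limitBuilder.setLimit(limit)   // -1 denotes "no limit" on the native side
    limitBuilder.setOffset(offset) // new field; call sites that omit it get offset = 0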

spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala

Lines changed: 6 additions & 5 deletions
@@ -43,6 +43,7 @@ case class CometTakeOrderedAndProjectExec(
     override val originalPlan: SparkPlan,
     override val output: Seq[Attribute],
     limit: Int,
+    offset: Int,
     sortOrder: Seq[SortOrder],
     projectList: Seq[NamedExpression],
     child: SparkPlan)
@@ -68,7 +69,7 @@ case class CometTakeOrderedAndProjectExec(

   protected override def doExecuteColumnar(): RDD[ColumnarBatch] = {
     val childRDD = child.executeColumnar()
-    if (childRDD.getNumPartitions == 0) {
+    if (childRDD.getNumPartitions == 0 || limit == 0) {
       new ParallelCollectionRDD(sparkContext, Seq.empty[ColumnarBatch], 1, Map.empty)
     } else {
       val singlePartitionRDD = if (childRDD.getNumPartitions == 1) {
@@ -101,7 +102,7 @@ case class CometTakeOrderedAndProjectExec(

       singlePartitionRDD.mapPartitionsInternal { iter =>
         val topKAndProjection = CometExecUtils
-          .getProjectionNativePlan(projectList, child.output, sortOrder, child, limit)
+          .getProjectionNativePlan(projectList, child.output, sortOrder, child, limit, offset)
           .get
         val it = CometExec.getCometIterator(Seq(iter), output.length, topKAndProjection, 1, 0)
         setSubqueries(it.id, this)
@@ -122,19 +123,19 @@ case class CometTakeOrderedAndProjectExec(
     val orderByString = truncatedString(sortOrder, "[", ",", "]", maxFields)
     val outputString = truncatedString(output, "[", ",", "]", maxFields)

-    s"CometTakeOrderedAndProjectExec(limit=$limit, orderBy=$orderByString, output=$outputString)"
+    s"CometTakeOrderedAndProjectExec(limit=$limit, offset=$offset, " +
+      s"orderBy=$orderByString, output=$outputString)"
   }

   override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
     this.copy(child = newChild)
 }

 object CometTakeOrderedAndProjectExec {
-  // TODO: support offset for Spark 3.4
   def isSupported(plan: TakeOrderedAndProjectExec): Boolean = {
     val exprs = plan.projectList.map(exprToProto(_, plan.child.output))
     val sortOrders = plan.sortOrder.map(exprToProto(_, plan.child.output))
-    exprs.forall(_.isDefined) && sortOrders.forall(_.isDefined) && plan.offset == 0 &&
+    exprs.forall(_.isDefined) && sortOrders.forall(_.isDefined) &&
       supportedSortType(plan, plan.sortOrder)
   }
 }

spark/src/main/scala/org/apache/spark/sql/comet/operators.scala

Lines changed: 7 additions & 3 deletions
@@ -577,6 +577,7 @@ case class CometGlobalLimitExec(
     override val nativeOp: Operator,
     override val originalPlan: SparkPlan,
     limit: Int,
+    offset: Int,
     child: SparkPlan,
     override val serializedPlanOpt: SerializedPlan)
     extends CometUnaryExec {
@@ -588,20 +589,23 @@ case class CometGlobalLimitExec(
   override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
     this.copy(child = newChild)

-  override def stringArgs: Iterator[Any] = Iterator(limit, child)
+  override def stringArgs: Iterator[Any] = Iterator(limit, offset, child)

   override def equals(obj: Any): Boolean = {
     obj match {
       case other: CometGlobalLimitExec =>
         this.output == other.output &&
-        this.limit == other.limit && this.child == other.child &&
+        this.limit == other.limit &&
+        this.offset == other.offset &&
+        this.child == other.child &&
         this.serializedPlanOpt == other.serializedPlanOpt
       case _ =>
         false
     }
   }

-  override def hashCode(): Int = Objects.hashCode(output, limit: java.lang.Integer, child)
+  override def hashCode(): Int =
+    Objects.hashCode(output, limit: java.lang.Integer, offset: java.lang.Integer, child)
 }

 case class CometExpandExec(

spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q1/explain.txt

Lines changed: 1 addition & 1 deletion
@@ -227,7 +227,7 @@ Arguments: [c_customer_id#28], [c_customer_id#28]

 (40) CometTakeOrderedAndProject
 Input [1]: [c_customer_id#28]
-Arguments: TakeOrderedAndProject(limit=100, orderBy=[c_customer_id#28 ASC NULLS FIRST], output=[c_customer_id#28]), [c_customer_id#28], 100, [c_customer_id#28 ASC NULLS FIRST], [c_customer_id#28]
+Arguments: TakeOrderedAndProject(limit=100, orderBy=[c_customer_id#28 ASC NULLS FIRST], output=[c_customer_id#28]), [c_customer_id#28], 100, 0, [c_customer_id#28 ASC NULLS FIRST], [c_customer_id#28]

 (41) CometColumnarToRow [codegen id : 1]
 Input [1]: [c_customer_id#28]
