Skip to content

Commit d50d7e5

Browse files
committed
adding CoalesceBatches
1 parent 801760e commit d50d7e5

File tree

2 files changed

+9
-6
lines changed

2 files changed

+9
-6
lines changed

native/core/src/execution/planner.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1187,7 +1187,10 @@ impl PhysicalPlanner {
11871187
if join.filter.is_some() {
11881188
// SMJ with join filter produces lots of tiny batches
11891189
let coalesce_batches: Arc<dyn ExecutionPlan> =
1190-
Arc::new(CoalesceBatchesExec::new(join.clone(), 8192));
1190+
Arc::new(CoalesceBatchesExec::new(
1191+
Arc::clone(&join),
1192+
self.session_ctx.state().config_options().batch_size(),
1193+
));
11911194
Ok((
11921195
scans,
11931196
Arc::new(SparkPlan::new_with_additional(

spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,14 @@ package org.apache.comet.exec
2121

2222
import org.scalactic.source.Position
2323
import org.scalatest.Tag
24-
2524
import org.apache.spark.sql.CometTestBase
2625
import org.apache.spark.sql.catalyst.TableIdentifier
2726
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
2827
import org.apache.spark.sql.comet.{CometBroadcastExchangeExec, CometBroadcastHashJoinExec}
2928
import org.apache.spark.sql.internal.SQLConf
3029
import org.apache.spark.sql.types.Decimal
31-
3230
import org.apache.comet.CometConf
31+
import org.apache.comet.CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED
3332
import org.apache.comet.CometSparkSessionExtensions.isSpark34Plus
3433

3534
class CometJoinSuite extends CometTestBase {
@@ -294,8 +293,10 @@ class CometJoinSuite extends CometTestBase {
294293

295294
test("SortMergeJoin without join filter") {
296295
withSQLConf(
296+
CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key -> "true",
297297
SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
298-
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
298+
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1"
299+
) {
299300
withParquetTable((0 until 10).map(i => (i, i % 5)), "tbl_a") {
300301
withParquetTable((0 until 10).map(i => (i % 10, i + 2)), "tbl_b") {
301302
val df1 = sql("SELECT * FROM tbl_a JOIN tbl_b ON tbl_a._2 = tbl_b._1")
@@ -338,8 +339,7 @@ class CometJoinSuite extends CometTestBase {
338339
}
339340
}
340341

341-
// https://github.com/apache/datafusion-comet/issues/398
342-
ignore("SortMergeJoin with join filter") {
342+
test("SortMergeJoin with join filter") {
343343
withSQLConf(
344344
CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key -> "true",
345345
SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",

0 commit comments

Comments
 (0)