From add1b5ac4505d34f3b5d58c20617ce08ec41ee3e Mon Sep 17 00:00:00 2001 From: Bhargava Vadlamani Date: Sun, 31 Aug 2025 01:26:48 -0700 Subject: [PATCH 1/8] lazy_eval_coalesce --- .../apache/comet/CometExpressionSuite.scala | 48 ++++++++++++------- 1 file changed, 32 insertions(+), 16 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index bf1733b3ff..ae1f5f6744 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -44,6 +44,7 @@ import org.apache.spark.sql.types._ import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { + import testImplicits._ override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit @@ -367,15 +368,15 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { withParquetTable(data, "tbl") { checkSparkAnswerAndOperator("SELECT try_divide(_1, _2) FROM tbl") checkSparkAnswerAndOperator(""" - |SELECT - | try_divide(10, 0), - | try_divide(NULL, 5), - | try_divide(5, NULL), - | try_divide(-2147483648, -1), - | try_divide(-9223372036854775808, -1), - | try_divide(DECIMAL('9999999999999999999999999999'), 0.1) - | from tbl - |""".stripMargin) + |SELECT + | try_divide(10, 0), + | try_divide(NULL, 5), + | try_divide(5, NULL), + | try_divide(-2147483648, -1), + | try_divide(-9223372036854775808, -1), + | try_divide(DECIMAL('9999999999999999999999999999'), 0.1) + | from tbl + |""".stripMargin) } } @@ -384,13 +385,28 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { withParquetTable(data, "tbl") { checkSparkAnswerAndOperator("SELECT try_divide(_1, _2) FROM tbl") checkSparkAnswerAndOperator(""" - |SELECT try_divide(-128, -1), - |try_divide(-32768, -1), - |try_divide(-2147483648, -1), - |try_divide(-9223372036854775808, -1), - |try_divide(CAST(99999 AS DECIMAL(5,0)), CAST(0.0001 AS DECIMAL(5,4))) - |from tbl - |""".stripMargin) + |SELECT try_divide(-128, -1), + |try_divide(-32768, -1), + |try_divide(-2147483648, -1), + |try_divide(-9223372036854775808, -1), + |try_divide(CAST(99999 AS DECIMAL(5,0)), CAST(0.0001 AS DECIMAL(5,4))) + |from tbl + |""".stripMargin) + } + } + + test("test coalesce lazy eval") { + withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") { + val data = Seq((100, 0)) + withParquetTable(data, "t1") { + val res = spark.sql(""" + |SELECT coalesce(_1 , 1/0) from t1; + | """.stripMargin) + + res.explain(true) + + checkSparkAnswer(res) + } } } From fa65c2c697f4ce40dff65ec4ec6d81377554e2a8 Mon Sep 17 00:00:00 2001 From: Bhargava Vadlamani Date: Sun, 31 Aug 2025 15:14:52 -0700 Subject: [PATCH 2/8] lazy_coalesce_fallback_case_statement --- .../apache/comet/CometExpressionSuite.scala | 42 ++++++++----------- 1 file changed, 18 insertions(+), 24 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index ae1f5f6744..7552a8e4de 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -44,7 +44,6 @@ import org.apache.spark.sql.types._ import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { - import testImplicits._ override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit @@ -368,15 +367,15 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { withParquetTable(data, "tbl") { checkSparkAnswerAndOperator("SELECT try_divide(_1, _2) FROM tbl") checkSparkAnswerAndOperator(""" - |SELECT - | try_divide(10, 0), - | try_divide(NULL, 5), - | try_divide(5, NULL), - | try_divide(-2147483648, -1), - | try_divide(-9223372036854775808, -1), - | try_divide(DECIMAL('9999999999999999999999999999'), 0.1) - | from tbl - |""".stripMargin) + |SELECT + | try_divide(10, 0), + | try_divide(NULL, 5), + | try_divide(5, NULL), + | try_divide(-2147483648, -1), + | try_divide(-9223372036854775808, -1), + | try_divide(DECIMAL('9999999999999999999999999999'), 0.1) + | from tbl + |""".stripMargin) } } @@ -385,13 +384,13 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { withParquetTable(data, "tbl") { checkSparkAnswerAndOperator("SELECT try_divide(_1, _2) FROM tbl") checkSparkAnswerAndOperator(""" - |SELECT try_divide(-128, -1), - |try_divide(-32768, -1), - |try_divide(-2147483648, -1), - |try_divide(-9223372036854775808, -1), - |try_divide(CAST(99999 AS DECIMAL(5,0)), CAST(0.0001 AS DECIMAL(5,4))) - |from tbl - |""".stripMargin) + |SELECT try_divide(-128, -1), + |try_divide(-32768, -1), + |try_divide(-2147483648, -1), + |try_divide(-9223372036854775808, -1), + |try_divide(CAST(99999 AS DECIMAL(5,0)), CAST(0.0001 AS DECIMAL(5,4))) + |from tbl + |""".stripMargin) } } @@ -400,15 +399,10 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { val data = Seq((100, 0)) withParquetTable(data, "t1") { val res = spark.sql(""" - |SELECT coalesce(_1 , 1/0) from t1; - | """.stripMargin) - - res.explain(true) - + |SELECT coalesce(_1 , 1/0) from t1; + | """.stripMargin) checkSparkAnswer(res) } - } - } test("dictionary arithmetic") { // TODO: test ANSI mode From d24005c6a9c382e0c7c165edd73dc6b67e7406ee Mon Sep 17 00:00:00 2001 From: Bhargava Vadlamani Date: Sun, 31 Aug 2025 15:52:11 -0700 Subject: [PATCH 3/8] lazy_coalesce_fallback_case_statement --- .../scala/org/apache/comet/CometExpressionSuite.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 7552a8e4de..5391fed71f 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -398,11 +398,14 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") { val data = Seq((100, 0)) withParquetTable(data, "t1") { - val res = spark.sql(""" - |SELECT coalesce(_1 , 1/0) from t1; - | """.stripMargin) + val res = spark.sql( + """ + |SELECT coalesce(_1 , 1/0) from t1; + | """.stripMargin) checkSparkAnswer(res) } + } + } test("dictionary arithmetic") { // TODO: test ANSI mode From 584380719b1977b11fdc0d2ecab0d125b1275c01 Mon Sep 17 00:00:00 2001 From: Bhargava Vadlamani Date: Sun, 31 Aug 2025 16:57:22 -0700 Subject: [PATCH 4/8] lazy_coalesce_fallback_case_statement --- .../src/test/scala/org/apache/comet/CometExpressionSuite.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 5391fed71f..a07230d780 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -398,8 +398,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") { val data = Seq((100, 0)) withParquetTable(data, "t1") { - val res = spark.sql( - """ + val res = spark.sql(""" |SELECT coalesce(_1 , 1/0) from t1; | """.stripMargin) checkSparkAnswer(res) From 297d934d2b0786bfa16d127ec1fbcd21e279da2b Mon Sep 17 00:00:00 2001 From: Bhargava Vadlamani Date: Tue, 2 Sep 2025 15:29:57 -0700 Subject: [PATCH 5/8] lazy_coalesce_fallback_case_statement --- .../apache/comet/serde/QueryPlanSerde.scala | 7 +-- .../org/apache/comet/serde/conditional.scala | 47 ++++++++++++++++++- .../apache/comet/CometExpressionSuite.scala | 10 ++-- 3 files changed, 54 insertions(+), 10 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index d4c3be1877..84ffd11daa 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -192,7 +192,8 @@ object QueryPlanSerde extends Logging with CometExprShim { classOf[Log2] -> CometLog2, classOf[Pow] -> CometScalarFunction[Pow]("pow"), classOf[If] -> CometIf, - classOf[CaseWhen] -> CometCaseWhen) + classOf[CaseWhen] -> CometCaseWhen, + classOf[Coalesce] -> CometCoalesce) /** * Mapping of Spark aggregate expression class to Comet expression handler. @@ -999,10 +1000,6 @@ object QueryPlanSerde extends Logging with CometExprShim { None } - case a @ Coalesce(_) => - val exprChildren = a.children.map(exprToProtoInternal(_, inputs, binding)) - scalarFunctionExprToProto("coalesce", exprChildren: _*) - // With Spark 3.4, CharVarcharCodegenUtils.readSidePadding gets called to pad spaces for // char types. // See https://github.com/apache/spark/pull/38151 diff --git a/spark/src/main/scala/org/apache/comet/serde/conditional.scala b/spark/src/main/scala/org/apache/comet/serde/conditional.scala index db86afc4cb..cac65219a3 100644 --- a/spark/src/main/scala/org/apache/comet/serde/conditional.scala +++ b/spark/src/main/scala/org/apache/comet/serde/conditional.scala @@ -21,7 +21,7 @@ package org.apache.comet.serde import scala.collection.JavaConverters._ -import org.apache.spark.sql.catalyst.expressions.{Attribute, CaseWhen, Expression, If} +import org.apache.spark.sql.catalyst.expressions.{Attribute, CaseWhen, Coalesce, Expression, If, IsNotNull} import org.apache.comet.CometSparkSessionExtensions.withInfo import org.apache.comet.serde.QueryPlanSerde.exprToProtoInternal @@ -91,3 +91,48 @@ object CometCaseWhen extends CometExpressionSerde[CaseWhen] { } } } + +object CometCoalesce extends CometExpressionSerde[Coalesce] { + override def convert( + expr: Coalesce, + inputs: Seq[Attribute], + binding: Boolean): Option[ExprOuterClass.Expr] = { + val branches = expr.children.dropRight(1).map { child => + (IsNotNull(child), child) + } + val elseValue = Some(expr.children.last) + var allBranches: Seq[Expression] = Seq() + val whenSeq = branches.map(elements => { + allBranches = allBranches :+ elements._1 + exprToProtoInternal(elements._1, inputs, binding) + }) + val thenSeq = branches.map(elements => { + allBranches = allBranches :+ elements._2 + exprToProtoInternal(elements._2, inputs, binding) + }) + assert(whenSeq.length == thenSeq.length) + if (whenSeq.forall(_.isDefined) && thenSeq.forall(_.isDefined)) { + val builder = ExprOuterClass.CaseWhen.newBuilder() + builder.addAllWhen(whenSeq.map(_.get).asJava) + builder.addAllThen(thenSeq.map(_.get).asJava) + if (elseValue.isDefined) { + val elseValueExpr = + exprToProtoInternal(elseValue.get, inputs, binding) + if (elseValueExpr.isDefined) { + builder.setElseExpr(elseValueExpr.get) + } else { + withInfo(expr, elseValue.get) + return None + } + } + Some( + ExprOuterClass.Expr + .newBuilder() + .setCaseWhen(builder) + .build()) + } else { + withInfo(expr, allBranches: _*) + None + } + } +} diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index a07230d780..bf0e55b7e4 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -395,13 +395,15 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } test("test coalesce lazy eval") { - withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") { - val data = Seq((100, 0)) + withSQLConf( + SQLConf.ANSI_ENABLED.key -> "true", + CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { + val data = Seq((9999999999999L, 0)) withParquetTable(data, "t1") { val res = spark.sql(""" - |SELECT coalesce(_1 , 1/0) from t1; + |SELECT coalesce(_1, CAST(_1 AS TINYINT)) from t1; | """.stripMargin) - checkSparkAnswer(res) + checkSparkAnswerAndOperator(res) } } } From 8b1dff6b8ac8f8ba51d7ebb6bb10897384cf0236 Mon Sep 17 00:00:00 2001 From: Bhargava Vadlamani Date: Wed, 3 Sep 2025 09:52:59 -0700 Subject: [PATCH 6/8] address_review_comments --- .../org/apache/comet/serde/conditional.scala | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/conditional.scala b/spark/src/main/scala/org/apache/comet/serde/conditional.scala index cac65219a3..86bf9c89e8 100644 --- a/spark/src/main/scala/org/apache/comet/serde/conditional.scala +++ b/spark/src/main/scala/org/apache/comet/serde/conditional.scala @@ -100,14 +100,11 @@ object CometCoalesce extends CometExpressionSerde[Coalesce] { val branches = expr.children.dropRight(1).map { child => (IsNotNull(child), child) } - val elseValue = Some(expr.children.last) - var allBranches: Seq[Expression] = Seq() + val elseValue = expr.children.last val whenSeq = branches.map(elements => { - allBranches = allBranches :+ elements._1 exprToProtoInternal(elements._1, inputs, binding) }) val thenSeq = branches.map(elements => { - allBranches = allBranches :+ elements._2 exprToProtoInternal(elements._2, inputs, binding) }) assert(whenSeq.length == thenSeq.length) @@ -115,23 +112,20 @@ object CometCoalesce extends CometExpressionSerde[Coalesce] { val builder = ExprOuterClass.CaseWhen.newBuilder() builder.addAllWhen(whenSeq.map(_.get).asJava) builder.addAllThen(thenSeq.map(_.get).asJava) - if (elseValue.isDefined) { - val elseValueExpr = - exprToProtoInternal(elseValue.get, inputs, binding) - if (elseValueExpr.isDefined) { + val elseValueExpr = exprToProtoInternal(elseValue, inputs, binding) + if (elseValueExpr.isDefined) { builder.setElseExpr(elseValueExpr.get) - } else { - withInfo(expr, elseValue.get) + } else { + withInfo(expr, elseValue) return None } - } Some( ExprOuterClass.Expr .newBuilder() .setCaseWhen(builder) .build()) } else { - withInfo(expr, allBranches: _*) + withInfo(expr, branches: _*) None } } From 6a8a51197ec5d2886cba86a94f7a677038f94499 Mon Sep 17 00:00:00 2001 From: Bhargava Vadlamani Date: Wed, 3 Sep 2025 11:32:08 -0700 Subject: [PATCH 7/8] address_review_comments --- .../scala/org/apache/comet/serde/conditional.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/conditional.scala b/spark/src/main/scala/org/apache/comet/serde/conditional.scala index 86bf9c89e8..e4f76c101e 100644 --- a/spark/src/main/scala/org/apache/comet/serde/conditional.scala +++ b/spark/src/main/scala/org/apache/comet/serde/conditional.scala @@ -114,18 +114,18 @@ object CometCoalesce extends CometExpressionSerde[Coalesce] { builder.addAllThen(thenSeq.map(_.get).asJava) val elseValueExpr = exprToProtoInternal(elseValue, inputs, binding) if (elseValueExpr.isDefined) { - builder.setElseExpr(elseValueExpr.get) + builder.setElseExpr(elseValueExpr.get) } else { - withInfo(expr, elseValue) - return None - } + withInfo(expr, elseValue) + return None + } Some( ExprOuterClass.Expr .newBuilder() .setCaseWhen(builder) .build()) } else { - withInfo(expr, branches: _*) + withInfo(expr, branches.map(_._2): _*) None } } From 206432aa4353c28942b530a95f01a4bc4ea5de9d Mon Sep 17 00:00:00 2001 From: Bhargava Vadlamani Date: Wed, 3 Sep 2025 12:18:12 -0700 Subject: [PATCH 8/8] address_review_comments --- .../src/test/scala/org/apache/comet/CometExpressionSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index bf0e55b7e4..51bc709078 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -397,7 +397,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("test coalesce lazy eval") { withSQLConf( SQLConf.ANSI_ENABLED.key -> "true", - CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { + CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { val data = Seq((9999999999999L, 0)) withParquetTable(data, "t1") { val res = spark.sql("""