Commit eb197ca (parent f31cf78)

feat: Add config option to log fallback reasons (#2154)

5 files changed (+53, -41 lines)

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 6 additions & 0 deletions
@@ -482,6 +482,12 @@ object CometConf extends ShimCometConf {
     .booleanConf
     .createWithDefault(false)
 
+  val COMET_LOG_FALLBACK_REASONS: ConfigEntry[Boolean] =
+    conf("spark.comet.logFallbackReasons.enabled")
+      .doc("When this setting is enabled, Comet will log warnings for all fallback reasons.")
+      .booleanConf
+      .createWithDefault(false)
+
   val COMET_EXPLAIN_FALLBACK_ENABLED: ConfigEntry[Boolean] =
     conf("spark.comet.explainFallback.enabled")
       .doc(
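The new option can be set like any other Spark configuration. A minimal sketch, assuming a local session (the app name is illustrative):

import org.apache.spark.sql.SparkSession

// Enable opt-in logging of Comet fallback reasons (defaults to false).
val spark = SparkSession
  .builder()
  .appName("comet-fallback-logging-demo") // hypothetical app name
  .config("spark.comet.logFallbackReasons.enabled", "true")
  .getOrCreate()

The same flag can also be passed on the command line, e.g. --conf spark.comet.logFallbackReasons.enabled=true with spark-submit.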

docs/source/user-guide/configs.md

Lines changed: 1 addition & 0 deletions
@@ -68,6 +68,7 @@ Comet provides the following configuration settings.
 | spark.comet.explain.verbose.enabled | When this setting is enabled, Comet will provide a verbose tree representation of the extended information. | false |
 | spark.comet.explainFallback.enabled | When this setting is enabled, Comet will provide logging explaining the reason(s) why a query stage cannot be executed natively. Set this to false to reduce the amount of logging. | false |
 | spark.comet.expression.allowIncompatible | Comet is not currently fully compatible with Spark for all expressions. Set this config to true to allow them anyway. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
+| spark.comet.logFallbackReasons.enabled | When this setting is enabled, Comet will log warnings for all fallback reasons. | false |
 | spark.comet.memory.overhead.factor | Fraction of executor memory to be allocated as additional memory for Comet when running Spark in on-heap mode. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | 0.2 |
 | spark.comet.memory.overhead.min | Minimum amount of additional memory to be allocated per executor process for Comet, in MiB, when running Spark in on-heap mode. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | 402653184b |
 | spark.comet.memoryOverhead | The amount of additional memory to be allocated per executor process for Comet, in MiB, when running Spark in on-heap mode. This config is optional. If this is not specified, it will be set to `spark.comet.memory.overhead.factor` * `spark.executor.memory`. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | |

spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala

Lines changed: 5 additions & 0 deletions
@@ -366,6 +366,11 @@ object CometSparkSessionExtensions extends Logging {
    * The node with information (if any) attached
    */
   def withInfos[T <: TreeNode[_]](node: T, info: Set[String], exprs: T*): T = {
+    if (CometConf.COMET_LOG_FALLBACK_REASONS.get()) {
+      for (reason <- info) {
+        logWarning(s"Comet cannot accelerate ${node.getClass.getSimpleName} because: $reason")
+      }
+    }
     val existingNodeInfos = node.getTagValue(CometExplainInfo.EXTENSION_INFO)
     val newNodeInfo = (existingNodeInfos ++ exprs
       .flatMap(_.getTagValue(CometExplainInfo.EXTENSION_INFO))).flatten.toSet
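With the flag on, every reason recorded on a plan node is also surfaced through logWarning. A minimal standalone sketch of the same gating pattern, with println standing in for logWarning and invented names (FallbackLogger, report) that are not part of Comet:

// Sketch only; names are illustrative, not Comet's API.
object FallbackLogger {
  // Stand-in for CometConf.COMET_LOG_FALLBACK_REASONS.get()
  var logFallbackReasons: Boolean = false

  def report(nodeName: String, reasons: Set[String]): Unit = {
    if (logFallbackReasons) {
      // One warning per reason, mirroring the loop added to withInfos.
      reasons.foreach(r => println(s"WARN Comet cannot accelerate $nodeName because: $r"))
    }
  }
}

// Usage (values are hypothetical):
//   FallbackLogger.logFallbackReasons = true
//   FallbackLogger.report("SortExec", Set("Child ProjectExec is not native"))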

spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala

Lines changed: 3 additions & 3 deletions
@@ -789,7 +789,7 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
       }
 
       if (!isCometPlan(s.child)) {
-        withInfo(s, "Child {s.child.getClass.getName} is not native")
+        withInfo(s, s"Child ${s.child.getClass.getName} is not native")
         return false
       }
 
@@ -841,12 +841,12 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
       }
 
       if (isShuffleOperator(s.child)) {
-        withInfo(s, "Child {s.child.getClass.getName} is a shuffle operator")
+        withInfo(s, s"Child ${s.child.getClass.getName} is a shuffle operator")
         return false
       }
 
       if (!(!s.child.supportsColumnar || isCometPlan(s.child))) {
-        withInfo(s, "Child {s.child.getClass.getName} is a neither row-based or a Comet operator")
+        withInfo(s, s"Child ${s.child.getClass.getName} is neither row-based nor a Comet operator")
         return false
       }
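The three messages above were missing Scala's s-interpolator, so the placeholder text was logged verbatim rather than the child operator's class name. A quick illustration (the child name is hypothetical):

val child = "ProjectExec"
// Before: no interpolator, so the braces print literally.
println("Child {child} is not native")   // => Child {child} is not native
// After: the s-prefix and ${...} substitute the value.
println(s"Child ${child} is not native") // => Child ProjectExec is not native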

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

Lines changed: 38 additions & 38 deletions
@@ -192,10 +192,6 @@ object QueryPlanSerde extends Logging with CometExprShim {
     classOf[Corr] -> CometCorr,
     classOf[BloomFilterAggregate] -> CometBloomFilterAggregate)
 
-  def emitWarning(reason: String): Unit = {
-    logWarning(s"Comet native execution is disabled due to: $reason")
-  }
-
   def supportedDataType(dt: DataType, allowComplex: Boolean = false): Boolean = dt match {
     case _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: FloatType |
         _: DoubleType | _: StringType | _: BinaryType | _: TimestampType | _: TimestampNTZType |
@@ -207,8 +203,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
       supportedDataType(a.elementType, allowComplex)
     case m: MapType if allowComplex =>
       supportedDataType(m.keyType, allowComplex) && supportedDataType(m.valueType, allowComplex)
-    case dt =>
-      emitWarning(s"unsupported Spark data type: $dt")
+    case _ =>
       false
   }
@@ -237,7 +232,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
       case _: MapType => 15
       case _: StructType => 16
       case dt =>
-        emitWarning(s"Cannot serialize Spark data type: $dt")
+        logWarning(s"Cannot serialize Spark data type: $dt")
         return None
     }
 
@@ -503,9 +498,10 @@ object QueryPlanSerde extends Logging with CometExprShim {
       case Some(handler) =>
        handler.convert(aggExpr, fn, inputs, binding, conf)
      case _ =>
-        val msg = s"unsupported Spark aggregate function: ${fn.prettyName}"
-        emitWarning(msg)
-        withInfo(aggExpr, msg, fn.children: _*)
+        withInfo(
+          aggExpr,
+          s"unsupported Spark aggregate function: ${fn.prettyName}",
+          fn.children: _*)
         None
     }
   }
@@ -1365,18 +1361,23 @@ object QueryPlanSerde extends Logging with CometExprShim {
         ExprOuterClass.Expr.newBuilder().setNormalizeNanAndZero(builder).build()
       }
 
-      case s @ execution.ScalarSubquery(_, _) if supportedDataType(s.dataType) =>
-        val dataType = serializeDataType(s.dataType)
-        if (dataType.isEmpty) {
-          withInfo(s, s"Scalar subquery returns unsupported datatype ${s.dataType}")
-          return None
-        }
+      case s @ execution.ScalarSubquery(_, _) =>
+        if (supportedDataType(s.dataType)) {
+          val dataType = serializeDataType(s.dataType)
+          if (dataType.isEmpty) {
+            withInfo(s, s"Scalar subquery returns unsupported datatype ${s.dataType}")
+            return None
+          }
 
-        val builder = ExprOuterClass.Subquery
-          .newBuilder()
-          .setId(s.exprId.id)
-          .setDatatype(dataType.get)
-        Some(ExprOuterClass.Expr.newBuilder().setSubquery(builder).build())
+          val builder = ExprOuterClass.Subquery
+            .newBuilder()
+            .setId(s.exprId.id)
+            .setDatatype(dataType.get)
+          Some(ExprOuterClass.Expr.newBuilder().setSubquery(builder).build())
+        } else {
+          withInfo(s, s"Unsupported data type: ${s.dataType}")
+          None
+        }
 
       case UnscaledValue(child) =>
         val childExpr = exprToProtoInternal(child, inputs, binding)
@@ -1777,10 +1778,9 @@ object QueryPlanSerde extends Logging with CometExprShim {
 
     } else {
       // There are unsupported scan type
-      val msg =
-        s"unsupported Comet operator: ${op.nodeName}, due to unsupported data types above"
-      emitWarning(msg)
-      withInfo(op, msg)
+      withInfo(
+        op,
+        s"unsupported Comet operator: ${op.nodeName}, due to unsupported data types above")
       None
     }
 
@@ -1952,9 +1952,10 @@ object QueryPlanSerde extends Logging with CometExprShim {
       val attributes = groupingExpressions.map(_.toAttribute) ++ aggregateAttributes
       val resultExprs = resultExpressions.map(exprToProto(_, attributes))
       if (resultExprs.exists(_.isEmpty)) {
-        val msg = s"Unsupported result expressions found in: ${resultExpressions}"
-        emitWarning(msg)
-        withInfo(op, msg, resultExpressions: _*)
+        withInfo(
+          op,
+          s"Unsupported result expressions found in: $resultExpressions",
+          resultExpressions: _*)
         return None
       }
       hashAggBuilder.addAllResultExprs(resultExprs.map(_.get).asJava)
@@ -1996,9 +1997,10 @@ object QueryPlanSerde extends Logging with CometExprShim {
       val attributes = groupingExpressions.map(_.toAttribute) ++ aggregateAttributes
       val resultExprs = resultExpressions.map(exprToProto(_, attributes))
       if (resultExprs.exists(_.isEmpty)) {
-        val msg = s"Unsupported result expressions found in: ${resultExpressions}"
-        emitWarning(msg)
-        withInfo(op, msg, resultExpressions: _*)
+        withInfo(
+          op,
+          s"Unsupported result expressions found in: $resultExpressions",
+          resultExpressions: _*)
         return None
       }
       hashAggBuilder.addAllResultExprs(resultExprs.map(_.get).asJava)
@@ -2172,6 +2174,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
       op.output.forall(a => supportedDataType(a.dataType, allowComplex = true))
 
     if (!supportedTypes) {
+      withInfo(op, "Unsupported data type")
       return None
     }
 
@@ -2197,10 +2200,9 @@ object QueryPlanSerde extends Logging with CometExprShim {
       Some(builder.setScan(scanBuilder).build())
     } else {
       // There are unsupported scan type
-      val msg =
-        s"unsupported Comet operator: ${op.nodeName}, due to unsupported data types above"
-      emitWarning(msg)
-      withInfo(op, msg)
+      withInfo(
+        op,
+        s"unsupported Comet operator: ${op.nodeName}, due to unsupported data types above")
       None
     }
 
@@ -2222,9 +2224,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
       // 1. it is not Spark shuffle operator, which is handled separately
      // 2. it is not a Comet operator
      if (!op.nodeName.contains("Comet") && !op.isInstanceOf[ShuffleExchangeExec]) {
-        val msg = s"unsupported Spark operator: ${op.nodeName}"
-        emitWarning(msg)
-        withInfo(op, msg)
+        withInfo(op, s"unsupported Spark operator: ${op.nodeName}")
       }
       None
     }
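Across QueryPlanSerde, this commit collapses the recurring three-step pattern (compose a message, emitWarning, withInfo) into a single withInfo call, so each fallback reason is recorded on the plan node once and surfaced either through EXPLAIN output or, with the new flag, through the opt-in warning in withInfos. A hedged before/after sketch of a typical call site, where op stands for any Spark operator node:

// Before: every fallback also wrote a warning unconditionally.
//   val msg = s"unsupported Spark operator: ${op.nodeName}"
//   emitWarning(msg)
//   withInfo(op, msg)
//
// After: tag the reason once; logging is opt-in via
// spark.comet.logFallbackReasons.enabled (handled inside withInfos).
withInfo(op, s"unsupported Spark operator: ${op.nodeName}")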
