File tree Expand file tree Collapse file tree 1 file changed +22
-0
lines changed
spark/src/test/scala/org/apache/spark/sql Expand file tree Collapse file tree 1 file changed +22
-0
lines changed Original file line number Diff line number Diff line change @@ -385,6 +385,8 @@ abstract class CometTestBase
385385 s " plan: ${new ExtendedExplainInfo ().generateExtendedInfo(plan)}" )
386386 case _ =>
387387 }
388+
389+ checkPlanNotMissingInput(plan)
388390 }
389391
390392 protected def findFirstNonCometOperator (
@@ -406,6 +408,26 @@ abstract class CometTestBase
406408 None
407409 }
408410
411+ // checks the plan node has no missing inputs
412+ // such nodes represented in plan with exclamation mark !
413+ // example: !CometWindowExec
414+ private def checkPlanNotMissingInput (plan : SparkPlan ): Unit = {
415+ def hasMissingInput (node : SparkPlan ): Boolean = {
416+ node.missingInput.nonEmpty && node.children.nonEmpty
417+ }
418+
419+ val isCometNode = plan.nodeName.startsWith(" Comet" )
420+
421+ if (isCometNode && hasMissingInput(plan)) {
422+ assert(false , s " Plan ${plan.nodeName} has invalid missingInput " )
423+ }
424+
425+ // Otherwise recursively check children
426+ plan.children.foreach { child =>
427+ checkPlanNotMissingInput(child)
428+ }
429+ }
430+
409431 private def checkPlanContains (plan : SparkPlan , includePlans : Class [_]* ): Unit = {
410432 includePlans.foreach { case planClass =>
411433 if (plan.find(op => planClass.isAssignableFrom(op.getClass)).isEmpty) {
You can’t perform that action at this time.
0 commit comments