package org.apache.spark.sql.catalyst.normalizer

import java.util.HashMap
import java.util.concurrent.atomic.AtomicLong

import org.apache.spark.sql.catalyst.plans.logical.{CacheTableAsSelect, CTERelationRef, LogicalPlan, UnionLoop, UnionLoopRef, WithCTE}
import org.apache.spark.sql.catalyst.rules.Rule
object NormalizeCTEIds extends Rule[LogicalPlan] {
2627 override def apply (plan : LogicalPlan ): LogicalPlan = {
2728 val curId = new java.util.concurrent.atomic.AtomicLong ()
2829 val defIdToNewId = new HashMap [Long , Long ]()
29-
30- plan transformDownWithSubqueries {
31- case withCTE : WithCTE =>
32- withCTE.cteDefs.foreach { cteDef =>
33- if (! defIdToNewId.containsKey(cteDef.id)) {
34- defIdToNewId.put(cteDef.id, curId.getAndIncrement())
35- }
36- }
37- withCTE
38- }
39-
40- applyInternal(plan, defIdToNewId)
30+ applyInternal(plan, curId, defIdToNewId)
4131 }
4232
43- private def applyInternal (plan : LogicalPlan , defIdToNewId : HashMap [Long , Long ]): LogicalPlan = {
33+ private def applyInternal (
34+ plan : LogicalPlan ,
35+ curId : AtomicLong ,
36+ defIdToNewId : HashMap [Long , Long ]): LogicalPlan = {
4437 plan transformDownWithSubqueries {
4538 case ctas @ CacheTableAsSelect (_, plan, _, _, _, _, _) =>
46- ctas.copy(plan = applyInternal(plan, defIdToNewId))
39+ ctas.copy(plan = applyInternal(plan, curId, defIdToNewId))
4740
4841 case withCTE @ WithCTE (plan, cteDefs) =>
49- val normalizedPlan = canonicalizeCTE(plan, defIdToNewId)
5042 val newCteDefs = cteDefs.map { cteDef =>
43+ if (! defIdToNewId.containsKey(cteDef.id)) {
44+ defIdToNewId.put(cteDef.id, curId.getAndIncrement())
45+ }
5146 val normalizedCteDef = canonicalizeCTE(cteDef.child, defIdToNewId)
5247 cteDef.copy(child = normalizedCteDef, id = defIdToNewId.get(cteDef.id))
5348 }
49+ val normalizedPlan = canonicalizeCTE(plan, defIdToNewId)
5450 withCTE.copy(plan = normalizedPlan, cteDefs = newCteDefs)
5551 }
5652 }