Skip to content

Commit c1760da

Browse files
dilipbiswalgatorsmile
authored andcommitted
[SPARK-25025][SQL] Remove the default value of isAll in INTERSECT/EXCEPT
## What changes were proposed in this pull request? Having the default value of isAll in the logical plan nodes INTERSECT/EXCEPT could introduce bugs when the callers are not aware of it. This PR removes the default value and makes caller explicitly specify them. ## How was this patch tested? This is a refactoring change. Existing tests test the functionality already. Author: Dilip Biswal <[email protected]> Closes apache#22000 from dilipbiswal/SPARK-25025.
1 parent d063e3a commit c1760da

File tree

12 files changed

+60
-49
lines changed

12 files changed

+60
-49
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -356,10 +356,10 @@ package object dsl {
356356

357357
def subquery(alias: Symbol): LogicalPlan = SubqueryAlias(alias.name, logicalPlan)
358358

359-
def except(otherPlan: LogicalPlan, isAll: Boolean = false): LogicalPlan =
359+
def except(otherPlan: LogicalPlan, isAll: Boolean): LogicalPlan =
360360
Except(logicalPlan, otherPlan, isAll)
361361

362-
def intersect(otherPlan: LogicalPlan, isAll: Boolean = false): LogicalPlan =
362+
def intersect(otherPlan: LogicalPlan, isAll: Boolean): LogicalPlan =
363363
Intersect(logicalPlan, otherPlan, isAll)
364364

365365
def union(otherPlan: LogicalPlan): LogicalPlan = Union(logicalPlan, otherPlan)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -534,15 +534,15 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
534534
case SqlBaseParser.INTERSECT if all =>
535535
Intersect(left, right, isAll = true)
536536
case SqlBaseParser.INTERSECT =>
537-
Intersect(left, right)
537+
Intersect(left, right, isAll = false)
538538
case SqlBaseParser.EXCEPT if all =>
539539
Except(left, right, isAll = true)
540540
case SqlBaseParser.EXCEPT =>
541-
Except(left, right)
541+
Except(left, right, isAll = false)
542542
case SqlBaseParser.SETMINUS if all =>
543543
Except(left, right, isAll = true)
544544
case SqlBaseParser.SETMINUS =>
545-
Except(left, right)
545+
Except(left, right, isAll = false)
546546
}
547547
}
548548

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ object SetOperation {
167167
case class Intersect(
168168
left: LogicalPlan,
169169
right: LogicalPlan,
170-
isAll: Boolean = false) extends SetOperation(left, right) {
170+
isAll: Boolean) extends SetOperation(left, right) {
171171

172172
override def nodeName: String = getClass.getSimpleName + ( if ( isAll ) "All" else "" )
173173

@@ -191,7 +191,7 @@ case class Intersect(
191191
case class Except(
192192
left: LogicalPlan,
193193
right: LogicalPlan,
194-
isAll: Boolean = false) extends SetOperation(left, right) {
194+
isAll: Boolean) extends SetOperation(left, right) {
195195
override def nodeName: String = getClass.getSimpleName + ( if ( isAll ) "All" else "" )
196196
/** We don't use right.output because those rows get excluded from the set. */
197197
override def output: Seq[Attribute] = left.output

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -277,13 +277,13 @@ class AnalysisErrorSuite extends AnalysisTest {
277277

278278
errorTest(
279279
"intersect with unequal number of columns",
280-
testRelation.intersect(testRelation2),
280+
testRelation.intersect(testRelation2, isAll = false),
281281
"intersect" :: "number of columns" :: testRelation2.output.length.toString ::
282282
testRelation.output.length.toString :: Nil)
283283

284284
errorTest(
285285
"except with unequal number of columns",
286-
testRelation.except(testRelation2),
286+
testRelation.except(testRelation2, isAll = false),
287287
"except" :: "number of columns" :: testRelation2.output.length.toString ::
288288
testRelation.output.length.toString :: Nil)
289289

@@ -299,22 +299,22 @@ class AnalysisErrorSuite extends AnalysisTest {
299299

300300
errorTest(
301301
"intersect with incompatible column types",
302-
testRelation.intersect(nestedRelation),
302+
testRelation.intersect(nestedRelation, isAll = false),
303303
"intersect" :: "the compatible column types" :: Nil)
304304

305305
errorTest(
306306
"intersect with a incompatible column type and compatible column types",
307-
testRelation3.intersect(testRelation4),
307+
testRelation3.intersect(testRelation4, isAll = false),
308308
"intersect" :: "the compatible column types" :: "map" :: "decimal" :: Nil)
309309

310310
errorTest(
311311
"except with incompatible column types",
312-
testRelation.except(nestedRelation),
312+
testRelation.except(nestedRelation, isAll = false),
313313
"except" :: "the compatible column types" :: Nil)
314314

315315
errorTest(
316316
"except with a incompatible column type and compatible column types",
317-
testRelation3.except(testRelation4),
317+
testRelation3.except(testRelation4, isAll = false),
318318
"except" :: "the compatible column types" :: "map" :: "decimal" :: Nil)
319319

320320
errorTest(

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ class AnalysisSuite extends AnalysisTest with Matchers {
273273
}
274274

275275
test("self intersect should resolve duplicate expression IDs") {
276-
val plan = testRelation.intersect(testRelation)
276+
val plan = testRelation.intersect(testRelation, isAll = false)
277277
assertAnalysisSuccess(plan)
278278
}
279279

@@ -439,8 +439,8 @@ class AnalysisSuite extends AnalysisTest with Matchers {
439439
val unionPlan = Union(firstTable, secondTable)
440440
assertAnalysisSuccess(unionPlan)
441441

442-
val r1 = Except(firstTable, secondTable)
443-
val r2 = Intersect(firstTable, secondTable)
442+
val r1 = Except(firstTable, secondTable, isAll = false)
443+
val r2 = Intersect(firstTable, secondTable, isAll = false)
444444

445445
assertAnalysisSuccess(r1)
446446
assertAnalysisSuccess(r2)

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1223,8 +1223,10 @@ class TypeCoercionSuite extends AnalysisTest {
12231223

12241224
val expectedTypes = Seq(StringType, DecimalType.SYSTEM_DEFAULT, FloatType, DoubleType)
12251225

1226-
val r1 = widenSetOperationTypes(Except(firstTable, secondTable)).asInstanceOf[Except]
1227-
val r2 = widenSetOperationTypes(Intersect(firstTable, secondTable)).asInstanceOf[Intersect]
1226+
val r1 = widenSetOperationTypes(
1227+
Except(firstTable, secondTable, isAll = false)).asInstanceOf[Except]
1228+
val r2 = widenSetOperationTypes(
1229+
Intersect(firstTable, secondTable, isAll = false)).asInstanceOf[Intersect]
12281230
checkOutput(r1.left, expectedTypes)
12291231
checkOutput(r1.right, expectedTypes)
12301232
checkOutput(r2.left, expectedTypes)
@@ -1289,8 +1291,10 @@ class TypeCoercionSuite extends AnalysisTest {
12891291
val expectedType1 = Seq(DecimalType(10, 8))
12901292

12911293
val r1 = widenSetOperationTypes(Union(left1, right1)).asInstanceOf[Union]
1292-
val r2 = widenSetOperationTypes(Except(left1, right1)).asInstanceOf[Except]
1293-
val r3 = widenSetOperationTypes(Intersect(left1, right1)).asInstanceOf[Intersect]
1294+
val r2 = widenSetOperationTypes(
1295+
Except(left1, right1, isAll = false)).asInstanceOf[Except]
1296+
val r3 = widenSetOperationTypes(
1297+
Intersect(left1, right1, isAll = false)).asInstanceOf[Intersect]
12941298

12951299
checkOutput(r1.children.head, expectedType1)
12961300
checkOutput(r1.children.last, expectedType1)
@@ -1310,16 +1314,20 @@ class TypeCoercionSuite extends AnalysisTest {
13101314
AttributeReference("r", rType)())
13111315

13121316
val r1 = widenSetOperationTypes(Union(plan1, plan2)).asInstanceOf[Union]
1313-
val r2 = widenSetOperationTypes(Except(plan1, plan2)).asInstanceOf[Except]
1314-
val r3 = widenSetOperationTypes(Intersect(plan1, plan2)).asInstanceOf[Intersect]
1317+
val r2 = widenSetOperationTypes(
1318+
Except(plan1, plan2, isAll = false)).asInstanceOf[Except]
1319+
val r3 = widenSetOperationTypes(
1320+
Intersect(plan1, plan2, isAll = false)).asInstanceOf[Intersect]
13151321

13161322
checkOutput(r1.children.last, Seq(expectedType))
13171323
checkOutput(r2.right, Seq(expectedType))
13181324
checkOutput(r3.right, Seq(expectedType))
13191325

13201326
val r4 = widenSetOperationTypes(Union(plan2, plan1)).asInstanceOf[Union]
1321-
val r5 = widenSetOperationTypes(Except(plan2, plan1)).asInstanceOf[Except]
1322-
val r6 = widenSetOperationTypes(Intersect(plan2, plan1)).asInstanceOf[Intersect]
1327+
val r5 = widenSetOperationTypes(
1328+
Except(plan2, plan1, isAll = false)).asInstanceOf[Except]
1329+
val r6 = widenSetOperationTypes(
1330+
Intersect(plan2, plan1, isAll = false)).asInstanceOf[Intersect]
13231331

13241332
checkOutput(r4.children.last, Seq(expectedType))
13251333
checkOutput(r5.left, Seq(expectedType))

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -575,14 +575,14 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
575575
// Except: *-stream not supported
576576
testBinaryOperationInStreamingPlan(
577577
"except",
578-
_.except(_),
578+
_.except(_, isAll = false),
579579
streamStreamSupported = false,
580580
batchStreamSupported = false)
581581

582582
// Intersect: stream-stream not supported
583583
testBinaryOperationInStreamingPlan(
584584
"intersect",
585-
_.intersect(_),
585+
_.intersect(_, isAll = false),
586586
streamStreamSupported = false)
587587

588588
// Sort: supported only on batch subplans and after aggregation on streaming plan + complete mode

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,10 +180,10 @@ class ColumnPruningSuite extends PlanTest {
180180

181181
test("Column pruning on except/intersect/distinct") {
182182
val input = LocalRelation('a.int, 'b.string, 'c.double)
183-
val query = Project('a :: Nil, Except(input, input)).analyze
183+
val query = Project('a :: Nil, Except(input, input, isAll = false)).analyze
184184
comparePlans(Optimize.execute(query), query)
185185

186-
val query2 = Project('a :: Nil, Intersect(input, input)).analyze
186+
val query2 = Project('a :: Nil, Intersect(input, input, isAll = false)).analyze
187187
comparePlans(Optimize.execute(query2), query2)
188188
val query3 = Project('a :: Nil, Distinct(input)).analyze
189189
comparePlans(Optimize.execute(query3), query3)

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ class ReplaceOperatorSuite extends PlanTest {
4242
val table1 = LocalRelation('a.int, 'b.int)
4343
val table2 = LocalRelation('c.int, 'd.int)
4444

45-
val query = Intersect(table1, table2)
45+
val query = Intersect(table1, table2, isAll = false)
4646
val optimized = Optimize.execute(query.analyze)
4747

4848
val correctAnswer =
@@ -60,7 +60,7 @@ class ReplaceOperatorSuite extends PlanTest {
6060
val table2 = Filter(attributeB === 2, Filter(attributeA === 1, table1))
6161
val table3 = Filter(attributeB < 1, Filter(attributeA >= 2, table1))
6262

63-
val query = Except(table2, table3)
63+
val query = Except(table2, table3, isAll = false)
6464
val optimized = Optimize.execute(query.analyze)
6565

6666
val correctAnswer =
@@ -79,7 +79,7 @@ class ReplaceOperatorSuite extends PlanTest {
7979
val table1 = LocalRelation.fromExternalRows(Seq(attributeA, attributeB), data = Seq(Row(1, 2)))
8080
val table2 = Filter(attributeB < 1, Filter(attributeA >= 2, table1))
8181

82-
val query = Except(table1, table2)
82+
val query = Except(table1, table2, isAll = false)
8383
val optimized = Optimize.execute(query.analyze)
8484

8585
val correctAnswer =
@@ -99,7 +99,7 @@ class ReplaceOperatorSuite extends PlanTest {
9999
val table3 = Project(Seq(attributeA, attributeB),
100100
Filter(attributeB < 1, Filter(attributeA >= 2, table1)))
101101

102-
val query = Except(table2, table3)
102+
val query = Except(table2, table3, isAll = false)
103103
val optimized = Optimize.execute(query.analyze)
104104

105105
val correctAnswer =
@@ -120,7 +120,7 @@ class ReplaceOperatorSuite extends PlanTest {
120120
val table3 = Project(Seq(attributeA, attributeB),
121121
Filter(attributeB < 1, Filter(attributeA >= 2, table1)))
122122

123-
val query = Except(table2, table3)
123+
val query = Except(table2, table3, isAll = false)
124124
val optimized = Optimize.execute(query.analyze)
125125

126126
val correctAnswer =
@@ -141,7 +141,7 @@ class ReplaceOperatorSuite extends PlanTest {
141141
Filter(attributeB < 1, Filter(attributeA >= 2, table1)))
142142
val table3 = Filter(attributeB === 2, Filter(attributeA === 1, table1))
143143

144-
val query = Except(table2, table3)
144+
val query = Except(table2, table3, isAll = false)
145145
val optimized = Optimize.execute(query.analyze)
146146

147147
val correctAnswer =
@@ -158,7 +158,7 @@ class ReplaceOperatorSuite extends PlanTest {
158158
val table1 = LocalRelation('a.int, 'b.int)
159159
val table2 = LocalRelation('c.int, 'd.int)
160160

161-
val query = Except(table1, table2)
161+
val query = Except(table1, table2, isAll = false)
162162
val optimized = Optimize.execute(query.analyze)
163163

164164
val correctAnswer =
@@ -173,7 +173,7 @@ class ReplaceOperatorSuite extends PlanTest {
173173
val left = table.where('b < 1).select('a).as("left")
174174
val right = table.where('b < 3).select('a).as("right")
175175

176-
val query = Except(left, right)
176+
val query = Except(left, right, isAll = false)
177177
val optimized = Optimize.execute(query.analyze)
178178

179179
val correctAnswer =

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -65,14 +65,15 @@ class PlanParserSuite extends AnalysisTest {
6565
assertEqual("select * from a union select * from b", Distinct(a.union(b)))
6666
assertEqual("select * from a union distinct select * from b", Distinct(a.union(b)))
6767
assertEqual("select * from a union all select * from b", a.union(b))
68-
assertEqual("select * from a except select * from b", a.except(b))
69-
assertEqual("select * from a except distinct select * from b", a.except(b))
68+
assertEqual("select * from a except select * from b", a.except(b, isAll = false))
69+
assertEqual("select * from a except distinct select * from b", a.except(b, isAll = false))
7070
assertEqual("select * from a except all select * from b", a.except(b, isAll = true))
71-
assertEqual("select * from a minus select * from b", a.except(b))
71+
assertEqual("select * from a minus select * from b", a.except(b, isAll = false))
7272
assertEqual("select * from a minus all select * from b", a.except(b, isAll = true))
73-
assertEqual("select * from a minus distinct select * from b", a.except(b))
74-
assertEqual("select * from a intersect select * from b", a.intersect(b))
75-
assertEqual("select * from a intersect distinct select * from b", a.intersect(b))
73+
assertEqual("select * from a minus distinct select * from b", a.except(b, isAll = false))
74+
assertEqual("select * from a " +
75+
"intersect select * from b", a.intersect(b, isAll = false))
76+
assertEqual("select * from a intersect distinct select * from b", a.intersect(b, isAll = false))
7677
assertEqual("select * from a intersect all select * from b", a.intersect(b, isAll = true))
7778
}
7879

@@ -735,18 +736,20 @@ class PlanParserSuite extends AnalysisTest {
735736
|SELECT * FROM d
736737
""".stripMargin
737738

738-
assertEqual(query1, Distinct(a.union(b)).except(c.intersect(d)))
739+
assertEqual(query1, Distinct(a.union(b)).except(c.intersect(d, isAll = false), isAll = false))
739740
assertEqual(query2, Distinct(a.union(b)).except(c.intersect(d, isAll = true), isAll = true))
740741

741742
// Now disable precedence enforcement to verify the old behaviour.
742743
withSQLConf(SQLConf.LEGACY_SETOPS_PRECEDENCE_ENABLED.key -> "true") {
743-
assertEqual(query1, Distinct(a.union(b)).except(c).intersect(d))
744+
assertEqual(query1,
745+
Distinct(a.union(b)).except(c, isAll = false).intersect(d, isAll = false))
744746
assertEqual(query2, Distinct(a.union(b)).except(c, isAll = true).intersect(d, isAll = true))
745747
}
746748

747749
// Explicitly enable the precedence enforcement
748750
withSQLConf(SQLConf.LEGACY_SETOPS_PRECEDENCE_ENABLED.key -> "false") {
749-
assertEqual(query1, Distinct(a.union(b)).except(c.intersect(d)))
751+
assertEqual(query1,
752+
Distinct(a.union(b)).except(c.intersect(d, isAll = false), isAll = false))
750753
assertEqual(query2, Distinct(a.union(b)).except(c.intersect(d, isAll = true), isAll = true))
751754
}
752755
}

0 commit comments

Comments
 (0)