Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit e430915

Browse files
committed
[SPARK-18870] Disallowed Distinct Aggregations on Streaming Datasets
## What changes were proposed in this pull request?

Check whether Aggregation operators on a streaming subplan have aggregate expressions with isDistinct = true.

## How was this patch tested?

Added unit test

Author: Tathagata Das <[email protected]>

Closes apache#16289 from tdas/SPARK-18870.

(cherry picked from commit 4f7292c)
Signed-off-by: Tathagata Das <[email protected]>
1 parent 2a8de2e commit e430915

File tree

2 files changed

+26
-2
lines changed

2 files changed

+26
-2
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.analysis
1919

2020
import org.apache.spark.sql.{AnalysisException, InternalOutputModes}
2121
import org.apache.spark.sql.catalyst.expressions.Attribute
22+
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
2223
import org.apache.spark.sql.catalyst.plans._
2324
import org.apache.spark.sql.catalyst.plans.logical._
2425
import org.apache.spark.sql.streaming.OutputMode
@@ -95,6 +96,16 @@ object UnsupportedOperationChecker {
9596
// Operations that cannot exist anywhere in a streaming plan
9697
subPlan match {
9798

99+
case Aggregate(_, aggregateExpressions, child) =>
100+
val distinctAggExprs = aggregateExpressions.flatMap { expr =>
101+
expr.collect { case ae: AggregateExpression if ae.isDistinct => ae }
102+
}
103+
throwErrorIf(
104+
child.isStreaming && distinctAggExprs.nonEmpty,
105+
"Distinct aggregations are not supported on streaming DataFrames/Datasets, unless " +
106+
"it is on aggregated DataFrame/Dataset in Complete output mode. Consider using " +
107+
"approximate distinct aggregation (e.g. approx_count_distinct() instead of count()).")
108+
98109
case _: Command =>
99110
throwError("Commands like CreateTable*, AlterTable*, Show* are not supported with " +
100111
"streaming DataFrames/Datasets")
@@ -143,7 +154,7 @@ object UnsupportedOperationChecker {
143154
throwError("Union between streaming and batch DataFrames/Datasets is not supported")
144155

145156
case Except(left, right) if right.isStreaming =>
146-
throwError("Except with a streaming DataFrame/Dataset on the right is not supported")
157+
throwError("Except on a streaming DataFrame/Dataset on the right is not supported")
147158

148159
case Intersect(left, right) if left.isStreaming && right.isStreaming =>
149160
throwError("Intersect between two streaming DataFrames/Datasets is not supported")
@@ -156,7 +167,7 @@ object UnsupportedOperationChecker {
156167

157168
case Sort(_, _, _) | SortPartitions(_, _) if !containsCompleteData(subPlan) =>
158169
throwError("Sorting is not supported on streaming DataFrames/Datasets, unless it is on" +
159-
"aggregated DataFrame/Dataset in Complete mode")
170+
"aggregated DataFrame/Dataset in Complete output mode")
160171

161172
case Sample(_, _, _, _, child) if child.isStreaming =>
162173
throwError("Sampling is not supported on streaming DataFrames/Datasets")

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,19 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
9898
outputMode = Update,
9999
expectedMsgs = Seq("multiple streaming aggregations"))
100100

101+
// Aggregation: Distinct aggregates not supported on streaming relation
102+
val distinctAggExprs = Seq(Count("*").toAggregateExpression(isDistinct = true).as("c"))
103+
assertSupportedInStreamingPlan(
104+
"distinct aggregate - aggregate on batch relation",
105+
Aggregate(Nil, distinctAggExprs, batchRelation),
106+
outputMode = Append)
107+
108+
assertNotSupportedInStreamingPlan(
109+
"distinct aggregate - aggregate on streaming relation",
110+
Aggregate(Nil, distinctAggExprs, streamRelation),
111+
outputMode = Complete,
112+
expectedMsgs = Seq("distinct aggregation"))
113+
101114
// Inner joins: Stream-stream not supported
102115
testBinaryOperationInStreamingPlan(
103116
"inner join",

0 commit comments

Comments
 (0)