Commit d7b268a

[SPARK-29348][SQL] Add observable Metrics for Streaming queries
### What changes were proposed in this pull request?

Observable metrics are named, arbitrary aggregate functions that can be defined on a query (Dataframe). As soon as the execution of a Dataframe reaches a completion point (e.g. a batch query finishes or a streaming query completes an epoch), a named event is emitted that contains the metrics for the data processed since the last completion point.

A user can observe these metrics by attaching a listener to the Spark session; which listener to attach depends on the execution mode:
- Batch: `QueryExecutionListener`. This is called when the query completes. A user can access the metrics through the `QueryExecution.observedMetrics` map.
- (Micro-batch) Streaming: `StreamingQueryListener`. This is called when the streaming query completes an epoch. A user can access the metrics through the `StreamingQueryProgress.observedMetrics` map.

Please note that we currently do not support continuous execution streaming.

### Why are the changes needed?

This enables observable metrics.

### Does this PR introduce any user-facing change?

Yes. It adds the `observe` method to `Dataset`.

### How was this patch tested?

- Added unit tests for the `CollectMetrics` logical node to the `AnalysisSuite`.
- Added unit tests for `StreamingProgress` JSON serialization to the `StreamingQueryStatusAndProgressSuite`.
- Added integration tests for streaming to the `StreamingQueryListenerSuite`.
- Added integration tests for batch to the `DataFrameCallbackSuite`.

Closes apache#26127 from hvanhovell/SPARK-29348.

Authored-by: herman <[email protected]>
Signed-off-by: herman <[email protected]>
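As context, a minimal sketch of how the new API is intended to be used, based on the description above (the local session setup, the metric name `my_event`, and the column names are illustrative, not part of this change):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}
import org.apache.spark.sql.util.QueryExecutionListener

val spark = SparkSession.builder().master("local[*]").appName("observe-demo").getOrCreate()
import spark.implicits._

// Batch: observed metrics are delivered through the QueryExecutionListener once the query completes.
spark.listenerManager.register(new QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
    qe.observedMetrics.get("my_event").foreach { row =>
      println(s"cnt=${row.getAs[Long]("cnt")}, sum_v=${row.getAs[Long]("sum_v")}")
    }
  }
  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = ()
})

// Micro-batch streaming: the same metrics arrive once per epoch in the progress object.
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    Option(event.progress.observedMetrics.get("my_event")).foreach(println)
  }
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
})

// Attach the named metrics to a query; they behave like a global aggregate over the data
// processed since the last completion point.
val df = Seq(1L, 2L, 3L).toDF("v")
  .observe("my_event", count(lit(1)).as("cnt"), sum($"v").as("sum_v"))

df.collect() // completing the batch query fires onSuccess with the "my_event" row
```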
1 parent 075ae1e commit d7b268a

19 files changed (+586, -20 lines)

project/MimaExcludes.scala

Lines changed: 4 additions & 1 deletion
@@ -458,7 +458,10 @@ object MimaExcludes {
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.FetchFailed.this"),
 
     // [SPARK-28957][SQL] Copy any "spark.hive.foo=bar" spark properties into hadoop conf as "hive.foo=bar"
-    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.SparkHadoopUtil.appendS3AndSparkHadoopConfigurations")
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.SparkHadoopUtil.appendS3AndSparkHadoopConfigurations"),
+
+    // [SPARK-29348] Add observable metrics.
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryProgress.this")
   )
 
   // Exclude rules for 2.4.x

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 10 additions & 0 deletions
@@ -2432,6 +2432,10 @@ class Analyzer(
           nondeterToAttr.get(e).map(_.toAttribute).getOrElse(e)
         }.copy(child = newChild)
 
+      // Don't touch collect metrics. Top-level metrics are not supported (check analysis will fail)
+      // and we want to retain them inside the aggregate functions.
+      case m: CollectMetrics => m
+
       // todo: It's hard to write a general rule to pull out nondeterministic expressions
       // from LogicalPlan, currently we only do it for UnaryNode which has same output
       // schema with its child.
@@ -2932,6 +2936,12 @@ object CleanupAliases extends Rule[LogicalPlan] {
       Window(cleanedWindowExprs, partitionSpec.map(trimAliases),
         orderSpec.map(trimAliases(_).asInstanceOf[SortOrder]), child)
 
+    case CollectMetrics(name, metrics, child) =>
+      val cleanedMetrics = metrics.map {
+        e => trimNonTopLevelAliases(e).asInstanceOf[NamedExpression]
+      }
+      CollectMetrics(name, cleanedMetrics, child)
+
     // Operators that operate on objects should only have expressions from encoders, which should
     // never have extra aliases.
     case o: ObjectConsumer => o

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala

Lines changed: 70 additions & 1 deletion
@@ -14,9 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.spark.sql.catalyst.analysis
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.SubExprUtils._
@@ -280,6 +281,41 @@ trait CheckAnalysis extends PredicateHelper {
             groupingExprs.foreach(checkValidGroupingExprs)
             aggregateExprs.foreach(checkValidAggregateExpression)
 
+          case CollectMetrics(name, metrics, _) =>
+            if (name == null || name.isEmpty) {
+              operator.failAnalysis(s"observed metrics should be named: $operator")
+            }
+            // Check if an expression is a valid metric. A metric must meet the following criteria:
+            // - Is not a window function;
+            // - Is not a nested aggregate function;
+            // - Is not a distinct aggregate function;
+            // - Has only non-deterministic functions that are nested inside an aggregate function;
+            // - Has only attributes that are nested inside an aggregate function.
+            def checkMetric(s: Expression, e: Expression, seenAggregate: Boolean = false): Unit = {
+              e match {
+                case _: WindowExpression =>
+                  e.failAnalysis(
+                    "window expressions are not allowed in observed metrics, but found: " + s.sql)
+                case _ if !e.deterministic && !seenAggregate =>
+                  e.failAnalysis(s"non-deterministic expression ${s.sql} can only be used " +
+                    "as an argument to an aggregate function.")
+                case a: AggregateExpression if seenAggregate =>
+                  e.failAnalysis(
+                    "nested aggregates are not allowed in observed metrics, but found: " + s.sql)
+                case a: AggregateExpression if a.isDistinct =>
+                  e.failAnalysis(
+                    "distinct aggregates are not allowed in observed metrics, but found: " + s.sql)
+                case _: Attribute if !seenAggregate =>
+                  e.failAnalysis(s"attribute ${s.sql} can only be used as an argument to an " +
+                    "aggregate function.")
+                case _: AggregateExpression =>
+                  e.children.foreach(checkMetric(s, _, seenAggregate = true))
+                case _ =>
+                  e.children.foreach(checkMetric(s, _, seenAggregate))
+              }
+            }
+            metrics.foreach(m => checkMetric(m, m))
+
           case Sort(orders, _, _) =>
             orders.foreach { order =>
               if (!RowOrdering.isOrderable(order.dataType)) {
@@ -534,6 +570,7 @@ trait CheckAnalysis extends PredicateHelper {
          case _ => // Analysis successful!
       }
     }
+    checkCollectedMetrics(plan)
     extendedCheckRules.foreach(_(plan))
     plan.foreachUp {
       case o if !o.resolved =>
@@ -627,6 +664,38 @@ trait CheckAnalysis extends PredicateHelper {
     checkCorrelationsInSubquery(expr.plan)
   }
 
+  /**
+   * Validate that collected metrics names are unique. The same name cannot be used for metrics
+   * with different results. However, multiple instances of metrics with the same result and name
+   * are allowed (e.g. self-joins).
+   */
+  private def checkCollectedMetrics(plan: LogicalPlan): Unit = {
+    val metricsMap = mutable.Map.empty[String, LogicalPlan]
+    def check(plan: LogicalPlan): Unit = plan.foreach { node =>
+      node match {
+        case metrics @ CollectMetrics(name, _, _) =>
+          metricsMap.get(name) match {
+            case Some(other) =>
+              // Exact duplicates are allowed. They can be the result
+              // of a CTE that is used multiple times or a self join.
+              if (!metrics.sameResult(other)) {
+                failAnalysis(
+                  s"Multiple definitions of observed metrics named '$name': $plan")
+              }
+            case None =>
+              metricsMap.put(name, metrics)
+          }
+        case _ =>
+      }
+      node.expressions.foreach(_.foreach {
+        case subquery: SubqueryExpression =>
+          check(subquery.plan)
+        case _ =>
+      })
+    }
+    check(plan)
+  }
+
   /**
    * Validates to make sure the outer references appearing inside the subquery
   * are allowed.

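To make these checks concrete, here is a minimal sketch of how they surface through the new `Dataset.observe` API (the local session, the column name `v`, and the metric names are illustrative): valid metrics are literals or aggregate functions, while bare attributes, non-deterministic expressions outside an aggregate, and distinct aggregates are rejected at analysis time.

```scala
import scala.util.Try

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq(1L, 2L, 3L).toDF("v")

// Valid: literals and aggregates; non-determinism is fine when nested inside an aggregate.
df.observe("ok", lit(1).as("one"), count(lit(1)).as("cnt"), sum(rand()).as("rnd_sum"))

// Each of these violates one of the checks above and fails analysis.
assert(Try(df.observe("bad_attr", $"v").collect()).isFailure)                     // bare attribute
assert(Try(df.observe("bad_rand", rand().as("r")).collect()).isFailure)           // non-deterministic outside an aggregate
assert(Try(df.observe("bad_distinct", countDistinct($"v")).collect()).isFailure)  // distinct aggregate
```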
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/PlanHelper.scala

Lines changed: 3 additions & 1 deletion
@@ -43,7 +43,9 @@ object PlanHelper {
       case e: WindowExpression
         if !plan.isInstanceOf[Window] => e
       case e: AggregateExpression
-        if !(plan.isInstanceOf[Aggregate] || plan.isInstanceOf[Window]) => e
+        if !(plan.isInstanceOf[Aggregate] ||
+          plan.isInstanceOf[Window] ||
+          plan.isInstanceOf[CollectMetrics]) => e
       case e: Generator
         if !plan.isInstanceOf[Generate] => e
     }

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 22 additions & 0 deletions
@@ -971,3 +971,25 @@ case class Deduplicate(
  * This is used to whitelist such commands in the subquery-related checks.
  */
 trait SupportsSubquery extends LogicalPlan
+
+/**
+ * Collect arbitrary (named) metrics from a dataset. As soon as the query reaches a completion
+ * point (batch query completes or streaming query epoch completes) an event is emitted on the
+ * driver which can be observed by attaching a listener to the spark session. The metrics are named
+ * so we can collect metrics at multiple places in a single dataset.
+ *
+ * This node behaves like a global aggregate. All the metrics collected must be aggregate functions
+ * or be literals.
+ */
+case class CollectMetrics(
+    name: String,
+    metrics: Seq[NamedExpression],
+    child: LogicalPlan)
+  extends UnaryNode {
+
+  override lazy val resolved: Boolean = {
+    name.nonEmpty && metrics.nonEmpty && metrics.forall(_.resolved) && childrenResolved
+  }
+
+  override def output: Seq[Attribute] = child.output
+}

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 6 additions & 1 deletion
@@ -160,7 +160,12 @@ object SQLConf {
         confGetter.get()()
       }
     } else {
-      confGetter.get()()
+      val conf = existingConf.get()
+      if (conf != null) {
+        conf
+      } else {
+        confGetter.get()()
+      }
     }
   }
 }

sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala

Lines changed: 35 additions & 0 deletions
@@ -21,9 +21,14 @@ import java.util.Locale
 
 import scala.util.control.NonFatal
 
+import com.fasterxml.jackson.core.{JsonGenerator, JsonParser}
+import com.fasterxml.jackson.databind.{DeserializationContext, JsonDeserializer, JsonSerializer, SerializerProvider}
+import com.fasterxml.jackson.databind.`type`.TypeFactory
+import com.fasterxml.jackson.databind.annotation.{JsonDeserialize, JsonSerialize}
 import org.json4s._
 import org.json4s.JsonAST.JValue
 import org.json4s.JsonDSL._
+import org.json4s.jackson.{JValueDeserializer, JValueSerializer}
 import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.annotation.Stable
@@ -40,7 +45,10 @@ import org.apache.spark.util.Utils
  *
  * @since 1.3.0
  */
+
 @Stable
+@JsonSerialize(using = classOf[DataTypeJsonSerializer])
+@JsonDeserialize(using = classOf[DataTypeJsonDeserializer])
 abstract class DataType extends AbstractDataType {
   /**
    * Enables matching against DataType for expressions:
@@ -475,3 +483,30 @@ object DataType {
     }
   }
 }
+
+/**
+ * Jackson serializer for [[DataType]]. Internally this delegates to json4s based serialization.
+ */
+class DataTypeJsonSerializer extends JsonSerializer[DataType] {
+  private val delegate = new JValueSerializer
+  override def serialize(
+      value: DataType,
+      gen: JsonGenerator,
+      provider: SerializerProvider): Unit = {
+    delegate.serialize(value.jsonValue, gen, provider)
+  }
+}
+
+/**
+ * Jackson deserializer for [[DataType]]. Internally this delegates to json4s based deserialization.
+ */
+class DataTypeJsonDeserializer extends JsonDeserializer[DataType] {
+  private val delegate = new JValueDeserializer(classOf[Any])
+
+  override def deserialize(
+      jsonParser: JsonParser,
+      deserializationContext: DeserializationContext): DataType = {
+    val json = delegate.deserialize(jsonParser, deserializationContext)
+    DataType.parseDataType(json.asInstanceOf[JValue])
+  }
+}

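A rough sketch of what the new annotations enable, assuming a plain Jackson `ObjectMapper` and an illustrative schema (this is how `DataType` values, e.g. in observed-metric rows, can survive Jackson-based serialization):

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.spark.sql.types._

// Because DataType now carries @JsonSerialize/@JsonDeserialize annotations, a plain
// Jackson ObjectMapper can round-trip a schema without any custom module.
val mapper = new ObjectMapper()
val schema: DataType = new StructType().add("cnt", LongType).add("sum_v", LongType)

val json = mapper.writeValueAsString(schema)   // e.g. {"type":"struct","fields":[...]}
val parsed = mapper.readValue(json, classOf[DataType])
assert(parsed == schema)
```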
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala

Lines changed: 85 additions & 2 deletions
@@ -29,11 +29,11 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate.{Count, Sum}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser.parsePlan
 import org.apache.spark.sql.catalyst.plans.{Cross, Inner}
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning,
-  RangePartitioning, RoundRobinPartitioning}
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.internal.SQLConf
@@ -650,4 +650,87 @@ class AnalysisSuite extends AnalysisTest with Matchers {
     assertAnalysisError(parsePlan("INSERT INTO test VALUES (1)"),
       Seq("Table not found: test"))
   }
+
+  test("check CollectMetrics resolved") {
+    val a = testRelation.output.head
+    val sum = Sum(a).toAggregateExpression().as("sum")
+    val random_sum = Sum(Rand(1L)).toAggregateExpression().as("rand_sum")
+    val literal = Literal(1).as("lit")
+
+    // Ok
+    assert(CollectMetrics("event", literal :: sum :: random_sum :: Nil, testRelation).resolved)
+
+    // Bad name
+    assert(!CollectMetrics("", sum :: Nil, testRelation).resolved)
+    assertAnalysisError(CollectMetrics("", sum :: Nil, testRelation),
+      "observed metrics should be named" :: Nil)
+
+    // No columns
+    assert(!CollectMetrics("evt", Nil, testRelation).resolved)
+
+    def checkAnalysisError(exprs: Seq[NamedExpression], errors: String*): Unit = {
+      assertAnalysisError(CollectMetrics("event", exprs, testRelation), errors)
+    }
+
+    // Unwrapped attribute
+    checkAnalysisError(
+      a :: Nil,
+      "Attribute", "can only be used as an argument to an aggregate function")
+
+    // Unwrapped non-deterministic expression
+    checkAnalysisError(
+      Rand(10).as("rnd") :: Nil,
+      "non-deterministic expression", "can only be used as an argument to an aggregate function")
+
+    // Distinct aggregate
+    checkAnalysisError(
+      Sum(a).toAggregateExpression(isDistinct = true).as("sum") :: Nil,
+      "distinct aggregates are not allowed in observed metrics, but found")
+
+    // Nested aggregate
+    checkAnalysisError(
+      Sum(Sum(a).toAggregateExpression()).toAggregateExpression().as("sum") :: Nil,
+      "nested aggregates are not allowed in observed metrics, but found")
+
+    // Windowed aggregate
+    val windowExpr = WindowExpression(
+      RowNumber(),
+      WindowSpecDefinition(Nil, a.asc :: Nil,
+        SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow)))
+    checkAnalysisError(
+      windowExpr.as("rn") :: Nil,
+      "window expressions are not allowed in observed metrics, but found")
+  }
+
+  test("check CollectMetrics duplicates") {
+    val a = testRelation.output.head
+    val sum = Sum(a).toAggregateExpression().as("sum")
+    val count = Count(Literal(1)).toAggregateExpression().as("cnt")
+
+    // Same result - duplicate names are allowed
+    assertAnalysisSuccess(Union(
+      CollectMetrics("evt1", count :: Nil, testRelation) ::
+      CollectMetrics("evt1", count :: Nil, testRelation) :: Nil))
+
+    // Same children, structurally different metrics - fail
+    assertAnalysisError(Union(
+      CollectMetrics("evt1", count :: Nil, testRelation) ::
+      CollectMetrics("evt1", sum :: Nil, testRelation) :: Nil),
+      "Multiple definitions of observed metrics" :: "evt1" :: Nil)
+
+    // Different children, same metrics - fail
+    val b = 'b.string
+    val tblB = LocalRelation(b)
+    assertAnalysisError(Union(
+      CollectMetrics("evt1", count :: Nil, testRelation) ::
+      CollectMetrics("evt1", count :: Nil, tblB) :: Nil),
+      "Multiple definitions of observed metrics" :: "evt1" :: Nil)
+
+    // Subquery different tree - fail
+    val subquery = Aggregate(Nil, sum :: Nil, CollectMetrics("evt1", count :: Nil, testRelation))
+    val query = Project(
+      b :: ScalarSubquery(subquery, Nil).as("sum") :: Nil,
+      CollectMetrics("evt1", count :: Nil, tblB))
+    assertAnalysisError(query, "Multiple definitions of observed metrics" :: "evt1" :: Nil)
+  }
 }
