
Commit 93112e6

xuanyuanking and liyuanjian
authored and committed
[SPARK-26142][SQL] Implement shuffle read metrics in SQL
## What changes were proposed in this pull request?

Implement `SQLShuffleMetricsReporter` on the SQL side as a customized ShuffleMetricsReporter. It extends `TempShuffleReadMetrics` and updates SQLMetrics, so that shuffle metrics can be reported in the SQL UI.

## How was this patch tested?

Added a UT in SQLMetricsSuite. Manual test locally, before:
![image](https://user-images.githubusercontent.com/4833765/48960517-30f97880-efa8-11e8-982c-92d05938fd1d.png)
after:
![image](https://user-images.githubusercontent.com/4833765/48960587-b54bfb80-efa8-11e8-8e95-7a3c8c74cc5c.png)

Closes apache#23128 from xuanyuanking/SPARK-26142.

Lead-authored-by: Yuanjian Li <[email protected]>
Co-authored-by: liyuanjian <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent 09a91d9 commit 93112e6
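
In short, the change is a delegating reporter: every shuffle-read callback updates the exchange operator's own SQLMetrics and still forwards to the task-level metrics. A minimal sketch of that pattern (condensed from the full `SQLShuffleMetricsReporter` added in this diff; the class name here is illustrative and only one callback is shown):

```scala
package org.apache.spark.sql.execution.metric

import org.apache.spark.executor.TempShuffleReadMetrics

// Sketch of the delegation pattern introduced by this patch (not the committed class;
// the real SQLShuffleMetricsReporter below overrides every inc* callback).
private[spark] class ShuffleReadDelegationSketch(
    tempMetrics: TempShuffleReadMetrics,
    metrics: Map[String, SQLMetric]) extends TempShuffleReadMetrics {

  private[this] val _recordsRead = metrics(SQLMetrics.RECORDS_READ)

  override def incRecordsRead(v: Long): Unit = {
    _recordsRead.add(v)           // surfaces as "records read" on the SQL UI node
    tempMetrics.incRecordsRead(v) // keeps the stage-level shuffle metrics populated
  }
}
```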

File tree

7 files changed: +126 −11 lines


sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala

Lines changed: 7 additions & 2 deletions
@@ -22,6 +22,7 @@ import java.util.Arrays
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleMetricsReporter}

 /**
  * The [[Partition]] used by [[ShuffledRowRDD]]. A post-shuffle partition
@@ -112,6 +113,7 @@ class CoalescedPartitioner(val parent: Partitioner, val partitionStartIndices: A
  */
 class ShuffledRowRDD(
     var dependency: ShuffleDependency[Int, InternalRow, InternalRow],
+    metrics: Map[String, SQLMetric],
     specifiedPartitionStartIndices: Option[Array[Int]] = None)
   extends RDD[InternalRow](dependency.rdd.context, Nil) {

@@ -154,7 +156,10 @@ class ShuffledRowRDD(

   override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
     val shuffledRowPartition = split.asInstanceOf[ShuffledRowRDDPartition]
-    val metrics = context.taskMetrics().createTempShuffleReadMetrics()
+    val tempMetrics = context.taskMetrics().createTempShuffleReadMetrics()
+    // `SQLShuffleMetricsReporter` will update its own metrics for SQL exchange operator,
+    // as well as the `tempMetrics` for basic shuffle metrics.
+    val sqlMetricsReporter = new SQLShuffleMetricsReporter(tempMetrics, metrics)
     // The range of pre-shuffle partitions that we are fetching at here is
     // [startPreShufflePartitionIndex, endPreShufflePartitionIndex - 1].
     val reader =
@@ -163,7 +168,7 @@ class ShuffledRowRDD(
         shuffledRowPartition.startPreShufflePartitionIndex,
         shuffledRowPartition.endPreShufflePartitionIndex,
         context,
-        metrics)
+        sqlMetricsReporter)
     reader.read().asInstanceOf[Iterator[Product2[Int, InternalRow]]].map(_._2)
   }
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala

Lines changed: 3 additions & 2 deletions
@@ -48,7 +48,8 @@ case class ShuffleExchangeExec(
   // e.g. it can be null on the Executor side

   override lazy val metrics = Map(
-    "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"))
+    "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size")
+  ) ++ SQLMetrics.getShuffleReadMetrics(sparkContext)

   override def nodeName: String = {
     val extraInfo = coordinator match {
@@ -108,7 +109,7 @@ case class ShuffleExchangeExec(
       assert(newPartitioning.isInstanceOf[HashPartitioning])
       newPartitioning = UnknownPartitioning(indices.length)
     }
-    new ShuffledRowRDD(shuffleDependency, specifiedPartitionStartIndices)
+    new ShuffledRowRDD(shuffleDependency, metrics, specifiedPartitionStartIndices)
   }

   /**

sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala

Lines changed: 8 additions & 2 deletions
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGe
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
+import org.apache.spark.sql.execution.metric.SQLMetrics

 /**
  * Take the first `limit` elements and collect them to a single partition.
@@ -37,11 +38,13 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode
   override def outputPartitioning: Partitioning = SinglePartition
   override def executeCollect(): Array[InternalRow] = child.executeTake(limit)
   private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
+  override lazy val metrics = SQLMetrics.getShuffleReadMetrics(sparkContext)
   protected override def doExecute(): RDD[InternalRow] = {
     val locallyLimited = child.execute().mapPartitionsInternal(_.take(limit))
     val shuffled = new ShuffledRowRDD(
       ShuffleExchangeExec.prepareShuffleDependency(
-        locallyLimited, child.output, SinglePartition, serializer))
+        locallyLimited, child.output, SinglePartition, serializer),
+      metrics)
     shuffled.mapPartitionsInternal(_.take(limit))
   }
 }
@@ -151,6 +154,8 @@ case class TakeOrderedAndProjectExec(

   private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)

+  override lazy val metrics = SQLMetrics.getShuffleReadMetrics(sparkContext)
+
   protected override def doExecute(): RDD[InternalRow] = {
     val ord = new LazilyGeneratedOrdering(sortOrder, child.output)
     val localTopK: RDD[InternalRow] = {
@@ -160,7 +165,8 @@ case class TakeOrderedAndProjectExec(
     }
     val shuffled = new ShuffledRowRDD(
       ShuffleExchangeExec.prepareShuffleDependency(
-        localTopK, child.output, SinglePartition, serializer))
+        localTopK, child.output, SinglePartition, serializer),
+      metrics)
     shuffled.mapPartitions { iter =>
       val topK = org.apache.spark.util.collection.Utils.takeOrdered(iter.map(_.copy()), limit)(ord)
       if (projectList != child.output) {

sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala

Lines changed: 20 additions & 0 deletions
@@ -82,6 +82,14 @@ object SQLMetrics {

   private val baseForAvgMetric: Int = 10

+  val REMOTE_BLOCKS_FETCHED = "remoteBlocksFetched"
+  val LOCAL_BLOCKS_FETCHED = "localBlocksFetched"
+  val REMOTE_BYTES_READ = "remoteBytesRead"
+  val REMOTE_BYTES_READ_TO_DISK = "remoteBytesReadToDisk"
+  val LOCAL_BYTES_READ = "localBytesRead"
+  val FETCH_WAIT_TIME = "fetchWaitTime"
+  val RECORDS_READ = "recordsRead"
+
   /**
    * Converts a double value to long value by multiplying a base integer, so we can store it in
    * `SQLMetrics`. It only works for average metrics. When showing the metrics on UI, we restore
@@ -194,4 +202,16 @@ object SQLMetrics {
         SparkListenerDriverAccumUpdates(executionId.toLong, metrics.map(m => m.id -> m.value)))
     }
   }
+
+  /**
+   * Create all shuffle read relative metrics and return the Map.
+   */
+  def getShuffleReadMetrics(sc: SparkContext): Map[String, SQLMetric] = Map(
+    REMOTE_BLOCKS_FETCHED -> createMetric(sc, "remote blocks fetched"),
+    LOCAL_BLOCKS_FETCHED -> createMetric(sc, "local blocks fetched"),
+    REMOTE_BYTES_READ -> createSizeMetric(sc, "remote bytes read"),
+    REMOTE_BYTES_READ_TO_DISK -> createSizeMetric(sc, "remote bytes read to disk"),
+    LOCAL_BYTES_READ -> createSizeMetric(sc, "local bytes read"),
+    FETCH_WAIT_TIME -> createTimingMetric(sc, "fetch wait time"),
+    RECORDS_READ -> createMetric(sc, "records read"))
 }
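
For reference, the call pattern the operators above follow, condensed into one place. This is a sketch of a hypothetical plan node body, not code from this commit; the surrounding class, `serializer`, and partitioning choice are assumptions:

```scala
// Inside a SparkPlan node that shuffles to a single partition (sketch only):
// declare the shared shuffle-read metrics, then hand the same map to ShuffledRowRDD,
// which wraps the task's TempShuffleReadMetrics in a SQLShuffleMetricsReporter in compute().
override lazy val metrics: Map[String, SQLMetric] =
  SQLMetrics.getShuffleReadMetrics(sparkContext)

protected override def doExecute(): RDD[InternalRow] = {
  new ShuffledRowRDD(
    ShuffleExchangeExec.prepareShuffleDependency(
      child.execute(), child.output, SinglePartition, serializer),
    metrics)
}
```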
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala

Lines changed: 67 additions & 0 deletions
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.metric
+
+import org.apache.spark.executor.TempShuffleReadMetrics
+
+/**
+ * A shuffle metrics reporter for SQL exchange operators.
+ * @param tempMetrics [[TempShuffleReadMetrics]] created in TaskContext.
+ * @param metrics All metrics in current SparkPlan. This param should not empty and
+ *   contains all shuffle metrics defined in [[SQLMetrics.getShuffleReadMetrics]].
+ */
+private[spark] class SQLShuffleMetricsReporter(
+    tempMetrics: TempShuffleReadMetrics,
+    metrics: Map[String, SQLMetric]) extends TempShuffleReadMetrics {
+  private[this] val _remoteBlocksFetched = metrics(SQLMetrics.REMOTE_BLOCKS_FETCHED)
+  private[this] val _localBlocksFetched = metrics(SQLMetrics.LOCAL_BLOCKS_FETCHED)
+  private[this] val _remoteBytesRead = metrics(SQLMetrics.REMOTE_BYTES_READ)
+  private[this] val _remoteBytesReadToDisk = metrics(SQLMetrics.REMOTE_BYTES_READ_TO_DISK)
+  private[this] val _localBytesRead = metrics(SQLMetrics.LOCAL_BYTES_READ)
+  private[this] val _fetchWaitTime = metrics(SQLMetrics.FETCH_WAIT_TIME)
+  private[this] val _recordsRead = metrics(SQLMetrics.RECORDS_READ)
+
+  override def incRemoteBlocksFetched(v: Long): Unit = {
+    _remoteBlocksFetched.add(v)
+    tempMetrics.incRemoteBlocksFetched(v)
+  }
+  override def incLocalBlocksFetched(v: Long): Unit = {
+    _localBlocksFetched.add(v)
+    tempMetrics.incLocalBlocksFetched(v)
+  }
+  override def incRemoteBytesRead(v: Long): Unit = {
+    _remoteBytesRead.add(v)
+    tempMetrics.incRemoteBytesRead(v)
+  }
+  override def incRemoteBytesReadToDisk(v: Long): Unit = {
+    _remoteBytesReadToDisk.add(v)
+    tempMetrics.incRemoteBytesReadToDisk(v)
+  }
+  override def incLocalBytesRead(v: Long): Unit = {
+    _localBytesRead.add(v)
+    tempMetrics.incLocalBytesRead(v)
+  }
+  override def incFetchWaitTime(v: Long): Unit = {
+    _fetchWaitTime.add(v)
+    tempMetrics.incFetchWaitTime(v)
+  }
+  override def incRecordsRead(v: Long): Unit = {
+    _recordsRead.add(v)
+    tempMetrics.incRecordsRead(v)
+  }
+}

sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala

Lines changed: 4 additions & 1 deletion
@@ -26,6 +26,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{LocalSparkSession, Row, SparkSession}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.types._
 import org.apache.spark.storage.ShuffleBlockId
 import org.apache.spark.util.collection.ExternalSorter
@@ -137,7 +138,9 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkSession {
         rowsRDD,
         new PartitionIdPassthrough(2),
         new UnsafeRowSerializer(2))
-    val shuffled = new ShuffledRowRDD(dependency)
+    val shuffled = new ShuffledRowRDD(
+      dependency,
+      SQLMetrics.getShuffleReadMetrics(spark.sparkContext))
     shuffled.count()
   }
 }

sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala

Lines changed: 17 additions & 4 deletions
@@ -94,8 +94,13 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
         "avg hash probe (min, med, max)" -> "\n(1, 1, 1)"),
       Map("number of output rows" -> 1L,
         "avg hash probe (min, med, max)" -> "\n(1, 1, 1)"))
+    val shuffleExpected1 = Map(
+      "records read" -> 2L,
+      "local blocks fetched" -> 2L,
+      "remote blocks fetched" -> 0L)
     testSparkPlanMetrics(df, 1, Map(
       2L -> (("HashAggregate", expected1(0))),
+      1L -> (("Exchange", shuffleExpected1)),
       0L -> (("HashAggregate", expected1(1))))
     )

@@ -106,8 +111,13 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
         "avg hash probe (min, med, max)" -> "\n(1, 1, 1)"),
       Map("number of output rows" -> 3L,
         "avg hash probe (min, med, max)" -> "\n(1, 1, 1)"))
+    val shuffleExpected2 = Map(
+      "records read" -> 4L,
+      "local blocks fetched" -> 4L,
+      "remote blocks fetched" -> 0L)
     testSparkPlanMetrics(df2, 1, Map(
       2L -> (("HashAggregate", expected2(0))),
+      1L -> (("Exchange", shuffleExpected2)),
       0L -> (("HashAggregate", expected2(1))))
     )
   }
@@ -191,7 +201,11 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
       testSparkPlanMetrics(df, 1, Map(
         0L -> (("SortMergeJoin", Map(
           // It's 4 because we only read 3 rows in the first partition and 1 row in the second one
-          "number of output rows" -> 4L))))
+          "number of output rows" -> 4L))),
+        2L -> (("Exchange", Map(
+          "records read" -> 4L,
+          "local blocks fetched" -> 2L,
+          "remote blocks fetched" -> 0L))))
       )
     }
   }
@@ -208,15 +222,15 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
         "SELECT * FROM testData2 left JOIN testDataForJoin ON testData2.a = testDataForJoin.a")
       testSparkPlanMetrics(df, 1, Map(
         0L -> (("SortMergeJoin", Map(
-          // It's 4 because we only read 3 rows in the first partition and 1 row in the second one
+          // It's 8 because we read 6 rows in the left and 2 row in the right one
          "number of output rows" -> 8L))))
      )

       val df2 = spark.sql(
         "SELECT * FROM testDataForJoin right JOIN testData2 ON testData2.a = testDataForJoin.a")
       testSparkPlanMetrics(df2, 1, Map(
         0L -> (("SortMergeJoin", Map(
-          // It's 4 because we only read 3 rows in the first partition and 1 row in the second one
+          // It's 8 because we read 6 rows in the left and 2 row in the right one
          "number of output rows" -> 8L))))
      )
    }
@@ -287,7 +301,6 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
       // Assume the execution plan is
       // ... -> ShuffledHashJoin(nodeId = 1) -> Project(nodeId = 0)
       val df = df1.join(df2, "key")
-      val metrics = getSparkPlanMetrics(df, 1, Set(1L))
       testSparkPlanMetrics(df, 1, Map(
         1L -> (("ShuffledHashJoin", Map(
           "number of output rows" -> 2L,