Skip to content

Commit cff0745

Browse files
committed
Add fallback storage read metrics
1 parent 2dbf8eb commit cff0745

File tree

3 files changed

+16
-3
lines changed

3 files changed

+16
-3
lines changed

core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ class ShuffleReadMetrics private[spark] () extends Serializable {
4646
private[executor] val _localMergedBytesRead = new LongAccumulator
4747
private[executor] val _remoteReqsDuration = new LongAccumulator
4848
private[executor] val _remoteMergedReqsDuration = new LongAccumulator
49+
private[executor] val _fallbackStorageBlocksFetched = new LongAccumulator
50+
private[executor] val _fallbackStorageBytesRead = new LongAccumulator
4951

5052
/**
5153
* Number of remote blocks fetched in this shuffle by this task.
@@ -215,6 +217,8 @@ class ShuffleReadMetrics private[spark] () extends Serializable {
215217
_localMergedBytesRead.setValue(0)
216218
_remoteReqsDuration.setValue(0)
217219
_remoteMergedReqsDuration.setValue(0)
220+
_fallbackStorageBlocksFetched.setValue(0)
221+
_fallbackStorageBytesRead.setValue(0)
218222
metrics.foreach { metric =>
219223
_remoteBlocksFetched.add(metric.remoteBlocksFetched)
220224
_localBlocksFetched.add(metric.localBlocksFetched)
@@ -233,6 +237,8 @@ class ShuffleReadMetrics private[spark] () extends Serializable {
233237
_localMergedBytesRead.add(metric.localMergedBytesRead)
234238
_remoteReqsDuration.add(metric.remoteReqsDuration)
235239
_remoteMergedReqsDuration.add(metric.remoteMergedReqsDuration)
240+
_fallbackStorageBlocksFetched.add(metric.fallbackStorageBlocksFetched)
241+
_fallbackStorageBytesRead.add(metric.fallbackStorageBytesRead)
236242
}
237243
}
238244
}
@@ -261,6 +267,8 @@ private[spark] class TempShuffleReadMetrics extends ShuffleReadMetricsReporter {
261267
private[this] var _localMergedBytesRead = 0L
262268
private[this] var _remoteReqsDuration = 0L
263269
private[this] var _remoteMergedReqsDuration = 0L
270+
private[this] var _fallbackStorageBlocksFetched = 0L
271+
private[this] var _fallbackStorageBytesRead = 0L
264272

265273
override def incRemoteBlocksFetched(v: Long): Unit = _remoteBlocksFetched += v
266274
override def incLocalBlocksFetched(v: Long): Unit = _localBlocksFetched += v
@@ -279,6 +287,8 @@ private[spark] class TempShuffleReadMetrics extends ShuffleReadMetricsReporter {
279287
override def incLocalMergedBytesRead(v: Long): Unit = _localMergedBytesRead += v
280288
override def incRemoteReqsDuration(v: Long): Unit = _remoteReqsDuration += v
281289
override def incRemoteMergedReqsDuration(v: Long): Unit = _remoteMergedReqsDuration += v
290+
override def incFallbackStorageBlocksFetched(v: Long): Unit = _fallbackStorageBlocksFetched += v
291+
override def incFallbackStorageBytesRead(v: Long): Unit = _fallbackStorageBytesRead += v
282292

283293
def remoteBlocksFetched: Long = _remoteBlocksFetched
284294
def localBlocksFetched: Long = _localBlocksFetched
@@ -297,4 +307,6 @@ private[spark] class TempShuffleReadMetrics extends ShuffleReadMetricsReporter {
297307
def localMergedBytesRead: Long = _localMergedBytesRead
298308
def remoteReqsDuration: Long = _remoteReqsDuration
299309
def remoteMergedReqsDuration: Long = _remoteMergedReqsDuration
310+
def fallbackStorageBlocksFetched: Long = _fallbackStorageBlocksFetched
311+
def fallbackStorageBytesRead: Long = _fallbackStorageBytesRead
300312
}

core/src/main/scala/org/apache/spark/shuffle/metrics.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ private[spark] trait ShuffleReadMetricsReporter {
4343
private[spark] def incLocalMergedBytesRead(v: Long): Unit
4444
private[spark] def incRemoteReqsDuration(v: Long): Unit
4545
private[spark] def incRemoteMergedReqsDuration(v: Long): Unit
46+
private[spark] def incFallbackStorageBlocksFetched(v: Long): Unit
47+
private[spark] def incFallbackStorageBytesRead(v: Long): Unit
4648
}
4749

4850

core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1315,9 +1315,8 @@ final class ShuffleBlockFetcherIterator(
13151315
try {
13161316
// materialize the block ManagedBuffer and store data in SuccessFetchResult
13171317
val buf = new NioManagedBuffer(request.block.nioByteBuffer())
1318-
// TODO: add fallback storage metrics
1319-
shuffleMetrics.incLocalBlocksFetched(1)
1320-
shuffleMetrics.incLocalBytesRead(buf.size)
1318+
shuffleMetrics.incFallbackStorageBlocksFetched(1)
1319+
shuffleMetrics.incFallbackStorageBytesRead(buf.size)
13211320
val result = SuccessFetchResult(
13221321
request.blockId, request.mapIndex, FallbackStorage.FALLBACK_BLOCK_MANAGER_ID,
13231322
request.size, buf, isNetworkReqDone = true)

0 commit comments

Comments
 (0)