Skip to content

Commit f4a17e9

Browse files
httfighter authored and Marcelo Vanzin committed
[SPARK-26726] Synchronize the amount of memory used by the broadcast variable to the UI display
The amount of memory used by the broadcast variable is not synchronized to the UI display.

## What changes were proposed in this pull request?

The amount of memory used by the broadcast variable is not synchronized to the UI display. This change adds a case for BroadcastBlockId and updates the executor's memory usage accordingly.

## How was this patch tested?

This patch is tested with unit tests.

Closes apache#23649 from httfighter/SPARK-26726.

Lead-authored-by: 韩田田00222924 <[email protected]>
Co-authored-by: [email protected] <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>
1 parent df4c53e commit f4a17e9

File tree

2 files changed

+53
-9
lines changed

2 files changed

+53
-9
lines changed

core/src/main/scala/org/apache/spark/status/AppStatusListener.scala

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -829,6 +829,7 @@ private[spark] class AppStatusListener(
829829
event.blockUpdatedInfo.blockId match {
830830
case block: RDDBlockId => updateRDDBlock(event, block)
831831
case stream: StreamBlockId => updateStreamBlock(event, stream)
832+
case broadcast: BroadcastBlockId => updateBroadcastBlock(event, broadcast)
832833
case _ =>
833834
}
834835
}
@@ -887,15 +888,7 @@ private[spark] class AppStatusListener(
887888
// Update the executor stats first, since they are used to calculate the free memory
888889
// on tracked RDD distributions.
889890
maybeExec.foreach { exec =>
890-
if (exec.hasMemoryInfo) {
891-
if (storageLevel.useOffHeap) {
892-
exec.usedOffHeap = addDeltaToValue(exec.usedOffHeap, memoryDelta)
893-
} else {
894-
exec.usedOnHeap = addDeltaToValue(exec.usedOnHeap, memoryDelta)
895-
}
896-
}
897-
exec.memoryUsed = addDeltaToValue(exec.memoryUsed, memoryDelta)
898-
exec.diskUsed = addDeltaToValue(exec.diskUsed, diskDelta)
891+
updateExecutorMemoryDiskInfo(exec, storageLevel, memoryDelta, diskDelta)
899892
}
900893

901894
// Update the block entry in the RDD info, keeping track of the deltas above so that we
@@ -997,6 +990,39 @@ private[spark] class AppStatusListener(
997990
}
998991
}
999992

993+
/**
 * Applies a block update for a broadcast block to the memory and disk accounting of the
 * executor that reported it.
 *
 * The executor is looked up in `liveExecutors` by the executor id carried in the event's
 * block manager id; if no entry is found, nothing is updated.
 *
 * @param event the block-updated event; its `blockUpdatedInfo` supplies the executor id,
 *              the storage level and the memory / disk sizes used below
 * @param broadcast the broadcast block id matched by the caller (not read in this method)
 */
private def updateBroadcastBlock(
994+
event: SparkListenerBlockUpdated,
995+
broadcast: BroadcastBlockId): Unit = {
996+
val executorId = event.blockUpdatedInfo.blockManagerId.executorId
997+
liveExecutors.get(executorId).foreach { exec =>
998+
val now = System.nanoTime()
999+
val storageLevel = event.blockUpdatedInfo.storageLevel
1000+
1001+
// Whether values are being added to or removed from the existing accounting.
// The reported size is counted as positive when the new storage level uses that medium,
// and as negative otherwise (e.g. the block was dropped from it).
1002+
val diskDelta = event.blockUpdatedInfo.diskSize * (if (storageLevel.useDisk) 1 else -1)
1003+
val memoryDelta = event.blockUpdatedInfo.memSize * (if (storageLevel.useMemory) 1 else -1)
1004+
1005+
updateExecutorMemoryDiskInfo(exec, storageLevel, memoryDelta, diskDelta)
1006+
// NOTE(review): presumably publishes the updated executor entry, using `now` as the
// timestamp -- confirm against maybeUpdate's definition.
maybeUpdate(exec, now)
1007+
}
1008+
}
1009+
1010+
/**
 * Applies memory and disk usage deltas to an executor's storage accounting.
 *
 * `memoryUsed` and `diskUsed` are always adjusted. When the executor has memory info
 * (`exec.hasMemoryInfo`), the memory delta is additionally applied to either the off-heap
 * or the on-heap counter, selected by `storageLevel.useOffHeap`.
 *
 * @param exec the live executor whose counters are mutated in place
 * @param storageLevel storage level of the updated block; only `useOffHeap` is read here
 * @param memoryDelta signed change in memory usage, combined via `addDeltaToValue`
 * @param diskDelta signed change in disk usage, combined via `addDeltaToValue`
 */
private def updateExecutorMemoryDiskInfo(
1011+
exec: LiveExecutor,
1012+
storageLevel: StorageLevel,
1013+
memoryDelta: Long,
1014+
diskDelta: Long): Unit = {
1015+
// The on-heap / off-heap split is only tracked for executors that report memory info.
if (exec.hasMemoryInfo) {
1016+
if (storageLevel.useOffHeap) {
1017+
exec.usedOffHeap = addDeltaToValue(exec.usedOffHeap, memoryDelta)
1018+
} else {
1019+
exec.usedOnHeap = addDeltaToValue(exec.usedOnHeap, memoryDelta)
1020+
}
1021+
}
1022+
// Aggregate totals are updated regardless of whether memory info is available.
exec.memoryUsed = addDeltaToValue(exec.memoryUsed, memoryDelta)
1023+
exec.diskUsed = addDeltaToValue(exec.diskUsed, diskDelta)
1024+
}
1025+
10001026
private def getOrCreateStage(info: StageInfo): LiveStage = {
10011027
val stage = liveStages.computeIfAbsent((info.stageId, info.attemptNumber),
10021028
new Function[(Int, Int), LiveStage]() {

core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -938,6 +938,24 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
938938
intercept[NoSuchElementException] {
939939
check[StreamBlockData](stream1.name) { _ => () }
940940
}
941+
942+
// Update a BroadcastBlock.
943+
val broadcast1 = BroadcastBlockId(1L)
944+
listener.onBlockUpdated(SparkListenerBlockUpdated(
945+
BlockUpdatedInfo(bm1, broadcast1, level, 1L, 1L)))
946+
947+
check[ExecutorSummaryWrapper](bm1.executorId) { exec =>
948+
assert(exec.info.memoryUsed === 1L)
949+
assert(exec.info.diskUsed === 1L)
950+
}
951+
952+
// Drop a BroadcastBlock.
953+
listener.onBlockUpdated(SparkListenerBlockUpdated(
954+
BlockUpdatedInfo(bm1, broadcast1, StorageLevel.NONE, 1L, 1L)))
955+
check[ExecutorSummaryWrapper](bm1.executorId) { exec =>
956+
assert(exec.info.memoryUsed === 0)
957+
assert(exec.info.diskUsed === 0)
958+
}
941959
}
942960

943961
test("eviction of old data") {

0 commit comments

Comments
 (0)