
Commit 9362c5c

mukulmurthy authored and zsxwing committed
[SPARK-25449][CORE] Heartbeat shouldn't include accumulators for zero metrics
## What changes were proposed in this pull request?

Heartbeat shouldn't include accumulators for zero metrics.

Heartbeats sent from executors to the driver every 10 seconds contain metrics and are generally on the order of a few KBs. However, for large jobs with lots of tasks, heartbeats can be on the order of tens of MBs, causing tasks to die with heartbeat failures. We can mitigate this by not sending zero metrics to the driver.

## How was this patch tested?

New unit tests in ExecutorSuite cover heartbeats with the new conf enabled (the default) and disabled.

Closes apache#22473 from mukulmurthy/25449-heartbeat.

Authored-by: Mukul Murthy <[email protected]>
Signed-off-by: Shixiong Zhu <[email protected]>
1 parent a281465 commit 9362c5c
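The core of the change is visible in Executor.scala below: when the new internal conf is on (its default), accumulator updates whose `isZero` is still true are filtered out before the heartbeat message is built. A minimal, self-contained sketch of that idea using only the public `AccumulatorV2` API — the `accumulatorsToReport` helper and object name here are illustrative, not part of the patch:

```scala
import org.apache.spark.util.{AccumulatorV2, LongAccumulator}

object HeartbeatFilteringSketch {
  // Hypothetical helper mirroring the executor-side behavior: when dropZeroes is on
  // (the default of spark.executor.heartbeat.dropZeroAccumulatorUpdates), skip any
  // accumulator whose isZero is still true.
  def accumulatorsToReport(
      accums: Seq[AccumulatorV2[_, _]],
      dropZeroes: Boolean): Seq[AccumulatorV2[_, _]] = {
    if (dropZeroes) accums.filterNot(_.isZero) else accums
  }

  def main(args: Array[String]): Unit = {
    val updated = new LongAccumulator
    updated.add(1L)                      // nonzero -> always reported
    val untouched = new LongAccumulator  // isZero == true -> dropped when the conf is on

    println(accumulatorsToReport(Seq(updated, untouched), dropZeroes = true).size)  // 1
    println(accumulatorsToReport(Seq(updated, untouched), dropZeroes = false).size) // 2
  }
}
```

Task metrics that a task never touches keep `isZero == true`, so with the filter in place they no longer inflate the heartbeat payload.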

File tree

6 files changed: +154, -27 lines

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 6 additions & 5 deletions
@@ -609,13 +609,14 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Serializable
     require(!encryptionEnabled || get(NETWORK_AUTH_ENABLED),
       s"${NETWORK_AUTH_ENABLED.key} must be enabled when enabling encryption.")

-    val executorTimeoutThreshold = getTimeAsSeconds("spark.network.timeout", "120s")
-    val executorHeartbeatInterval = getTimeAsSeconds("spark.executor.heartbeatInterval", "10s")
+    val executorTimeoutThresholdMs =
+      getTimeAsSeconds("spark.network.timeout", "120s") * 1000
+    val executorHeartbeatIntervalMs = get(EXECUTOR_HEARTBEAT_INTERVAL)
     // If spark.executor.heartbeatInterval bigger than spark.network.timeout,
     // it will almost always cause ExecutorLostFailure. See SPARK-22754.
-    require(executorTimeoutThreshold > executorHeartbeatInterval, "The value of " +
-      s"spark.network.timeout=${executorTimeoutThreshold}s must be no less than the value of " +
-      s"spark.executor.heartbeatInterval=${executorHeartbeatInterval}s.")
+    require(executorTimeoutThresholdMs > executorHeartbeatIntervalMs, "The value of " +
+      s"spark.network.timeout=${executorTimeoutThresholdMs}ms must be no less than the value of " +
+      s"spark.executor.heartbeatInterval=${executorHeartbeatIntervalMs}ms.")
   }

   /**
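With both sides of the check now expressed in milliseconds, the defaults (`spark.network.timeout=120s`, `spark.executor.heartbeatInterval=10s`) still pass. For illustration only (not part of the patch), a configuration like the following would trip the `require` above when the conf is validated during `SparkContext` startup:

```scala
import org.apache.spark.SparkConf

// Hypothetical misconfiguration: a heartbeat interval longer than the network timeout.
// SparkConf validation (run as part of SparkContext startup) now compares both values
// in milliseconds and rejects this with the IllegalArgumentException built above.
val badConf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("heartbeat-timeout-check")
  .set("spark.network.timeout", "60s")              // 60,000 ms
  .set("spark.executor.heartbeatInterval", "120s")  // 120,000 ms > timeout

// new SparkContext(badConf)  // would fail fast at configuration validation
```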

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 1 addition & 1 deletion
@@ -499,7 +499,7 @@ class SparkContext(config: SparkConf) extends Logging {

     // create and start the heartbeater for collecting memory metrics
     _heartbeater = new Heartbeater(env.memoryManager, reportHeartBeat, "driver-heartbeater",
-      conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s"))
+      conf.get(EXECUTOR_HEARTBEAT_INTERVAL))
     _heartbeater.start()

     // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 29 additions & 11 deletions
@@ -28,6 +28,7 @@ import javax.annotation.concurrent.GuardedBy

 import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
+import scala.concurrent.duration._
 import scala.util.control.NonFatal

 import com.google.common.util.concurrent.ThreadFactoryBuilder
@@ -120,7 +121,7 @@ private[spark] class Executor(
   }

   // Whether to load classes in user jars before those in Spark jars
-  private val userClassPathFirst = conf.getBoolean("spark.executor.userClassPathFirst", false)
+  private val userClassPathFirst = conf.get(EXECUTOR_USER_CLASS_PATH_FIRST)

   // Whether to monitor killed / interrupted tasks
   private val taskReaperEnabled = conf.getBoolean("spark.task.reaper.enabled", false)
@@ -170,21 +171,32 @@ private[spark] class Executor(
   // Maintains the list of running tasks.
   private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]

+  /**
+   * When an executor is unable to send heartbeats to the driver more than `HEARTBEAT_MAX_FAILURES`
+   * times, it should kill itself. The default value is 60. It means we will retry to send
+   * heartbeats about 10 minutes because the heartbeat interval is 10s.
+   */
+  private val HEARTBEAT_MAX_FAILURES = conf.get(EXECUTOR_HEARTBEAT_MAX_FAILURES)
+
+  /**
+   * Whether to drop empty accumulators from heartbeats sent to the driver. Including the empty
+   * accumulators (that satisfy isZero) can make the size of the heartbeat message very large.
+   */
+  private val HEARTBEAT_DROP_ZEROES = conf.get(EXECUTOR_HEARTBEAT_DROP_ZERO_ACCUMULATOR_UPDATES)
+
+  /**
+   * Interval to send heartbeats, in milliseconds
+   */
+  private val HEARTBEAT_INTERVAL_MS = conf.get(EXECUTOR_HEARTBEAT_INTERVAL)
+
   // Executor for the heartbeat task.
   private val heartbeater = new Heartbeater(env.memoryManager, reportHeartBeat,
-    "executor-heartbeater", conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s"))
+    "executor-heartbeater", HEARTBEAT_INTERVAL_MS)

   // must be initialized before running startDriverHeartbeat()
   private val heartbeatReceiverRef =
     RpcUtils.makeDriverRef(HeartbeatReceiver.ENDPOINT_NAME, conf, env.rpcEnv)

-  /**
-   * When an executor is unable to send heartbeats to the driver more than `HEARTBEAT_MAX_FAILURES`
-   * times, it should kill itself. The default value is 60. It means we will retry to send
-   * heartbeats about 10 minutes because the heartbeat interval is 10s.
-   */
-  private val HEARTBEAT_MAX_FAILURES = conf.getInt("spark.executor.heartbeat.maxFailures", 60)
-
   /**
   * Count the failure times of heartbeat. It should only be accessed in the heartbeat thread. Each
   * successful heartbeat will reset it to 0.
@@ -834,15 +846,21 @@
       if (taskRunner.task != null) {
         taskRunner.task.metrics.mergeShuffleReadMetrics()
         taskRunner.task.metrics.setJvmGCTime(curGCTime - taskRunner.startGCTime)
-        accumUpdates += ((taskRunner.taskId, taskRunner.task.metrics.accumulators()))
+        val accumulatorsToReport =
+          if (HEARTBEAT_DROP_ZEROES) {
+            taskRunner.task.metrics.accumulators().filterNot(_.isZero)
+          } else {
+            taskRunner.task.metrics.accumulators()
+          }
+        accumUpdates += ((taskRunner.taskId, accumulatorsToReport))
       }
     }

     val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId,
       executorUpdates)
     try {
       val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
-        message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s"))
+        message, new RpcTimeout(HEARTBEAT_INTERVAL_MS.millis, EXECUTOR_HEARTBEAT_INTERVAL.key))
       if (response.reregisterBlockManager) {
         logInfo("Told to re-register on heartbeat")
         env.blockManager.reregister()
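The newly imported `scala.concurrent.duration._` supplies the `.millis` extension used to build the `RpcTimeout` above from the millisecond interval. A small illustration of that conversion (not part of the patch):

```scala
import scala.concurrent.duration._

// `.millis` lifts a Long into a FiniteDuration, which is the duration type
// the RpcTimeout constructor takes alongside the config key used for error messages.
val heartbeatIntervalMs = 10000L
val asDuration: FiniteDuration = heartbeatIntervalMs.millis
println(asDuration) // 10000 milliseconds
```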

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 14 additions & 0 deletions
@@ -83,6 +83,20 @@ package object config {
   private[spark] val EXECUTOR_CLASS_PATH =
     ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH).stringConf.createOptional

+  private[spark] val EXECUTOR_HEARTBEAT_DROP_ZERO_ACCUMULATOR_UPDATES =
+    ConfigBuilder("spark.executor.heartbeat.dropZeroAccumulatorUpdates")
+      .internal()
+      .booleanConf
+      .createWithDefault(true)
+
+  private[spark] val EXECUTOR_HEARTBEAT_INTERVAL =
+    ConfigBuilder("spark.executor.heartbeatInterval")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("10s")
+
+  private[spark] val EXECUTOR_HEARTBEAT_MAX_FAILURES =
+    ConfigBuilder("spark.executor.heartbeat.maxFailures").internal().intConf.createWithDefault(60)
+
   private[spark] val EXECUTOR_JAVA_OPTIONS =
     ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS).stringConf.createOptional
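For reference, these entries resolve through `SparkConf` like any other setting. The sketch below (not part of the patch) reads them back with their documented defaults; the two `.internal()` entries are not intended to be set by applications and are overridden here only for illustration:

```scala
import org.apache.spark.SparkConf

// Hypothetical usage: the interval is a public setting; the internal switches are
// shown only to make the defaults visible.
val conf = new SparkConf()
  .set("spark.executor.heartbeatInterval", "15s")
  .set("spark.executor.heartbeat.dropZeroAccumulatorUpdates", "false")

// Reading the values back with their documented defaults applied.
val intervalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")  // 15000
val dropZeroes = conf.getBoolean("spark.executor.heartbeat.dropZeroAccumulatorUpdates", true)
val maxFailures = conf.getInt("spark.executor.heartbeat.maxFailures", 60)     // 60
println(s"interval=${intervalMs}ms dropZeroes=$dropZeroes maxFailures=$maxFailures")
```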

core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala

Lines changed: 102 additions & 9 deletions
@@ -21,9 +21,10 @@ import java.io.{Externalizable, ObjectInput, ObjectOutput}
 import java.lang.Thread.UncaughtExceptionHandler
 import java.nio.ByteBuffer
 import java.util.Properties
-import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, TimeUnit}
 import java.util.concurrent.atomic.AtomicBoolean

+import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.Map
 import scala.concurrent.duration._
 import scala.language.postfixOps
@@ -33,22 +34,25 @@ import org.mockito.Matchers.{any, eq => meq}
 import org.mockito.Mockito.{inOrder, verify, when}
 import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
+import org.scalatest.PrivateMethodTester
 import org.scalatest.concurrent.Eventually
 import org.scalatest.mockito.MockitoSugar

 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
-import org.apache.spark.memory.MemoryManager
+import org.apache.spark.internal.config._
+import org.apache.spark.memory.TestMemoryManager
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.rdd.RDD
-import org.apache.spark.rpc.RpcEnv
-import org.apache.spark.scheduler.{FakeTask, ResultTask, TaskDescription}
+import org.apache.spark.rpc.{RpcEndpointRef, RpcEnv, RpcTimeout}
+import org.apache.spark.scheduler.{FakeTask, ResultTask, Task, TaskDescription}
 import org.apache.spark.serializer.{JavaSerializer, SerializerManager}
 import org.apache.spark.shuffle.FetchFailedException
-import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.util.UninterruptibleThread
+import org.apache.spark.storage.{BlockManager, BlockManagerId}
+import org.apache.spark.util.{LongAccumulator, UninterruptibleThread}

-class ExecutorSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar with Eventually {
+class ExecutorSuite extends SparkFunSuite
+  with LocalSparkContext with MockitoSugar with Eventually with PrivateMethodTester {

   test("SPARK-15963: Catch `TaskKilledException` correctly in Executor.TaskRunner") {
     // mock some objects to make Executor.launchTask() happy
@@ -252,18 +256,107 @@
     }
   }

+  test("Heartbeat should drop zero accumulator updates") {
+    heartbeatZeroAccumulatorUpdateTest(true)
+  }
+
+  test("Heartbeat should not drop zero accumulator updates when the conf is disabled") {
+    heartbeatZeroAccumulatorUpdateTest(false)
+  }
+
+  private def withHeartbeatExecutor(confs: (String, String)*)
+      (f: (Executor, ArrayBuffer[Heartbeat]) => Unit): Unit = {
+    val conf = new SparkConf
+    confs.foreach { case (k, v) => conf.set(k, v) }
+    val serializer = new JavaSerializer(conf)
+    val env = createMockEnv(conf, serializer)
+    val executor =
+      new Executor("id", "localhost", SparkEnv.get, userClassPath = Nil, isLocal = true)
+    val executorClass = classOf[Executor]
+
+    // Save all heartbeats sent into an ArrayBuffer for verification
+    val heartbeats = ArrayBuffer[Heartbeat]()
+    val mockReceiver = mock[RpcEndpointRef]
+    when(mockReceiver.askSync(any[Heartbeat], any[RpcTimeout])(any))
+      .thenAnswer(new Answer[HeartbeatResponse] {
+        override def answer(invocation: InvocationOnMock): HeartbeatResponse = {
+          val args = invocation.getArguments()
+          val mock = invocation.getMock
+          heartbeats += args(0).asInstanceOf[Heartbeat]
+          HeartbeatResponse(false)
+        }
+      })
+    val receiverRef = executorClass.getDeclaredField("heartbeatReceiverRef")
+    receiverRef.setAccessible(true)
+    receiverRef.set(executor, mockReceiver)
+
+    f(executor, heartbeats)
+  }
+
+  private def heartbeatZeroAccumulatorUpdateTest(dropZeroMetrics: Boolean): Unit = {
+    val c = EXECUTOR_HEARTBEAT_DROP_ZERO_ACCUMULATOR_UPDATES.key -> dropZeroMetrics.toString
+    withHeartbeatExecutor(c) { (executor, heartbeats) =>
+      val reportHeartbeat = PrivateMethod[Unit]('reportHeartBeat)
+
+      // When no tasks are running, there should be no accumulators sent in heartbeat
+      executor.invokePrivate(reportHeartbeat())
+      // invokeReportHeartbeat(executor)
+      assert(heartbeats.length == 1)
+      assert(heartbeats(0).accumUpdates.length == 0,
+        "No updates should be sent when no tasks are running")
+
+      // When we start a task with a nonzero accumulator, that should end up in the heartbeat
+      val metrics = new TaskMetrics()
+      val nonZeroAccumulator = new LongAccumulator()
+      nonZeroAccumulator.add(1)
+      metrics.registerAccumulator(nonZeroAccumulator)
+
+      val executorClass = classOf[Executor]
+      val tasksMap = {
+        val field =
+          executorClass.getDeclaredField("org$apache$spark$executor$Executor$$runningTasks")
+        field.setAccessible(true)
+        field.get(executor).asInstanceOf[ConcurrentHashMap[Long, executor.TaskRunner]]
+      }
+      val mockTaskRunner = mock[executor.TaskRunner]
+      val mockTask = mock[Task[Any]]
+      when(mockTask.metrics).thenReturn(metrics)
+      when(mockTaskRunner.taskId).thenReturn(6)
+      when(mockTaskRunner.task).thenReturn(mockTask)
+      when(mockTaskRunner.startGCTime).thenReturn(1)
+      tasksMap.put(6, mockTaskRunner)
+
+      executor.invokePrivate(reportHeartbeat())
+      assert(heartbeats.length == 2)
+      val updates = heartbeats(1).accumUpdates
+      assert(updates.length == 1 && updates(0)._1 == 6,
+        "Heartbeat should only send update for the one task running")
+      val accumsSent = updates(0)._2.length
+      assert(accumsSent > 0, "The nonzero accumulator we added should be sent")
+      if (dropZeroMetrics) {
+        assert(accumsSent == metrics.accumulators().count(!_.isZero),
+          "The number of accumulators sent should match the number of nonzero accumulators")
+      } else {
+        assert(accumsSent == metrics.accumulators().length,
+          "The number of accumulators sent should match the number of total accumulators")
+      }
+    }
+  }
+
   private def createMockEnv(conf: SparkConf, serializer: JavaSerializer): SparkEnv = {
     val mockEnv = mock[SparkEnv]
     val mockRpcEnv = mock[RpcEnv]
     val mockMetricsSystem = mock[MetricsSystem]
-    val mockMemoryManager = mock[MemoryManager]
+    val mockBlockManager = mock[BlockManager]
     when(mockEnv.conf).thenReturn(conf)
     when(mockEnv.serializer).thenReturn(serializer)
     when(mockEnv.serializerManager).thenReturn(mock[SerializerManager])
     when(mockEnv.rpcEnv).thenReturn(mockRpcEnv)
     when(mockEnv.metricsSystem).thenReturn(mockMetricsSystem)
-    when(mockEnv.memoryManager).thenReturn(mockMemoryManager)
+    when(mockEnv.memoryManager).thenReturn(new TestMemoryManager(conf))
     when(mockEnv.closureSerializer).thenReturn(serializer)
+    when(mockBlockManager.blockManagerId).thenReturn(BlockManagerId("1", "hostA", 1234))
+    when(mockEnv.blockManager).thenReturn(mockBlockManager)
     SparkEnv.set(mockEnv)
     mockEnv
   }

resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala

Lines changed: 2 additions & 1 deletion
@@ -33,6 +33,7 @@ import org.apache.mesos.SchedulerDriver
 import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkException, TaskState}
 import org.apache.spark.deploy.mesos.config._
 import org.apache.spark.internal.config
+import org.apache.spark.internal.config.EXECUTOR_HEARTBEAT_INTERVAL
 import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
@@ -635,7 +636,7 @@
             externalShufflePort,
             sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs",
               s"${sc.conf.getTimeAsSeconds("spark.network.timeout", "120s")}s"),
-            sc.conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s"))
+            sc.conf.get(EXECUTOR_HEARTBEAT_INTERVAL))
           slave.shuffleRegistered = true
         }
