
Commit 04a40ab

ForVic authored and Mridul Muralidharan committed
[SPARK-53157][CORE] Decouple driver and executor polling intervals
### What changes were proposed in this pull request?

Add a config `spark.driver.metrics.pollingInterval`, and schedule the driver heartbeat / metrics polling at that interval.

### Why are the changes needed?

Decouple driver and executor heartbeat intervals. Because memory metrics are sampled at the reporting interval, we do not have a 100% accurate view of stats at drivers and executors. This is particularly pronounced at the driver, where we don't have the benefit of the larger sample size that metrics from N executors provide. This change gives users a way to increase (or change in general) the rate of metric collection at the driver, to help overcome the sampling problem, without requiring them to also increase the executor heartbeat frequency.

### Does this PR introduce _any_ user-facing change?

Yes, it introduces a Spark config.

### How was this patch tested?

Verified that metric collection improved when the sampling rate was increased, and verified that the number of events was as expected when the rate was changed.

Methodology for validating that a higher driver heartbeat frequency improves memory metric collection:
1. Using a 6gb driver heap, wrote a job to broadcast a table, gradually increasing the size of the table until OOM.
2. Increased driver memory to 10gb, large enough for the same broadcast to succeed.
3. Repeated this job and tracked the peak memory usage that was written to the event log.
4. After repeated experiments, observed that the median peak heap usage tracked was <=5GiB.
5. Added my change, and decreased the heartbeat interval.
6. Re-ran the same jobs with a 10gb heap, and saw that the typical peak memory usage tracked was ~8GiB, more accurately reflecting the increased memory needs.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #51885 from ForVic/vsunderl/driver_polling_interval.

Authored-by: ForVic <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
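As an illustrative usage sketch (not part of this commit): with the new config, the driver's metric polling rate can be raised independently of the executor heartbeat. The app name, `local[*]` master, and the `1s` value below are placeholder choices, not recommendations.

```scala
import org.apache.spark.sql.SparkSession

// Poll driver metrics every second while leaving the executor heartbeat
// at its 10s default; only the driver-side collection rate changes.
val spark = SparkSession.builder()
  .appName("driver-polling-example") // hypothetical app name
  .master("local[*]")
  .config("spark.driver.metrics.pollingInterval", "1s")
  .getOrCreate()
```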
1 parent 4441fa1 · commit 04a40ab

2 files changed: +9 additions, −1 deletion

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -614,7 +614,7 @@ class SparkContext(config: SparkConf) extends Logging {
     _heartbeater = new Heartbeater(
       () => SparkContext.this.reportHeartBeat(_executorMetricsSource),
       "driver-heartbeater",
-      conf.get(EXECUTOR_HEARTBEAT_INTERVAL))
+      conf.get(DRIVER_METRICS_POLLING_INTERVAL))
     _heartbeater.start()

     // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
```
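For context, here is a minimal sketch of the kind of periodic poller the `Heartbeater` call site above implies (a report function, a thread name, and an interval in milliseconds). This is an illustration only, not Spark's actual `Heartbeater` implementation:

```scala
import java.util.concurrent.{Executors, TimeUnit}

// Illustrative stand-in: runs `report` every `intervalMs` milliseconds on a
// dedicated daemon thread, like the driver heartbeater loop above.
class PollingHeartbeater(report: () => Unit, name: String, intervalMs: Long) {
  private val scheduler = Executors.newSingleThreadScheduledExecutor((r: Runnable) => {
    val t = new Thread(r, name)
    t.setDaemon(true)
    t
  })

  def start(): Unit =
    scheduler.scheduleAtFixedRate(() => report(), intervalMs, intervalMs, TimeUnit.MILLISECONDS)

  def stop(): Unit = scheduler.shutdown()
}
```

With this commit, the interval handed to the driver-side heartbeater comes from `DRIVER_METRICS_POLLING_INTERVAL` rather than `EXECUTOR_HEARTBEAT_INTERVAL`, so the two rates can diverge.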

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 8 additions & 0 deletions
```diff
@@ -1201,6 +1201,14 @@ package object config {
       .checkValue(v => v >= 0, "The value should be a non-negative time value.")
       .createWithDefaultString("0min")

+  private[spark] val DRIVER_METRICS_POLLING_INTERVAL =
+    ConfigBuilder("spark.driver.metrics.pollingInterval")
+      .doc("How often to collect driver metrics (in milliseconds). " +
+        "If unset, the polling is done at the executor heartbeat interval. " +
+        "If set, the polling is done at this interval.")
+      .version("4.1.0")
+      .fallbackConf(EXECUTOR_HEARTBEAT_INTERVAL)
+
   private[spark] val DRIVER_BIND_ADDRESS = ConfigBuilder("spark.driver.bindAddress")
     .doc("Address where to bind network listen sockets on the driver.")
     .version("2.1.0")
```
