Commit ee5a5a0

karth295 authored and srowen committed
[SPARK-21960][STREAMING] Spark Streaming Dynamic Allocation should respect spark.executor.instances
## What changes were proposed in this pull request?

Removes the check that `spark.executor.instances` is set to 0 when using Streaming DRA.

## How was this patch tested?

Manual tests.

My only concern with this PR is that `spark.executor.instances` (or the actual initial number of executors that the cluster manager gives Spark) can fall outside the range `spark.streaming.dynamicAllocation.minExecutors` to `spark.streaming.dynamicAllocation.maxExecutors`. I don't see a good way around that, because this code only runs after the SparkContext has been created.

Author: Karthik Palaniappan <[email protected]>

Closes apache#19183 from karth295/master.
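To make the range concern concrete, here is a hypothetical sketch (not part of this patch): a helper that would clamp the initial count from `spark.executor.instances` into the streaming DRA bounds. As the message notes, this could only run before the SparkContext is created; the default values for the bounds below are illustrative placeholders, not the ones Spark actually uses.

```scala
import org.apache.spark.SparkConf

// Hypothetical helper illustrating the reviewer's concern: the initial
// executor count is never validated against the streaming DRA bounds.
// Defaults here are placeholders for illustration only.
def clampedInitialExecutors(conf: SparkConf): Int = {
  val initial = conf.getInt("spark.executor.instances", 0)
  val min = conf.getInt("spark.streaming.dynamicAllocation.minExecutors", 1)
  val max = conf.getInt("spark.streaming.dynamicAllocation.maxExecutors", Int.MaxValue)
  // Keep the initial count within [min, max].
  initial.max(min).min(max)
}
```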
1 parent 0a0f68b commit ee5a5a0

File tree

1 file changed: +6 −7 lines

streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala

```diff
@@ -26,7 +26,7 @@ import org.apache.spark.streaming.util.RecurringTimer
 import org.apache.spark.util.{Clock, Utils}
 
 /**
- * Class that manages executor allocated to a StreamingContext, and dynamically request or kill
+ * Class that manages executors allocated to a StreamingContext, and dynamically requests or kills
  * executors based on the statistics of the streaming computation. This is different from the core
  * dynamic allocation policy; the core policy relies on executors being idle for a while, but the
  * micro-batch model of streaming prevents any particular executors from being idle for a long
@@ -43,6 +43,10 @@ import org.apache.spark.util.{Clock, Utils}
  *
  * This features should ideally be used in conjunction with backpressure, as backpressure ensures
  * system stability, while executors are being readjusted.
+ *
+ * Note that an initial set of executors (spark.executor.instances) was allocated when the
+ * SparkContext was created. This class scales executors up/down after the StreamingContext
+ * has started.
  */
 private[streaming] class ExecutorAllocationManager(
     client: ExecutorAllocationClient,
@@ -202,12 +206,7 @@ private[streaming] object ExecutorAllocationManager extends Logging {
   val MAX_EXECUTORS_KEY = "spark.streaming.dynamicAllocation.maxExecutors"
 
   def isDynamicAllocationEnabled(conf: SparkConf): Boolean = {
-    val numExecutor = conf.getInt("spark.executor.instances", 0)
     val streamingDynamicAllocationEnabled = conf.getBoolean(ENABLED_KEY, false)
-    if (numExecutor != 0 && streamingDynamicAllocationEnabled) {
-      throw new IllegalArgumentException(
-        "Dynamic Allocation for streaming cannot be enabled while spark.executor.instances is set.")
-    }
     if (Utils.isDynamicAllocationEnabled(conf) && streamingDynamicAllocationEnabled) {
       throw new IllegalArgumentException(
         """
@@ -217,7 +216,7 @@ private[streaming] object ExecutorAllocationManager extends Logging {
       """.stripMargin)
     }
     val testing = conf.getBoolean("spark.streaming.dynamicAllocation.testing", false)
-    numExecutor == 0 && streamingDynamicAllocationEnabled && (!Utils.isLocalMaster(conf) || testing)
+    streamingDynamicAllocationEnabled && (!Utils.isLocalMaster(conf) || testing)
   }
 
   def createIfEnabled(
```
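For context, a minimal sketch of a configuration that this patch newly permits: a fixed initial executor count alongside streaming dynamic allocation. Before this change, `isDynamicAllocationEnabled` rejected this combination with an IllegalArgumentException. The values below are illustrative.

```scala
import org.apache.spark.SparkConf

// Illustrative values only. After this patch, setting an initial executor
// count no longer conflicts with streaming dynamic allocation.
val conf = new SparkConf()
  .set("spark.executor.instances", "4") // initial executors from the cluster manager
  .set("spark.streaming.dynamicAllocation.enabled", "true")
  .set("spark.streaming.dynamicAllocation.minExecutors", "2")
  .set("spark.streaming.dynamicAllocation.maxExecutors", "10")
// Note: core dynamic allocation (spark.dynamicAllocation.enabled) must stay
// disabled; enabling both still throws an IllegalArgumentException.
```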
