
Commit 4120a17

杨治国10192065 authored and Robert Kruszewski committed
[SPARK-21225][CORE] Considering CPUS_PER_TASK when allocating task slots for each WorkerOffer
JIRA Issue: https://issues.apache.org/jira/browse/SPARK-21225

In the function `resourceOffers`, a variable `tasks` is declared to store the tasks that have been allocated an executor. It is declared like this:

`val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))`

However, this code only considers the situation of one task per core. If the user sets `spark.task.cpus` to 2 or 3, the buffer really does not need that much memory. It can be modified as follows:

`val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))`

Another benefit of this change is that it makes the way tasks are allocated to offers easier to understand.

Author: 杨治国10192065 <[email protected]>

Closes apache#18435 from JackYangzg/motifyTaskCoreDisp.
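To make the sizing concrete, here is a minimal, self-contained Scala sketch. It is not Spark's actual scheduler code; the `WorkerOffer` fields, executor names, and core counts are illustrative. It shows how many task slots an offer can actually hold once `spark.task.cpus` is taken into account:

```scala
import scala.collection.mutable.ArrayBuffer

// Illustrative stand-in for Spark's WorkerOffer (not the real class).
case class WorkerOffer(executorId: String, host: String, cores: Int)

object SlotSizing extends App {
  val CPUS_PER_TASK = 2 // i.e. spark.task.cpus = 2

  val offers = Seq(WorkerOffer("exec-1", "host-a", 8), WorkerOffer("exec-2", "host-b", 6))

  // Old sizing: one slot per core, over-allocating whenever CPUS_PER_TASK > 1.
  val oldTasks = offers.map(o => new ArrayBuffer[String](o.cores))

  // New sizing: at most cores / CPUS_PER_TASK tasks can ever be scheduled on an offer.
  val newTasks = offers.map(o => new ArrayBuffer[String](o.cores / CPUS_PER_TASK))

  offers.foreach { o =>
    println(s"${o.executorId}: ${o.cores} cores / $CPUS_PER_TASK cpus per task " +
      s"=> at most ${o.cores / CPUS_PER_TASK} tasks")
  }
  // exec-1: 8 cores / 2 cpus per task => at most 4 tasks
  // exec-2: 6 cores / 2 cpus per task => at most 3 tasks
}
```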
1 parent 5ee3b65 commit 4120a17

1 file changed: +1 −1 lines changed

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 1 addition & 1 deletion
@@ -345,7 +345,7 @@ private[spark] class TaskSchedulerImpl(
 
     val shuffledOffers = shuffleOffers(filteredOffers)
     // Build a list of tasks to assign to each worker.
-    val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
+    val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
     val availableCpus = shuffledOffers.map(o => o.cores).toArray
     val sortedTaskSets = rootPool.getSortedTaskSetQueue
     for (taskSet <- sortedTaskSets) {
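Note that the constructor argument to Scala's `ArrayBuffer` is only an initial capacity hint, so this change affects pre-allocated memory, not correctness: the buffer still grows on demand if needed. A quick sketch of that behavior:

```scala
import scala.collection.mutable.ArrayBuffer

object CapacityDemo extends App {
  // The constructor argument pre-sizes the backing array; it does not set the length.
  val buf = new ArrayBuffer[Int](4)
  println(buf.length)         // 0: capacity is not length
  (1 to 10).foreach(buf += _) // the buffer grows past its initial capacity
  println(buf.length)         // 10
}
```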
