Skip to content

Commit 349a63c

Browse files
authored
Improve bin packing and reduce scala parallelism (apache-spark-on-k8s#333)
1 parent 0d41eee commit 349a63c

File tree

2 files changed

+25
-11
lines changed

2 files changed

+25
-11
lines changed

.circleci/config.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ jobs:
247247
run-scala-tests:
248248
<<: *test-defaults
249249
# project/CirclePlugin.scala does its own test splitting in SBT based on CIRCLE_NODE_INDEX, CIRCLE_NODE_TOTAL
250-
parallelism: 12
250+
parallelism: 9
251251
# Spark runs a lot of tests in parallel, we need 16 GB of RAM for this
252252
resource_class: xlarge
253253
steps:

project/CirclePlugin.scala

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ import sbt.Keys._
2626
import sbt._
2727
import sbt.plugins.JvmPlugin
2828
import scalaz.Dequeue
29+
import scalaz.Heap
30+
import scalaz.Order
2931

3032
//noinspection ScalaStyle
3133
object CirclePlugin extends AutoPlugin {
@@ -149,12 +151,19 @@ object CirclePlugin extends AutoPlugin {
149151
val tests = Dequeue[(TestKey, Double)](
150152
allTestsTimings.toIndexedSeq.sortBy { case (key, runTime) => (runTime, key) } : _*)
151153

154+
case class Group(tests: List[TestKey], runTime: Double)
155+
156+
implicit val groupOrder: Order[Group] = {
157+
import scalaz.std.anyVal._
158+
Order.orderBy(_.runTime)
159+
}
160+
152161
@tailrec
153162
def process(tests: Dequeue[(TestKey, Double)],
154163
soFar: Double = 0d,
155164
takeLeft: Boolean = true,
156165
acc: List[TestKey] = Nil,
157-
groups: List[List[TestKey]] = Nil): List[List[TestKey]] = {
166+
groups: List[Group] = Nil): List[Group] = {
158167

159168
if (groups.size == totalNodes || tests.isEmpty) {
160169
// Short circuit the logic if we've just completed the last group
@@ -163,12 +172,17 @@ object CirclePlugin extends AutoPlugin {
163172
return if (tests.isEmpty) {
164173
groups
165174
} else {
166-
// Fit all remaining tests in the last issued bucket.
167-
val lastGroup +: restGroups = groups
168175
val toFit = tests.toStream.map(_._1).force
169-
log.info(s"Fitting remaining tests into first bucket (which already has " +
170-
s"${lastGroup.size} tests): $toFit")
171-
(toFit ++: lastGroup) :: restGroups
176+
log.info(s"Fitting remaining tests into smallest buckets: $toFit")
177+
// Fit all remaining tests into the least used buckets.
178+
// import needed for creating Heap from List (needs Foldable[List[_]])
179+
import scalaz.std.list._
180+
tests.foldLeft(Heap.fromData(groups)) { case(heap, (test, runTime)) =>
181+
heap.uncons match {
182+
case Some((group, rest)) =>
183+
rest.insert(group.copy(test :: group.tests, runTime + group.runTime))
184+
}
185+
}.toList
172186
}
173187
}
174188

@@ -192,7 +206,7 @@ object CirclePlugin extends AutoPlugin {
192206
case x@TestCandidate((_, runTime), _, _) if soFar + runTime <= timePerNode => x
193207
} match {
194208
case None =>
195-
process(tests, 0d, takeLeft = true, Nil, acc :: groups)
209+
process(tests, 0d, takeLeft = true, Nil, Group(acc, soFar) :: groups)
196210
case Some(TestCandidate((key, runTime), rest, fromLeft)) =>
197211
process(rest, soFar + runTime, fromLeft, key :: acc, groups)
198212
}
@@ -202,11 +216,11 @@ object CirclePlugin extends AutoPlugin {
202216
val rootTarget = (target in LocalRootProject).value
203217
val bucketsFile = rootTarget / "tests-by-bucket.json"
204218
log.info(s"Saving test distribution into $totalNodes buckets to: $bucketsFile")
205-
mapper.writeValue(bucketsFile, buckets)
206-
val timingsPerBucket = buckets.map(_.iterator.map(allTestsTimings.apply).sum)
219+
mapper.writeValue(bucketsFile, buckets.map(_.tests))
220+
val timingsPerBucket = buckets.map(_.tests.iterator.map(allTestsTimings.apply).sum)
207221
log.info(s"Estimated test timings per bucket: $timingsPerBucket")
208222

209-
val bucket = buckets.lift.apply(index).getOrElse(Nil)
223+
val bucket = buckets.map(_.tests).lift.apply(index).getOrElse(Nil)
210224

211225
val groupedByProject = bucket.flatMap(testsByKey.apply)
212226
.groupBy(_.project)

0 commit comments

Comments (0)