Skip to content

Commit 2c33e81

Browse files
Replace parallel collections with future based concurrency in tests. (#4841)
Scala 2.13 puts parallel collections into a separate module that's not compatible with Scala 2.12. To avoid having to work around things and to keep cross-compilation compatibility this just exchanges the approach for concurrency in tests to not use parallel collections at all.
1 parent e3c7a13 commit 2c33e81

File tree

8 files changed

+115
-65
lines changed

8 files changed

+115
-65
lines changed
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package common
19+
20+
import scala.concurrent.duration._
21+
import scala.concurrent.{Await, ExecutionContext, Future}
22+
23+
trait ConcurrencyHelpers {
24+
def concurrently[T](times: Int, timeout: FiniteDuration)(op: => T)(implicit ec: ExecutionContext): Iterable[T] =
25+
Await.result(Future.sequence((1 to times).map(_ => Future(op))), timeout)
26+
27+
def concurrently[B, T](over: Iterable[B], timeout: FiniteDuration)(op: B => T)(
28+
implicit ec: ExecutionContext): Iterable[T] =
29+
Await.result(Future.sequence(over.map(v => Future(op(v)))), timeout)
30+
}

tests/src/test/scala/limits/ThrottleTests.scala

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,7 @@ import java.time.Instant
2121

2222
import akka.http.scaladsl.model.StatusCodes.TooManyRequests
2323

24-
import scala.collection.parallel.immutable.ParSeq
25-
import scala.concurrent.Future
26-
import scala.concurrent.Promise
24+
import scala.concurrent.{Await, Future, Promise}
2725
import scala.concurrent.duration._
2826
import org.junit.runner.RunWith
2927
import org.scalatest.BeforeAndAfterAll
@@ -102,10 +100,15 @@ class ThrottleTests
102100
*
103101
* @param results the sequence of results from invocations or firings
104102
*/
105-
def waitForActivations(results: ParSeq[RunResult]) = results.foreach { result =>
106-
if (result.exitCode == SUCCESS_EXIT) {
107-
withActivation(wsk.activation, result, totalWait = 5.minutes)(identity)
103+
def waitForActivations(results: Seq[RunResult]) = {
104+
val done = results.map { result =>
105+
if (result.exitCode == SUCCESS_EXIT) {
106+
Future(withActivation(wsk.activation, result, totalWait = 5.minutes)(_ => ()))
107+
} else {
108+
Future.successful(())
109+
}
108110
}
111+
Await.result(Future.sequence(done), 5.minutes)
109112
}
110113

111114
/**
@@ -201,7 +204,7 @@ class ThrottleTests
201204
// wait for the activations last, if these fail, the throttle should be settled
202205
// and this gives the activations time to complete and may avoid unnecessarily polling
203206
println("waiting for activations to complete")
204-
waitForActivations(results.par)
207+
waitForActivations(results)
205208
}
206209

207210
it should "throttle multiple activations of one trigger" in withAssetCleaner(wskprops) { (wp, assetHelper) =>
@@ -286,7 +289,7 @@ class ThrottleTests
286289
// wait for the activations last, giving the activations time to complete and
287290
// may avoid unnecessarily polling; if these fail, the throttle may not be settled
288291
println("waiting for activations to complete")
289-
waitForActivations(combinedResults.par)
292+
waitForActivations(combinedResults)
290293
}
291294
}
292295

tests/src/test/scala/org/apache/openwhisk/common/ForcibleSemaphoreTests.scala

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,19 @@
1717

1818
package org.apache.openwhisk.common
1919

20+
import common.ConcurrencyHelpers
21+
import org.apache.openwhisk.utils.ExecutionContextFactory
2022
import org.junit.runner.RunWith
2123
import org.scalatest.{FlatSpec, Matchers}
2224
import org.scalatest.junit.JUnitRunner
2325

26+
import scala.concurrent.duration.DurationInt
27+
2428
@RunWith(classOf[JUnitRunner])
25-
class ForcibleSemaphoreTests extends FlatSpec with Matchers {
29+
class ForcibleSemaphoreTests extends FlatSpec with Matchers with ConcurrencyHelpers {
30+
// use an infinite thread pool to allow for maximum concurrency
31+
implicit val executionContext = ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
32+
2633
behavior of "ForcableSemaphore"
2734

2835
it should "not allow to acquire, force or release negative amounts of permits" in {
@@ -79,7 +86,7 @@ class ForcibleSemaphoreTests extends FlatSpec with Matchers {
7986
(0 until 100).foreach { _ =>
8087
val s = new ForcibleSemaphore(32)
8188
// try to acquire more permits than allowed in parallel
82-
val acquires = (0 until 64).par.map(_ => s.tryAcquire()).seq
89+
val acquires = concurrently(64, 1.minute)(s.tryAcquire())
8390

8491
val result = Seq.fill(32)(true) ++ Seq.fill(32)(false)
8592
acquires should contain theSameElementsAs result

tests/src/test/scala/org/apache/openwhisk/common/NestedSemaphoreTests.scala

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,21 @@
1717

1818
package org.apache.openwhisk.common
1919

20+
import common.ConcurrencyHelpers
21+
import org.apache.openwhisk.utils.ExecutionContextFactory
2022
import org.junit.runner.RunWith
2123
import org.scalatest.FlatSpec
2224
import org.scalatest.Matchers
2325
import org.scalatest.junit.JUnitRunner
2426

27+
import scala.concurrent.duration.DurationInt
28+
2529
@RunWith(classOf[JUnitRunner])
26-
class NestedSemaphoreTests extends FlatSpec with Matchers {
30+
class NestedSemaphoreTests extends FlatSpec with Matchers with ConcurrencyHelpers {
31+
// use an infinite thread pool to allow for maximum concurrency
32+
implicit val executionContext = ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
33+
val acquireTimeout = 1.minute
34+
2735
behavior of "NestedSemaphore"
2836

2937
it should "allow acquire of concurrency permits before acquire of memory permits" in {
@@ -34,16 +42,17 @@ class NestedSemaphoreTests extends FlatSpec with Matchers {
3442
val actionConcurrency = 5
3543
val actionMemory = 3
3644
//use all concurrency on a single slot
37-
(1 to 5).par.map { i =>
38-
s.tryAcquireConcurrent(actionId, actionConcurrency, actionMemory) shouldBe true
39-
}
45+
concurrently(5, acquireTimeout) {
46+
s.tryAcquireConcurrent(actionId, actionConcurrency, actionMemory)
47+
} should contain only true
4048
s.availablePermits shouldBe 20 - 3 //we used a single container (memory == 3)
4149
s.concurrentState(actionId).availablePermits shouldBe 0
4250

4351
//use up all the remaining memory (17) and concurrency slots (17 / 3 * 5 = 25)
44-
(1 to 25).par.map { i =>
45-
s.tryAcquireConcurrent(actionId, actionConcurrency, actionMemory) shouldBe true
46-
}
52+
concurrently(25, acquireTimeout) {
53+
s.tryAcquireConcurrent(actionId, actionConcurrency, actionMemory)
54+
} should contain only true
55+
4756
s.availablePermits shouldBe 2 //we used 18 (20/3 = 6, 6*3=18)
4857
s.concurrentState(actionId).availablePermits shouldBe 0
4958
s.tryAcquireConcurrent("action1", actionConcurrency, actionMemory) shouldBe false
@@ -55,7 +64,7 @@ class NestedSemaphoreTests extends FlatSpec with Matchers {
5564
(0 until 100).foreach { _ =>
5665
val s = new NestedSemaphore(32)
5766
// try to acquire more permits than allowed in parallel
58-
val acquires = (0 until 64).par.map(_ => s.tryAcquire()).seq
67+
val acquires = concurrently(64, acquireTimeout)(s.tryAcquire())
5968

6069
val result = Seq.fill(32)(true) ++ Seq.fill(32)(false)
6170
acquires should contain theSameElementsAs result

tests/src/test/scala/org/apache/openwhisk/common/ResizableSemaphoreTests.scala

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,21 @@
1717

1818
package org.apache.openwhisk.common
1919

20+
import common.ConcurrencyHelpers
21+
import org.apache.openwhisk.utils.ExecutionContextFactory
2022
import org.junit.runner.RunWith
2123
import org.scalatest.FlatSpec
2224
import org.scalatest.Matchers
2325
import org.scalatest.junit.JUnitRunner
2426

27+
import scala.concurrent.duration.DurationInt
28+
2529
@RunWith(classOf[JUnitRunner])
26-
class ResizableSemaphoreTests extends FlatSpec with Matchers {
30+
class ResizableSemaphoreTests extends FlatSpec with Matchers with ConcurrencyHelpers {
31+
// use an infinite thread pool to allow for maximum concurrency
32+
implicit val executionContext = ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
33+
val acquireTimeout = 1.minute
34+
2735
behavior of "ResizableSemaphore"
2836

2937
it should "not allow to acquire, force or release negative amounts of permits" in {
@@ -163,7 +171,7 @@ class ResizableSemaphoreTests extends FlatSpec with Matchers {
163171
(0 until 100).foreach { _ =>
164172
val s = new ResizableSemaphore(32, 35)
165173
// try to acquire more permits than allowed in parallel
166-
val acquires = (0 until 64).par.map(_ => s.tryAcquire()).seq
174+
val acquires = concurrently(64, acquireTimeout)(s.tryAcquire())
167175

168176
val result = Seq.fill(32)(true) ++ Seq.fill(32)(false)
169177
acquires should contain theSameElementsAs result
@@ -173,11 +181,10 @@ class ResizableSemaphoreTests extends FlatSpec with Matchers {
173181
it should "release permits even under concurrent load" in {
174182
val s = new ResizableSemaphore(32, 35)
175183
// try to acquire more permits than allowed in parallel
176-
val acquires = (0 until 64).par.map(_ => s.tryAcquire()).seq
184+
concurrently(64, acquireTimeout)(s.tryAcquire())
185+
concurrently(32, acquireTimeout)(s.release(1, true))
177186

178-
(0 until 32).par.map(_ => s.release(1, true))
179187
s.counter shouldBe 0
180-
181188
}
182189

183190
}

tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiTests.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1161,7 +1161,7 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi {
11611161
action.publish,
11621162
action.annotations ++ systemAnnotations(kind))
11631163

1164-
(0 until 5).par.map { i =>
1164+
(0 until 5).map { i =>
11651165
Get(s"$collectionPath/$name") ~> Route.seal(routes(creds)(transid())) ~> check {
11661166
status should be(OK)
11671167
val response = responseAs[WhiskAction]

tests/src/test/scala/org/apache/openwhisk/core/limits/MaxActionDurationTests.scala

Lines changed: 32 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,12 @@
1818
package org.apache.openwhisk.core.limits
1919

2020
import java.io.File
21-
import scala.concurrent.duration.DurationInt
2221

22+
import scala.concurrent.duration.DurationInt
2323
import org.junit.runner.RunWith
2424
import org.scalatest.junit.JUnitRunner
25-
26-
import common.TestHelpers
27-
import common.TestUtils
25+
import common.{ConcurrencyHelpers, TestHelpers, TestUtils, WskActorSystem, WskProps, WskTestHelpers}
2826
import common.rest.WskRestOperations
29-
import common.WskProps
30-
import common.WskTestHelpers
31-
import common.WskActorSystem
32-
3327
import org.apache.openwhisk.core.entity._
3428
import spray.json.DefaultJsonProtocol._
3529
import spray.json._
@@ -41,7 +35,7 @@ import org.scalatest.tagobjects.Slow
4135
* Tests for action duration limits. These tests require a deployed backend.
4236
*/
4337
@RunWith(classOf[JUnitRunner])
44-
class MaxActionDurationTests extends TestHelpers with WskTestHelpers with WskActorSystem {
38+
class MaxActionDurationTests extends TestHelpers with WskTestHelpers with WskActorSystem with ConcurrencyHelpers {
4539

4640
implicit val wskprops = WskProps()
4741
val wsk = new WskRestOperations
@@ -65,41 +59,41 @@ class MaxActionDurationTests extends TestHelpers with WskTestHelpers with WskAct
6559
"node-, python, and java-action" should s"run up to the max allowed duration (${TimeLimit.MAX_DURATION})" taggedAs (Slow) in withAssetCleaner(
6660
wskprops) { (wp, assetHelper) =>
6761
// When you add more runtimes, keep in mind, how many actions can be processed in parallel by the Invokers!
68-
Map("node" -> "helloDeadline.js", "python" -> "sleep.py", "java" -> "sleep.jar")
62+
val runtimes = Map("node" -> "helloDeadline.js", "python" -> "sleep.py", "java" -> "sleep.jar")
6963
.filter {
7064
case (_, name) =>
7165
new File(TestUtils.getTestActionFilename(name)).exists()
7266
}
73-
.par
74-
.map {
75-
case (k, name) =>
76-
println(s"Testing action kind '${k}' with action '${name}'")
77-
assetHelper.withCleaner(wsk.action, name) { (action, _) =>
78-
val main = if (k == "java") Some("Sleep") else None
79-
action.create(
80-
name,
81-
Some(TestUtils.getTestActionFilename(name)),
82-
timeout = Some(TimeLimit.MAX_DURATION),
83-
main = main)
84-
}
8567

86-
val run = wsk.action.invoke(
68+
concurrently(runtimes.toSeq, TimeLimit.MAX_DURATION + 2.minutes) {
69+
case (k, name) =>
70+
println(s"Testing action kind '${k}' with action '${name}'")
71+
assetHelper.withCleaner(wsk.action, name) { (action, _) =>
72+
val main = if (k == "java") Some("Sleep") else None
73+
action.create(
8774
name,
88-
Map("forceHang" -> true.toJson, "sleepTimeInMs" -> (TimeLimit.MAX_DURATION + 30.seconds).toMillis.toJson))
89-
withActivation(
90-
wsk.activation,
91-
run,
92-
initialWait = 1.minute,
93-
pollPeriod = 1.minute,
94-
totalWait = TimeLimit.MAX_DURATION + 2.minutes) { activation =>
95-
withClue("Activation result not as expected:") {
96-
activation.response.status shouldBe ActivationResponse.messageForCode(ActivationResponse.DeveloperError)
97-
activation.response.result shouldBe Some(
98-
JsObject("error" -> Messages.timedoutActivation(TimeLimit.MAX_DURATION, init = false).toJson))
99-
activation.duration.toInt should be >= TimeLimit.MAX_DURATION.toMillis.toInt
100-
}
75+
Some(TestUtils.getTestActionFilename(name)),
76+
timeout = Some(TimeLimit.MAX_DURATION),
77+
main = main)
78+
}
79+
80+
val run = wsk.action.invoke(
81+
name,
82+
Map("forceHang" -> true.toJson, "sleepTimeInMs" -> (TimeLimit.MAX_DURATION + 30.seconds).toMillis.toJson))
83+
84+
withActivation(
85+
wsk.activation,
86+
run,
87+
initialWait = 1.minute,
88+
pollPeriod = 1.minute,
89+
totalWait = TimeLimit.MAX_DURATION + 2.minutes) { activation =>
90+
withClue("Activation result not as expected:") {
91+
activation.response.status shouldBe ActivationResponse.messageForCode(ActivationResponse.DeveloperError)
92+
activation.response.result shouldBe Some(
93+
JsObject("error" -> Messages.timedoutActivation(TimeLimit.MAX_DURATION, init = false).toJson))
94+
activation.duration.toInt should be >= TimeLimit.MAX_DURATION.toMillis.toInt
10195
}
102-
() // explicitly map to Unit
103-
}
96+
}
97+
}
10498
}
10599
}

tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -497,7 +497,7 @@ class ShardingContainerPoolBalancerTests
497497
val stepSize = stepSizes(hash % stepSizes.size)
498498
val uuid = UUID()
499499
//initiate activation
500-
val published = (0 until numActivations).par.map { _ =>
500+
val published = (0 until numActivations).map { _ =>
501501
val aid = ActivationId.generate()
502502
val msg = ActivationMessage(
503503
TransactionId.testing,
@@ -545,12 +545,12 @@ class ShardingContainerPoolBalancerTests
545545
}
546546

547547
//complete all
548-
val acks = ids.par.map { aid =>
548+
val acks = ids.map { aid =>
549549
val invoker = balancer.activationSlots(aid).invokerName
550550
completeActivation(invoker, balancer, aid)
551551
}
552552

553-
Await.ready(Future.sequence(acks.toList), 10.seconds)
553+
Await.ready(Future.sequence(acks), 10.seconds)
554554

555555
//verify invokers go back to unused state
556556
invokers.foreach { i =>

0 commit comments

Comments
 (0)