1717
1818package org .apache .openwhisk .core .database .test
1919
20- import scala .collection .parallel ._
2120import scala .concurrent .duration .DurationInt
22- import java .util .concurrent .ForkJoinPool
21+ import java .util .concurrent .Executors
22+
2323import org .junit .runner .RunWith
2424import org .scalatest .BeforeAndAfterEach
2525import org .scalatest .FlatSpec
@@ -32,9 +32,17 @@ import spray.json.JsString
3232import org .apache .openwhisk .common .TransactionId
3333import org .apache .openwhisk .utils .retry
3434
35- @ RunWith (classOf [JUnitRunner ])
36- class CacheConcurrencyTests extends FlatSpec with WskTestHelpers with WskActorSystem with BeforeAndAfterEach {
35+ import scala .concurrent .ExecutionContext
3736
37+ @ RunWith (classOf [JUnitRunner ])
38+ class CacheConcurrencyTests
39+ extends FlatSpec
40+ with WskTestHelpers
41+ with WskActorSystem
42+ with BeforeAndAfterEach
43+ with ConcurrencyHelpers {
44+
45+ val timeout = 5 .minutes
3846 println(s " Running tests on # proc: ${Runtime .getRuntime.availableProcessors()}" )
3947
4048 implicit private val transId = TransactionId .testing
@@ -43,15 +51,14 @@ class CacheConcurrencyTests extends FlatSpec with WskTestHelpers with WskActorSy
4351
4452 val nExternalIters = 1
4553 val nInternalIters = 5
46- val nThreads = nInternalIters * 30
54+ val nThreads = nInternalIters * 30 // The maximum number of tasks running in parallel at any given time
55+ val parallelismExecutionContext = ExecutionContext .fromExecutor(Executors .newFixedThreadPool(nThreads))
4756
48- val parallel = (1 to nInternalIters).par
49- parallel.tasksupport = new ForkJoinTaskSupport (new ForkJoinPool (nThreads))
50-
51- def run [W ](phase : String )(block : String => W ) = parallel.map { i =>
52- val name = s " testy ${i}"
53- withClue(s " $phase: failed for $name" ) { (name, block(name)) }
54- }
57+ def run [W ](phase : String )(block : String => W ) =
58+ concurrently((1 to nInternalIters), timeout) { i =>
59+ val name = s " testy ${i}"
60+ withClue(s " $phase: failed for $name" ) { (name, block(name)) }
61+ }(parallelismExecutionContext)
5562
5663 override def beforeEach () = {
5764 run(" pre-test sanitize" ) { name =>
@@ -79,9 +86,7 @@ class CacheConcurrencyTests extends FlatSpec with WskTestHelpers with WskActorSy
7986
8087 run(" delete+get" ) { name =>
8188 // run 30 operations in parallel: 15 get, 1 delete, 14 more get
82- val para = (1 to 30 ).par
83- para.tasksupport = new ForkJoinTaskSupport (new ForkJoinPool (nThreads))
84- para.map { i =>
89+ concurrently((1 to 30 ), timeout) { i =>
8590 if (i != 16 ) {
8691 val rr = wsk.action.get(name, expectedExitCode = DONTCARE_EXIT )
8792 withClue(s " expecting get to either succeed or fail with not found: $rr" ) {
@@ -91,7 +96,7 @@ class CacheConcurrencyTests extends FlatSpec with WskTestHelpers with WskActorSy
9196 } else {
9297 wsk.action.delete(name)
9398 }
94- }
99+ }(parallelismExecutionContext)
95100 }
96101
97102 // Give some time to replicate the state between the controllers
@@ -117,9 +122,7 @@ class CacheConcurrencyTests extends FlatSpec with WskTestHelpers with WskActorSy
117122
118123 run(" update+get" ) { name =>
119124 // run 30 operations in parallel: 15 get, 1 update, 14 more get
120- val para = (1 to 30 ).par
121- para.tasksupport = new ForkJoinTaskSupport (new ForkJoinPool (nThreads))
122- para.map { i =>
125+ concurrently((1 to 30 ), timeout) { i =>
123126 if (i != 16 ) {
124127 val rr = wsk.action.get(name, expectedExitCode = DONTCARE_EXIT )
125128 withClue(s " expecting get to either succeed or fail with not found: $rr" ) {
@@ -129,7 +132,7 @@ class CacheConcurrencyTests extends FlatSpec with WskTestHelpers with WskActorSy
129132 } else {
130133 wsk.action.create(name, None , parameters = Map (" color" -> JsString (" blue" )), update = true )
131134 }
132- }
135+ }(parallelismExecutionContext)
133136 }
134137
135138 // All controllers should have the correct action
0 commit comments