Skip to content
This repository was archived by the owner on Mar 2, 2022. It is now read-only.

Commit 00c6d55

Browse files
committed
Added SParallelFlux.map
Signed-off-by: Winarto Zhao <[email protected]>
1 parent df385a5 commit 00c6d55

File tree

2 files changed

+26
-4
lines changed

2 files changed

+26
-4
lines changed

src/main/scala/reactor/core/scala/publisher/SParallelFlux.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,18 @@ class SParallelFlux[T] private(private val jParallelFlux: JParallelFlux[T]) exte
2121
*/
2222
final def as[U](converter: SParallelFlux[T] => U): U = jParallelFlux.as((t: JParallelFlux[T]) => converter(SParallelFlux(t)))
2323

24+
/**
25+
* Maps the source values on each 'rail' to another value.
26+
* <p>
27+
* Note that the same mapper function may be called from multiple threads
28+
* concurrently.
29+
*
30+
* @tparam U the output value type
31+
* @param mapper the mapper function turning Ts into Us.
32+
* @return the new [[SParallelFlux]] instance
33+
*/
34+
final def map[U](mapper: T => _ <: U) = SParallelFlux(jParallelFlux.map[U](mapper))
35+
2436
/**
2537
* Specifies where each 'rail' will observe its incoming values with possibly
2638
* work-stealing and a given prefetch amount.
Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package reactor.core.scala.publisher
22

3-
import org.mockito.Mockito
3+
import org.mockito.Mockito.{spy, times, verify}
44
import org.scalatest.{FreeSpec, Matchers}
55
import reactor.core.publisher.{Flux => JFlux, ParallelFlux => JParallelFlux}
66
import reactor.core.scheduler.Schedulers
@@ -10,23 +10,33 @@ class SParallelFluxTest extends FreeSpec with Matchers {
1010
"SParallelFlux" - {
1111
val data = Seq(1, 2, 3)
1212
val flux = Flux.just(data.head,data.tail:_*)
13+
val fluxParallel: SParallelFlux[Int] = flux.parallel()
1314
".asJava should convert as Java ParallelFlux" in {
14-
flux.parallel().asJava shouldBe a[JParallelFlux[Int]]
15+
fluxParallel.asJava shouldBe a[JParallelFlux[Int]]
1516
}
1617

1718
".apply should convert Java ParallelFlux into SParallelFlux" in {
1819
SParallelFlux(JFlux.just(1, 2, 3).parallel()).asJava shouldBe a[JParallelFlux[Int]]
1920
}
2021

22+
".map should map from T to U" in {
23+
val expected = data.map(_.toString)
24+
StepVerifier.create(fluxParallel.map(i => i.toString))
25+
.expectNextMatches(i => expected.contains(i))
26+
.expectNextMatches(i => expected.contains(i))
27+
.expectNextMatches(i => expected.contains(i))
28+
.verifyComplete()
29+
}
30+
2131
".runOn should run on different thread" in {
22-
val scheduler = Mockito.spy(Schedulers.parallel())
32+
val scheduler = spy(Schedulers.parallel())
2333
StepVerifier.create(flux.parallel(2).runOn(scheduler))
2434
.expectNextMatches(i => data.contains(i))
2535
.expectNextMatches(i => data.contains(i))
2636
.expectNextMatches(i => data.contains(i))
2737
.verifyComplete()
2838

29-
Mockito.verify(scheduler, Mockito.times(2)).createWorker()
39+
verify(scheduler, times(2)).createWorker()
3040
}
3141
}
3242
}

0 commit comments

Comments
 (0)