This repository was archived by the owner on Mar 2, 2022. It is now read-only.

Commit b2c1a6d

Merge branch 'master' into sreactor

2 parents 169a303 + ef7fc3c

2 files changed: +202 -4 lines changed

src/main/scala/reactor/core/scala/publisher/SParallelFlux.scala

Lines changed: 129 additions & 2 deletions
@@ -3,11 +3,13 @@ package reactor.core.scala.publisher
 import java.util
 import java.util.function.Supplier
 
-import org.reactivestreams.Publisher
+import org.reactivestreams.{Publisher, Subscriber, Subscription}
+import reactor.core.Disposable
 import reactor.core.publisher.{ParallelFlux => JParallelFlux}
+import reactor.core.scheduler.Scheduler
 import reactor.util.concurrent.Queues
 
-class SParallelFlux[T] private(private val jParallelFlux: JParallelFlux[T]) {
+class SParallelFlux[T] private(private val jParallelFlux: JParallelFlux[T]) extends Publisher[T] {
 
   /**
     * Perform a fluent transformation to a value via a converter function which receives
@@ -19,6 +21,131 @@ class SParallelFlux[T] private(private val jParallelFlux: JParallelFlux[T]) {
     */
   final def as[U](converter: SParallelFlux[T] => U): U = jParallelFlux.as((t: JParallelFlux[T]) => converter(SParallelFlux(t)))
 
+  /**
+    * Filters the source values on each 'rail'.
+    * <p>
+    * Note that the same predicate may be called from multiple threads concurrently.
+    *
+    * @param predicate the function returning true to keep a value or false to drop a
+    *                  value
+    * @return the new [[SParallelFlux]] instance
+    */
+  final def filter(predicate: SPredicate[T]) = SParallelFlux(jParallelFlux.filter(predicate))
+
+  /**
+    * Maps the source values on each 'rail' to another value.
+    * <p>
+    * Note that the same mapper function may be called from multiple threads
+    * concurrently.
+    *
+    * @tparam U the output value type
+    * @param mapper the mapper function turning Ts into Us.
+    * @return the new [[SParallelFlux]] instance
+    */
+  final def map[U](mapper: T => _ <: U) = SParallelFlux(jParallelFlux.map[U](mapper))
+
+  /**
+    * Reduces all values within a 'rail' and across 'rails' with a reducer function into
+    * a single sequential value.
+    * <p>
+    * Note that the same reducer function may be called from multiple threads
+    * concurrently.
+    *
+    * @param reducer the function to reduce two values into one.
+    * @return the new Mono instance emitting the reduced value, or empty if the
+    *         [[SParallelFlux]] was empty
+    */
+  final def reduce(reducer: (T, T) => T) = Mono(jParallelFlux.reduce(reducer))
+
+  /**
+    * Reduces all values within a 'rail' to a single value (with a possibly different
+    * type) via a reducer function that is initialized on each rail from an
+    * initialSupplier value.
+    * <p>
+    * Note that the same reducer function may be called from multiple threads
+    * concurrently.
+    *
+    * @tparam R the reduced output type
+    * @param initialSupplier the supplier for the initial value
+    * @param reducer the function to reduce a previous output of reduce (or the initial
+    *                value supplied) with a current source value.
+    * @return the new [[SParallelFlux]] instance
+    */
+  final def reduce[R](initialSupplier: () => R, reducer: (R, T) => R) = SParallelFlux(jParallelFlux.reduce[R](initialSupplier, reducer))
+
+  /**
+    * Specifies where each 'rail' will observe its incoming values, with possible
+    * work-stealing and a given prefetch amount.
+    * <p>
+    * This operator uses the default prefetch size returned by [[Queues.SMALL_BUFFER_SIZE]].
+    * <p>
+    * The operator will call [[Scheduler.createWorker()]] as many times as this
+    * ParallelFlux's parallelism level.
+    * <p>
+    * No assumptions are made about the Scheduler's parallelism level; if the Scheduler's
+    * parallelism level is lower than the ParallelFlux's, some rails may end up on
+    * the same thread/worker.
+    * <p>
+    * This operator doesn't require the Scheduler to be trampolining as it does its own
+    * built-in trampolining logic.
+    *
+    * @param scheduler the scheduler to run each rail's work on
+    * @param prefetch the number of values to request on each 'rail' from the source
+    * @return the new [[SParallelFlux]] instance
+    */
+  final def runOn(scheduler: Scheduler, prefetch: Int = Queues.SMALL_BUFFER_SIZE) = SParallelFlux(jParallelFlux.runOn(scheduler, prefetch))
+
+
+  /**
+    * Merges the values from each 'rail' in a round-robin or same-order fashion and
+    * exposes them as a regular Publisher sequence, running with a given prefetch value
+    * for the rails.
+    *
+    * @param prefetch the prefetch amount to use for each rail
+    * @return the new Flux instance
+    */
+  final def sequential(prefetch: Int = Queues.SMALL_BUFFER_SIZE) = Flux(jParallelFlux.sequential(prefetch))
+
+  /**
+    * Subscribes to this [[SParallelFlux]] by providing an onNext, onError,
+    * onComplete and onSubscribe callback and triggers the execution chain for all
+    * 'rails'.
+    *
+    * @param onNext consumer of onNext signals
+    * @param onError consumer of error signals
+    * @param onComplete callback on completion signal
+    * @param onSubscribe consumer of the subscription signal
+    */
+  final def subscribe(onNext: Option[T => Unit] = None,
+                      onError: Option[Throwable => Unit] = None,
+                      onComplete: Option[() => Unit] = None,
+                      onSubscribe: Option[Subscription => Unit] = None): Disposable = (onNext, onError, onComplete, onSubscribe) match {
+    case (Some(fn), Some(fe), Some(fc), Some(fs)) => jParallelFlux.subscribe(fn, fe, fc, fs)
+    case (Some(fn), Some(fe), Some(fc), None) => jParallelFlux.subscribe(fn, fe, fc)
+    case (Some(fn), Some(fe), None, Some(fs)) => jParallelFlux.subscribe(fn, fe, null, fs)
+    case (Some(fn), Some(fe), None, None) => jParallelFlux.subscribe(fn, fe)
+    case (Some(fn), None, Some(fc), Some(fs)) => jParallelFlux.subscribe(fn, null, fc, fs)
+    case (Some(fn), None, Some(fc), None) => jParallelFlux.subscribe(fn, null, fc, null)
+    case (Some(fn), None, None, Some(fs)) => jParallelFlux.subscribe(fn, null, null, fs)
+    case (Some(fn), None, None, None) => jParallelFlux.subscribe(fn)
+    case (None, Some(fe), Some(fc), Some(fs)) => jParallelFlux.subscribe(null, fe, fc, fs)
+    case (None, Some(fe), Some(fc), None) => jParallelFlux.subscribe(null, fe, fc)
+    case (None, Some(fe), None, Some(fs)) => jParallelFlux.subscribe(null, fe, null, fs)
+    case (None, Some(fe), None, None) => jParallelFlux.subscribe(null, fe)
+    case (None, None, Some(fc), Some(fs)) => jParallelFlux.subscribe(null, null, fc, fs)
+    case (None, None, Some(fc), None) => jParallelFlux.subscribe(null, null, fc, null)
+    case (None, None, None, Some(fs)) => jParallelFlux.subscribe(null, null, null, fs)
+    case (None, None, None, None) => jParallelFlux.subscribe()
+  }
+
+  /**
+    * Merge the rails into a [[Flux.sequential]] Flux and
+    * [[Flux#subscribe(Subscriber) subscribe]] to said Flux.
+    *
+    * @param s the subscriber to use on the [[#sequential()]] Flux
+    */
+  override def subscribe(s: Subscriber[_ >: T]): Unit = jParallelFlux.subscribe(s)
+
   def asJava: JParallelFlux[T] = jParallelFlux
 }
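For orientation, here is a minimal usage sketch of the operators this commit adds to SParallelFlux. It assumes only what the diff above and the tests below show (the reactor.core.scala.publisher wrappers, the Option-based subscribe, and the implicit function conversions the wrapper provides); the object name, the printed messages and the Thread.sleep are illustrative and not part of the commit.

    import reactor.core.scala.publisher.Flux
    import reactor.core.scheduler.Schedulers
    import reactor.test.StepVerifier

    object SParallelFluxUsage extends App {
      // Split the source into rails, give each rail its own worker, transform,
      // then merge the rails back into an ordinary sequential Flux.
      val merged = Flux.just(1, 2, 3, 4)
        .parallel()                      // SParallelFlux[Int]
        .runOn(Schedulers.parallel())    // default prefetch = Queues.SMALL_BUFFER_SIZE
        .filter((i: Int) => i % 2 == 0)  // the predicate may run on several threads
        .map((i: Int) => i * 10)
        .sequential()                    // back to Flux[Int]

      StepVerifier.create(merged)
        .expectNextMatches((i: Int) => i == 20 || i == 40)
        .expectNextMatches((i: Int) => i == 20 || i == 40)
        .verifyComplete()

      // The Option-based subscribe dispatches to the matching Java overload
      // depending on which callbacks are supplied.
      Flux.just(1, 2, 3)
        .parallel()
        .runOn(Schedulers.parallel())
        .subscribe(
          onNext = Some((i: Int) => println(s"rail value: $i")),
          onComplete = Some(() => println("rails completed")))

      Thread.sleep(500) // let the daemon workers emit before the JVM exits
    }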

src/test/scala/reactor/core/scala/publisher/SParallelFluxTest.scala

Lines changed: 73 additions & 2 deletions
@@ -1,16 +1,87 @@
 package reactor.core.scala.publisher
 
+import org.mockito.Mockito.{spy, times, verify}
 import org.scalatest.{FreeSpec, Matchers}
-import reactor.core.publisher.{ParallelFlux => JParallelFlux, Flux => JFlux}
+import reactor.core.publisher.{Flux => JFlux, ParallelFlux => JParallelFlux}
+import reactor.core.scheduler.Schedulers
+import reactor.test.StepVerifier
 
 class SParallelFluxTest extends FreeSpec with Matchers {
   "SParallelFlux" - {
+    val data = Seq(1, 2, 3)
+    val flux = Flux.just(data.head, data.tail: _*)
+    val fluxParallel: SParallelFlux[Int] = flux.parallel()
     ".asJava should convert as Java ParallelFlux" in {
-      Flux.just(1, 2, 3).parallel().asJava shouldBe a[JParallelFlux[Int]]
+      fluxParallel.asJava shouldBe a[JParallelFlux[Int]]
     }
 
     ".apply should convert Java ParallelFlux into SParallelFlux" in {
       SParallelFlux(JFlux.just(1, 2, 3).parallel()).asJava shouldBe a[JParallelFlux[Int]]
     }
+
+    ".filter should filter elements" in {
+      StepVerifier.create(fluxParallel.filter((i: Int) => i % 2 == 0))
+        .expectNext(2)
+        .verifyComplete()
+    }
+
+    ".map should map from T to U" in {
+      val expected = data.map(_.toString)
+      StepVerifier.create(fluxParallel.map(i => i.toString))
+        .expectNextMatches((i: String) => expected.contains(i))
+        .expectNextMatches((i: String) => expected.contains(i))
+        .expectNextMatches((i: String) => expected.contains(i))
+        .verifyComplete()
+    }
+
+    ".reduce should aggregate the values" - {
+      "without initial supplier" in {
+        val mono = fluxParallel.reduce(_ + _)
+        StepVerifier.create(mono)
+          .expectNext(6)
+          .verifyComplete()
+      }
+      "with initial value should aggregate the values with initial one" ignore {
+        val parallelFlux = fluxParallel.reduce[String](() => "0", (agg, v) => s"$agg-${v.toString}")
+        StepVerifier.create(parallelFlux)
+          .expectNext("0-1")
+          .expectNext("0-2")
+          .expectNext("0-3")
+          .expectNext("0")
+          .expectNext("0")
+          .expectNext("0")
+          .expectNext("0")
+          .expectNext("0")
+          .expectNext("0")
+          .expectNext("0")
+          .expectNext("0")
+          .expectNext("0")
+          .expectNext("0")
+          .expectNext("0")
+          .expectNext("0")
+          .expectNext("0")
+          .verifyComplete()
+      }
+    }
+
+    ".sequential should merge the rails" in {
+      val expected = data.map(_.toString)
+      StepVerifier.create(fluxParallel.map(i => i.toString).sequential())
+        .expectNextMatches((i: String) => expected.contains(i))
+        .expectNextMatches((i: String) => expected.contains(i))
+        .expectNextMatches((i: String) => expected.contains(i))
+        .verifyComplete()
+    }
+
+    ".runOn should run on different thread" in {
+      val scheduler = spy(Schedulers.parallel())
+      StepVerifier.create(flux.parallel(2).runOn(scheduler))
+        .expectNextMatches((i: Int) => data.contains(i))
+        .expectNextMatches((i: Int) => data.contains(i))
+        .expectNextMatches((i: Int) => data.contains(i))
+        .verifyComplete()
+
+      verify(scheduler, times(2)).createWorker()
+    }
   }
 }
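A note on the ignored reduce case above: the scaladoc added in this commit says reduce[R](initialSupplier, reducer) seeds the accumulator on each rail, so one reduced value is emitted per rail and rails that receive no data emit just the seed. The sixteen expected values ("0-1", "0-2", "0-3" plus thirteen bare "0"s) would therefore only line up on a machine whose default parallelism is sixteen, which is presumably why the case is ignored. A machine-independent variant could pin the rail count explicitly; the sketch below is a hypothetical illustration under that assumption, not part of the commit.

    import reactor.core.scala.publisher.Flux
    import reactor.test.StepVerifier

    object SeededReduceSketch extends App {
      // Two rails, round-robin distribution: one rail sees 1 and 3, the other sees 2,
      // so each rail emits a single seeded reduction such as "0-1-3" and "0-2".
      val twoRails = Flux.just(1, 2, 3).parallel(2)
        .reduce[String](() => "0", (agg, v) => s"$agg-$v")

      StepVerifier.create(twoRails)
        .expectNextMatches((s: String) => s.startsWith("0-"))
        .expectNextMatches((s: String) => s.startsWith("0-"))
        .verifyComplete()
    }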
