Skip to content
This repository was archived by the owner on Mar 2, 2022. It is now read-only.

Commit 6b4d8cd

Browse files
committed
Merge branch 'master' into sreactor
2 parents 7c4b983 + 84aa58d commit 6b4d8cd

File tree

4 files changed

+76
-17
lines changed

4 files changed

+76
-17
lines changed

src/main/scala/reactor/core/scala/publisher/Flux.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2392,9 +2392,9 @@ class Flux[T] private[publisher](private[publisher] val jFlux: JFlux[T])
23922392
* <p>
23932393
* <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.0.5.RELEASE/src/docs/marble/parallel.png" alt="">
23942394
*
2395-
* @return a new [[ParallelFlux]] instance
2395+
* @return a new [[SParallelFlux]] instance
23962396
*/
2397-
final def parallel() = ParallelFlux(jFlux.parallel())
2397+
final def parallel() = SParallelFlux(jFlux.parallel())
23982398

23992399
/**
24002400
* Prepare to consume this [[Flux]] on parallelism number of 'rails'
@@ -2404,9 +2404,9 @@ class Flux[T] private[publisher](private[publisher] val jFlux: JFlux[T])
24042404
* <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.0.5.RELEASE/src/docs/marble/parallel.png" alt="">
24052405
*
24062406
* @param parallelism the number of parallel rails
2407-
* @return a new [[ParallelFlux]] instance
2407+
* @return a new [[SParallelFlux]] instance
24082408
*/
2409-
final def parallel(parallelism: Int): ParallelFlux[T] = ParallelFlux(jFlux.parallel(parallelism))
2409+
final def parallel(parallelism: Int): SParallelFlux[T] = SParallelFlux(jFlux.parallel(parallelism))
24102410

24112411
/**
24122412
* Prepare to consume this [[Flux]] on parallelism number of 'rails'
@@ -2418,9 +2418,9 @@ class Flux[T] private[publisher](private[publisher] val jFlux: JFlux[T])
24182418
*
24192419
* @param parallelism the number of parallel rails
24202420
* @param prefetch the number of values to prefetch from the source
2421-
* @return a new [[ParallelFlux]] instance
2421+
* @return a new [[SParallelFlux]] instance
24222422
*/
2423-
final def parallel(parallelism: Int, prefetch: Int) = ParallelFlux(jFlux.parallel(parallelism, prefetch))
2423+
final def parallel(parallelism: Int, prefetch: Int) = SParallelFlux(jFlux.parallel(parallelism, prefetch))
24242424

24252425
/**
24262426
* Prepare a [[ConnectableFlux]] which shares this [[Flux]] sequence and dispatches values to

src/main/scala/reactor/core/scala/publisher/ParallelFlux.scala

Lines changed: 0 additions & 11 deletions
This file was deleted.
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package reactor.core.scala.publisher
2+
3+
import java.util
4+
import java.util.function.Supplier
5+
6+
import org.reactivestreams.Publisher
7+
import reactor.core.publisher.{ParallelFlux => JParallelFlux}
8+
import reactor.util.concurrent.Queues
9+
10+
class SParallelFlux[T] private(private val jParallelFlux: JParallelFlux[T]) {
11+
12+
/**
13+
* Perform a fluent transformation to a value via a converter function which receives
14+
* this ParallelFlux.
15+
*
16+
* @tparam U the output value type
17+
* @param converter the converter function from [[SParallelFlux]] to some type
18+
* @return the value returned by the converter function
19+
*/
20+
final def as[U](converter: SParallelFlux[T] => U): U = jParallelFlux.as((t: JParallelFlux[T]) => converter(SParallelFlux(t)))
21+
22+
def asJava: JParallelFlux[T] = jParallelFlux
23+
}
24+
25+
object SParallelFlux {
26+
def apply[T](jParallelFlux: JParallelFlux[T]) = new SParallelFlux[T](jParallelFlux)
27+
28+
/**
29+
* Take a Publisher and prepare to consume it on multiple 'rails' (one per CPU core)
30+
* in a round-robin fashion.
31+
*
32+
* @tparam T the value type
33+
* @param source the source Publisher
34+
* @param parallelism the number of parallel rails
35+
* @param prefetch the number of values to prefetch from the source
36+
* @param queueSupplier the queue structure supplier to hold the prefetched values
37+
* from the source until there is a rail ready to process it.
38+
* @return the [[SParallelFlux]] instance
39+
*/
40+
def from[T](source: Publisher[_ <: T],
41+
parallelism: Int = Runtime.getRuntime.availableProcessors(),
42+
prefetch: Int = Queues.SMALL_BUFFER_SIZE,
43+
queueSupplier: Supplier[util.Queue[T]] = Queues.small()) = SParallelFlux(JParallelFlux.from(source, parallelism, prefetch, queueSupplier))
44+
45+
/**
46+
* Wraps multiple Publishers into a [[SParallelFlux]] which runs them in parallel and
47+
* unordered.
48+
*
49+
* @tparam T the value type
50+
* @param publishers the array of publishers
51+
* @return the [[SParallelFlux]] instance
52+
*/
53+
def fromPublishers[T](publishers: Publisher[T]*) = SParallelFlux(JParallelFlux.from(publishers: _*))
54+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package reactor.core.scala.publisher
2+
3+
import org.scalatest.{FreeSpec, Matchers}
4+
import reactor.core.publisher.{ParallelFlux => JParallelFlux, Flux => JFlux}
5+
6+
class SParallelFluxTest extends FreeSpec with Matchers {
7+
"SParallelFlux" - {
8+
".asJava should convert as Java ParallelFlux" in {
9+
Flux.just(1, 2, 3).parallel().asJava shouldBe a[JParallelFlux[Int]]
10+
}
11+
12+
".apply should convert Java ParallelFlux into SParallelFlux" in {
13+
SParallelFlux(JFlux.just(1, 2, 3).parallel()).asJava shouldBe a[JParallelFlux[Int]]
14+
}
15+
}
16+
}

0 commit comments

Comments
 (0)