Skip to content
This repository was archived by the owner on Mar 2, 2022. It is now read-only.

Commit f939468

Browse files
committed
Added SParallelFlux.from
Added SParallelFlux.fromPublishers Signed-off-by: Winarto Zhao <[email protected]>
1 parent ef91b02 commit f939468

File tree

2 files changed

+38
-2
lines changed

2 files changed

+38
-2
lines changed
Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,43 @@
11
package reactor.core.scala.publisher
22

3+
import java.util
4+
import java.util.function.Supplier
5+
6+
import org.reactivestreams.Publisher
37
import reactor.core.publisher.{ParallelFlux => JParallelFlux}
8+
import reactor.util.concurrent.Queues
49

5-
class SParallelFlux[T]private(private val jParallelFlux: JParallelFlux[T]) {
10+
class SParallelFlux[T] private(private val jParallelFlux: JParallelFlux[T]) {
611
def asJava: JParallelFlux[T] = jParallelFlux
712
}
813

914
object SParallelFlux {
1015
def apply[T](jParallelFlux: JParallelFlux[T]) = new SParallelFlux[T](jParallelFlux)
16+
17+
/**
18+
* Take a Publisher and prepare to consume it on multiple 'rails' (one per CPU core)
19+
* in a round-robin fashion.
20+
*
21+
* @tparam T the value type
22+
* @param source the source Publisher
23+
* @param parallelism the number of parallel rails
24+
* @param prefetch the number of values to prefetch from the source
25+
* @param queueSupplier the queue structure supplier to hold the prefetched values
26+
* from the source until there is a rail ready to process it.
27+
* @return the [[SParallelFlux]] instance
28+
*/
29+
def from[T](source: Publisher[_ <: T],
30+
parallelism: Int = Runtime.getRuntime.availableProcessors(),
31+
prefetch: Int = Queues.SMALL_BUFFER_SIZE,
32+
queueSupplier: Supplier[util.Queue[T]] = Queues.small()) = SParallelFlux(JParallelFlux.from(source, parallelism, prefetch, queueSupplier))
33+
34+
/**
35+
* Wraps multiple Publishers into a [[SParallelFlux]] which runs them in parallel and
36+
* unordered.
37+
*
38+
* @tparam T the value type
39+
* @param publishers the array of publishers
40+
* @return the [[SParallelFlux]] instance
41+
*/
42+
def fromPublishers[T](publishers: Publisher[T]*) = SParallelFlux(JParallelFlux.from(publishers: _*))
1143
}

src/test/scala/reactor/core/scala/publisher/SParallelFluxTest.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
package reactor.core.scala.publisher
22

33
import org.scalatest.{FreeSpec, Matchers}
4-
import reactor.core.publisher.{Flux => JFlux, ParallelFlux => JParallelFlux}
4+
import reactor.core.publisher.{ParallelFlux => JParallelFlux, Flux => JFlux}
55

66
class SParallelFluxTest extends FreeSpec with Matchers {
77
"SParallelFlux" - {
88
".asJava should convert as Java ParallelFlux" in {
9+
Flux.just(1, 2, 3).parallel().asJava shouldBe a[JParallelFlux[Int]]
10+
}
11+
12+
".apply should convert Java ParallelFlux into SParallelFlux" in {
913
SParallelFlux(JFlux.just(1, 2, 3).parallel()).asJava shouldBe a[JParallelFlux[Int]]
1014
}
1115
}

0 commit comments

Comments
 (0)