Skip to content
This repository was archived by the owner on Mar 2, 2022. It is now read-only.

Commit 2257c75

Browse files
committed
Breaking changes:
- Renamed ConnectableFlux => ConnectableSFlux Added SFlux.publish
1 parent 19190c3 commit 2257c75

File tree

4 files changed

+71
-21
lines changed

4 files changed

+71
-21
lines changed

src/main/scala/reactor/core/scala/publisher/ConnectableFlux.scala renamed to src/main/scala/reactor/core/scala/publisher/ConnectableSFlux.scala

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,26 @@
11
package reactor.core.scala.publisher
22

3-
import reactor.core.publisher.{ConnectableFlux => JConnectableFlux}
43
import reactor.core.Disposable
4+
import reactor.core.publisher.ConnectableFlux
55
import reactor.core.scheduler.Scheduler
66

77
import scala.concurrent.duration.Duration
88

9-
class ConnectableFlux[T]private (private val jConnectableFlux: JConnectableFlux[T]) extends SFlux[T] {
9+
class ConnectableSFlux[T]private(private val connectableFlux: ConnectableFlux[T]) extends SFlux[T] {
1010

1111
/**
12-
* Connects this [[ConnectableFlux]] to the upstream source when the first [[org.reactivestreams.Subscriber]]
12+
* Connects this [[ConnectableSFlux]] to the upstream source when the first [[org.reactivestreams.Subscriber]]
1313
* subscribes.
1414
*
1515
* <p>
1616
* <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.0.6.RELEASE/src/docs/marble/autoconnect.png" alt="">
1717
*
1818
* @return a [[SFlux]] that connects to the upstream source when the first [[org.reactivestreams.Subscriber]] subscribes
1919
*/
20-
final def autoConnect(): SFlux[T] = SFlux.fromPublisher(jConnectableFlux.autoConnect())
20+
final def autoConnect(): SFlux[T] = SFlux.fromPublisher(connectableFlux.autoConnect())
2121

2222
/**
23-
* Connects this [[ConnectableFlux]] to the upstream source when the specified amount of
23+
* Connects this [[ConnectableSFlux]] to the upstream source when the specified amount of
2424
* [[org.reactivestreams.Subscriber]] subscribes.
2525
* <p>
2626
* Subscribing and immediately unsubscribing Subscribers also contribute the the subscription count
@@ -32,10 +32,10 @@ class ConnectableFlux[T]private (private val jConnectableFlux: JConnectableFlux[
3232
* @param minSubscribers the minimum number of subscribers
3333
* @return a [[SFlux]] that connects to the upstream source when the given amount of Subscribers subscribe
3434
*/
35-
final def autoConnect(minSubscribers: Int): SFlux[T] = SFlux.fromPublisher(jConnectableFlux.autoConnect(minSubscribers))
35+
final def autoConnect(minSubscribers: Int): SFlux[T] = SFlux.fromPublisher(connectableFlux.autoConnect(minSubscribers))
3636

3737
/**
38-
* Connects this [[ConnectableFlux]] to the upstream source when the specified amount of
38+
* Connects this [[ConnectableSFlux]] to the upstream source when the specified amount of
3939
* [[org.reactivestreams.Subscriber]] subscribes and calls the supplied consumer with a runnable that allows disconnecting.
4040
*
4141
* @param minSubscribers the minimum number of subscribers
@@ -45,18 +45,18 @@ class ConnectableFlux[T]private (private val jConnectableFlux: JConnectableFlux[
4545
* <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.0.6.RELEASE/src/docs/marble/autoconnect.png" alt="">
4646
* @return a { @link Flux} that connects to the upstream source when the given amount of subscribers subscribed
4747
*/
48-
final def autoConnect(minSubscribers: Int, cancelSupport: Disposable => Unit): SFlux[T] = SFlux.fromPublisher(jConnectableFlux.autoConnect(minSubscribers, cancelSupport))
48+
final def autoConnect(minSubscribers: Int, cancelSupport: Disposable => Unit): SFlux[T] = SFlux.fromPublisher(connectableFlux.autoConnect(minSubscribers, cancelSupport))
4949

5050
/**
51-
* Connect this [[ConnectableFlux]] to its source and return a [[Runnable]] that
51+
* Connect this [[ConnectableSFlux]] to its source and return a [[Runnable]] that
5252
* can be used for disconnecting.
5353
*
5454
* @return the [[Disposable]] that allows disconnecting the connection after.
5555
*/
56-
final def connect(): Disposable = jConnectableFlux.connect()
56+
final def connect(): Disposable = connectableFlux.connect()
5757

5858
/**
59-
* Connects this [[ConnectableFlux]] to its source and sends a [[Disposable]] to a callback that
59+
* Connects this [[ConnectableSFlux]] to its source and sends a [[Disposable]] to a callback that
6060
* can be used for disconnecting.
6161
*
6262
* <p>The call should be idempotent in respect of connecting the first
@@ -66,7 +66,7 @@ class ConnectableFlux[T]private (private val jConnectableFlux: JConnectableFlux[
6666
* @param cancelSupport the callback is called with a Cancellation instance that can
6767
* be called to disconnect the source, even synchronously.
6868
*/
69-
final def connect(cancelSupport: Disposable => Unit): Unit = jConnectableFlux.connect(cancelSupport)
69+
final def connect(cancelSupport: Disposable => Unit): Unit = connectableFlux.connect(cancelSupport)
7070

7171
/**
7272
* Connects to the upstream source when the first [[org.reactivestreams.Subscriber]] subscribes and disconnects
@@ -77,7 +77,7 @@ class ConnectableFlux[T]private (private val jConnectableFlux: JConnectableFlux[
7777
*
7878
* @return a reference counting [[SFlux]]
7979
*/
80-
final def refCount(): SFlux[T] = SFlux.fromPublisher(jConnectableFlux.refCount())
80+
final def refCount(): SFlux[T] = SFlux.fromPublisher(connectableFlux.refCount())
8181

8282
/**
8383
* Connects to the upstream source when the given number of [[org.reactivestreams.Subscriber]] subscribes and disconnects
@@ -89,7 +89,7 @@ class ConnectableFlux[T]private (private val jConnectableFlux: JConnectableFlux[
8989
* @param minSubscribers the number of subscribers expected to subscribe before connection
9090
* @return a reference counting [[SFlux]]
9191
*/
92-
final def refCount(minSubscribers: Int): SFlux[T] = SFlux.fromPublisher(jConnectableFlux.refCount(minSubscribers))
92+
final def refCount(minSubscribers: Int): SFlux[T] = SFlux.fromPublisher(connectableFlux.refCount(minSubscribers))
9393

9494
/**
9595
* Connects to the upstream source when the given number of [[org.reactivestreams.Subscriber]] subscribes.
@@ -106,7 +106,7 @@ class ConnectableFlux[T]private (private val jConnectableFlux: JConnectableFlux[
106106
* disconnecting when all subscribers have cancelled.
107107
* @return a reference counting [[SFlux]] with a grace period for disconnection
108108
*/
109-
final def refCount(minSubscribers: Int, gracePeriod: Duration): SFlux[T] = SFlux.fromPublisher(jConnectableFlux.refCount(minSubscribers, gracePeriod))
109+
final def refCount(minSubscribers: Int, gracePeriod: Duration): SFlux[T] = SFlux.fromPublisher(connectableFlux.refCount(minSubscribers, gracePeriod))
110110

111111
/**
112112
* Connects to the upstream source when the given number of [[org.reactivestreams.Subscriber]] subscribes.
@@ -124,11 +124,11 @@ class ConnectableFlux[T]private (private val jConnectableFlux: JConnectableFlux[
124124
* @param scheduler the [[Scheduler]] on which to run timeouts
125125
* @return a reference counting [[SFlux]] with a grace period for disconnection
126126
*/
127-
final def refCount(minSubscribers: Int, gracePeriod: Duration, scheduler: Scheduler): SFlux[T] = SFlux.fromPublisher(jConnectableFlux.refCount(minSubscribers, gracePeriod, scheduler))
127+
final def refCount(minSubscribers: Int, gracePeriod: Duration, scheduler: Scheduler): SFlux[T] = SFlux.fromPublisher(connectableFlux.refCount(minSubscribers, gracePeriod, scheduler))
128128

129-
override private[publisher] def coreFlux: JConnectableFlux[T] = jConnectableFlux
129+
override private[publisher] def coreFlux: ConnectableFlux[T] = connectableFlux
130130
}
131131

132-
object ConnectableFlux {
133-
def apply[T](jConnectableFlux: JConnectableFlux[T]) = new ConnectableFlux[T](jConnectableFlux)
132+
object ConnectableSFlux {
133+
def apply[T](jConnectableFlux: ConnectableFlux[T]) = new ConnectableSFlux[T](jConnectableFlux)
134134
}

src/main/scala/reactor/core/scala/publisher/SFlux.scala

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -600,6 +600,32 @@ trait SFlux[T] extends SFluxLike[T, SFlux] with MapablePublisher[T] with ScalaCo
600600
*/
601601
final def parallel(parallelism: Int = Runtime.getRuntime.availableProcessors(), prefetch: Int = Queues.SMALL_BUFFER_SIZE) = SParallelFlux(coreFlux.parallel(parallelism, prefetch))
602602

603+
/**
604+
* Prepare a [[ConnectableSFlux]] which shares this [[SFlux]] sequence and
605+
* dispatches values to subscribers in a backpressure-aware manner. This will
606+
* effectively turn any type of sequence into a hot sequence.
607+
* <p>
608+
* Backpressure will be coordinated on [[org.reactivestreams.Subscription.request]] and if any
609+
* [[Subscriber]] is missing demand (requested = 0), multicast will pause
610+
* pushing/pulling.
611+
* <p>
612+
* <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/master/reactor-core/src/main/java/reactor/core/publisher/doc-files/marbles/publish.svg" alt="">
613+
*
614+
* @param prefetch bounded requested demand
615+
* @return a new [[ConnectableSFlux]]
616+
*/
617+
final def publish(prefetch: Int = Queues.SMALL_BUFFER_SIZE): ConnectableSFlux[T] = coreFlux.publish(prefetch).asScala
618+
619+
/**
620+
* Prepare a [[SMono]] which shares this [[SFlux]] sequence and dispatches the
621+
* first observed item to subscribers in a backpressure-aware manner.
622+
* This will effectively turn any type of sequence into a hot sequence when the first
623+
* [[Subscriber]] subscribes.
624+
* <p>
625+
* <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/master/reactor-core/src/main/java/reactor/core/publisher/doc-files/marbles/publishNext.svg" alt="">
626+
*
627+
* @return a new [[SMono]]
628+
*/
603629
final def publishNext(): SMono[T] = SMono.fromPublisher(coreFlux.publishNext())
604630

605631
final def reduce(aggregator: (T, T) => T): SMono[T] = coreFlux.reduce(aggregator).asScala

src/main/scala/reactor/core/scala/publisher/ScalaConverters.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package reactor.core.scala.publisher
22

3-
import reactor.core.publisher.{Flux => JFlux, Mono => JMono}
3+
import reactor.core.publisher.{ConnectableFlux, Flux => JFlux, Mono => JMono}
44

55
import scala.language.implicitConversions
66

@@ -12,6 +12,10 @@ trait ScalaConverters {
1212
implicit class PimpJFlux[T](jFlux: JFlux[T]) {
1313
def asScala: SFlux[T] = new ReactiveSFlux[T](jFlux)
1414
}
15+
16+
implicit class PimpConnectableFlux[T](connectableFlux: ConnectableFlux[T]) {
17+
def asScala: ConnectableSFlux[T] = ConnectableSFlux(connectableFlux)
18+
}
1519
}
1620

1721
object ScalaConverters extends ScalaConverters

src/test/scala/reactor/core/scala/publisher/SFluxTest.scala

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,11 @@ import java.util.function.{Consumer, Predicate}
1010

1111
import org.mockito.{ArgumentMatchersSugar, IdiomaticMockito}
1212
import org.reactivestreams.Subscription
13+
import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
1314
import org.scalatest.freespec.AnyFreeSpec
1415
import org.scalatest.matchers.should.Matchers
1516
import org.scalatest.prop.TableDrivenPropertyChecks
17+
import org.scalatest.time.{Seconds, Span}
1618
import reactor.core.publisher.BufferOverflowStrategy.DROP_LATEST
1719
import reactor.core.publisher.{Flux => JFlux, _}
1820
import reactor.core.scala.Scannable
@@ -33,7 +35,11 @@ import scala.math.Ordering.IntOrdering
3335
import scala.math.ScalaNumber
3436
import scala.util.{Failure, Try}
3537

36-
class SFluxTest extends AnyFreeSpec with Matchers with TableDrivenPropertyChecks with TestSupport with IdiomaticMockito with ArgumentMatchersSugar {
38+
class SFluxTest extends AnyFreeSpec with Matchers with TableDrivenPropertyChecks with TestSupport with IdiomaticMockito with ArgumentMatchersSugar
39+
with PatienceConfiguration with Eventually {
40+
41+
implicit override def patienceConfig: PatienceConfig = PatienceConfig(Span(2, Seconds))
42+
3743
"SFlux" - {
3844
".apply should return a proper SFlux when provided a Publisher" in {
3945
StepVerifier.create(SFlux(JFlux.just(1,2,3)))
@@ -1680,6 +1686,20 @@ class SFluxTest extends AnyFreeSpec with Matchers with TableDrivenPropertyChecks
16801686
.verifyComplete()
16811687
}
16821688

1689+
".publish" - {
1690+
"without transformer should produce connectable flux" in {
1691+
val buffer = mutable.ListBuffer.empty[Int]
1692+
val base = SFlux.just(1, 2, 3, 4, 5).delayElements(1 second).publish().autoConnect()
1693+
base.subscribe()
1694+
Thread.sleep(1000)
1695+
base.subscribe(i => buffer += i)
1696+
eventually {
1697+
buffer.size should be >= 1
1698+
buffer should not contain (1)
1699+
}
1700+
}
1701+
}
1702+
16831703
".publishNext should make this flux a hot mono" in {
16841704
StepVerifier.create(SFlux.just(1, 2, 3).publishNext())
16851705
.expectNext(1)

0 commit comments

Comments
 (0)