Skip to content
This repository was archived by the owner on Mar 2, 2022. It is now read-only.

Commit 5e25895

Browse files
committed
Added Flux.fromStream
1 parent 63b42fa commit 5e25895

File tree

3 files changed

+46
-12
lines changed

3 files changed

+46
-12
lines changed

src/main/scala/reactor/core/scala/publisher/Flux.scala

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import java.util
55
import java.util.concurrent.Callable
66
import java.util.function.{BiFunction, Consumer, Function, Supplier}
77
import java.util.logging.Level
8-
import java.util.{Comparator, List => JList}
8+
import java.util.{Comparator, stream, List => JList}
99

1010
import org.reactivestreams.{Publisher, Subscriber, Subscription}
1111
import reactor.core.Disposable
@@ -4591,16 +4591,37 @@ object Flux {
45914591

45924592
/**
45934593
* Create a [[Flux]] that emits the items contained in the provided [[Stream]].
4594-
* A new iterator will be created for each subscriber.
4594+
* Keep in mind that a [[Stream]] cannot be re-used, which can be problematic in
4595+
* case of multiple subscriptions or re-subscription (like with [[Flux.repeat()]] or
4596+
* [[Flux.retry]]). The [[Stream]] is [[Stream.close closed]] automatically
4597+
* by the operator on cancellation, error or completion.
45954598
* <p>
4596-
* <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.0.5.RELEASE/src/docs/marble/fromstream.png" alt="">
4599+
* <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.1.RELEASE/src/docs/marble/fromstream.png" alt="">
45974600
* <p>
45984601
*
45994602
* @param s the [[Stream]] to read data from
4600-
* @tparam T the [[Stream]] type to flux
4603+
* @tparam T The type of values in the source [[Stream]] and resulting Flux
46014604
* @return a new [[Flux]]
46024605
*/
4603-
def fromStream[T](s: Stream[T]): Flux[T] = Flux.fromIterable(s)
4606+
def fromStream[T](s: Stream[T]) = Flux(JFlux.fromStream(s))
4607+
4608+
/**
4609+
* Create a [[Flux]] that emits the items contained in a [[Stream]] created by
4610+
* the provided [[Function0]] for each subscription. The [[Stream]] is
4611+
* [[Stream.close closed]] automatically by the operator on cancellation, error
4612+
* or completion.
4613+
* <p>
4614+
* <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.1.RELEASE/src/docs/marble/fromstream.png" alt="">
4615+
* <p>
4616+
*
4617+
* @param streamSupplier the [[Function0]] that generates the [[Stream]] from
4618+
* which to read data
4619+
* @tparam T The type of values in the source [[Stream]] and resulting Flux
4620+
* @return a new [[Flux]]
4621+
*/
4622+
def fromStream[T](streamSupplier: () => Stream[T]) = Flux(JFlux.fromStream[T](new Supplier[stream.Stream[_ <: T]] {
4623+
override def get() = streamSupplier()
4624+
}))
46044625

46054626
/**
46064627
* Generate signals one-by-one via a consumer callback.

src/main/scala/reactor/core/scala/publisher/package.scala

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,19 @@ package reactor.core.scala
22

33
import java.lang.{Boolean => JBoolean, Iterable => JIterable, Long => JLong}
44
import java.time.{Duration => JDuration}
5-
import java.util.Optional
5+
import java.util.{Optional, Spliterator, Spliterators}
66
import java.util.Optional.empty
77
import java.util.concurrent.Callable
88
import java.util.function.{BiConsumer, BiFunction, BiPredicate, BooleanSupplier, Consumer, Function, LongConsumer, Predicate, Supplier}
9+
import java.util.stream.{StreamSupport, Stream => JStream}
910

1011
import org.reactivestreams.Publisher
1112
import reactor.core.publisher.{Flux => JFlux, Mono => JMono}
1213
import reactor.util.function.{Tuple2, Tuple3, Tuple4, Tuple5, Tuple6}
1314

1415
import scala.concurrent.duration.Duration
1516
import scala.language.implicitConversions
17+
import scala.collection.JavaConverters._
1618

1719
/**
1820
* Created by winarto on 12/31/16.
@@ -140,7 +142,6 @@ Uncomment this when used. It is not used for now and reduce the code coverage
140142
}
141143

142144
implicit def scalaIterable2JavaIterable[T](scalaIterable: Iterable[T]): JIterable[T] = {
143-
import scala.collection.JavaConverters._
144145
scalaIterable.asJava
145146
}
146147

@@ -161,4 +162,8 @@ Uncomment this when used. It is not used for now and reduce the code coverage
161162
}
162163

163164
implicit def javaOptional2ScalaOption[T](jOptional: Optional[T]): Option[T] = if(jOptional.isPresent) Some(jOptional.get()) else None
165+
166+
implicit def scalaStream2JavaStream[T](stream: Stream[T]): JStream[T] = {
167+
StreamSupport.stream(Spliterators.spliteratorUnknownSize[T](stream.toIterator.asJava, Spliterator.NONNULL), false)
168+
}
164169
}

src/test/scala/reactor/core/scala/publisher/FluxTest.scala

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -258,11 +258,19 @@ class FluxTest extends FreeSpec with Matchers with TableDrivenPropertyChecks {
258258
.verifyComplete()
259259
}
260260

261-
".fromStream should create flux that emit items contained in the provided stream" in {
262-
val flux = Flux.fromStream(Stream(1, 2, 3))
263-
StepVerifier.create(flux)
264-
.expectNext(1, 2, 3)
265-
.verifyComplete()
261+
".fromStream" - {
262+
"should create flux that emit items contained in the provided stream" in {
263+
val flux = Flux.fromStream(Stream(1, 2, 3))
264+
StepVerifier.create(flux)
265+
.expectNext(1, 2, 3)
266+
.verifyComplete()
267+
}
268+
"with supplier should create flux that emit items contained in the supplier" in {
269+
val flux = Flux.fromStream(() => Stream(1, 2, 3))
270+
StepVerifier.create(flux)
271+
.expectNext(1, 2, 3)
272+
.verifyComplete()
273+
}
266274
}
267275

268276
".generate" - {

0 commit comments

Comments
 (0)