Skip to content
This repository was archived by the owner on Mar 2, 2022. It is now read-only.

Commit 76a32f1

Browse files
committed
Adding scaladoc for SFlux.merge
Signed-off-by: Winarto Zhao <[email protected]>
1 parent e667cac commit 76a32f1

File tree

1 file changed

+20
-4
lines changed

1 file changed

+20
-4
lines changed

src/main/scala/reactor/core/scala/publisher/SFlux.scala

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -743,10 +743,26 @@ object SFlux {
743743

744744
def just[T](data: T*): SFlux[T] = apply[T](data: _*)
745745

746-
def merge[I](sources: Seq[Publisher[_ <: I]], prefetch: Int = Queues.XS_BUFFER_SIZE, delayError: Boolean = false): ReactiveSFlux[I] = {
747-
if(delayError) new ReactiveSFlux[I](JFlux.mergeDelayError(prefetch, sources: _*))
748-
else new ReactiveSFlux[I](JFlux.merge(prefetch, sources: _*))
749-
}
746+
/**
747+
* Merge data from [[Publisher]] sequences contained in an array / vararg
748+
* into an interleaved merged sequence. Unlike [[SFlux.concat(Publisher) concat]],
749+
* sources are subscribed to eagerly.
750+
* <p>
751+
* <img class="marble" src="https://github.com/reactor/reactor-core/tree/master/reactor-core/src/main/java/reactor/core/publisher/doc-files/marbles//mergeFixedSources.svg" alt="">
752+
* <p>
753+
* Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with
754+
* an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source
755+
* in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to
756+
* another source.
757+
*
758+
* @param sources the array of [[Publisher]] sources to merge
759+
* @param prefetch the inner source request size
760+
* @param delayError This parameter will delay any error until after the rest of the merge backlog has been processed.
761+
* @tparam I The source type of the data sequence
762+
* @return a fresh Reactive [[SFlux]] publisher ready to be subscribed
763+
*/
764+
def merge[I](sources: Seq[Publisher[_ <: I]], prefetch: Int = Queues.XS_BUFFER_SIZE, delayError: Boolean = false): ReactiveSFlux[I] = if(delayError) new ReactiveSFlux[I](JFlux.mergeDelayError(prefetch, sources: _*))
765+
else new ReactiveSFlux[I](JFlux.merge(prefetch, sources: _*))
750766

751767
/**
752768
* Merge data from provided [[Publisher]] sequences into an ordered merged sequence,

0 commit comments

Comments
 (0)