Skip to content
This repository was archived by the owner on Mar 2, 2022. It is now read-only.

Commit e667cac

Browse files
committed
Adding scaladoc for SFlux.mergeOrdered
Signed-off-by: Winarto Zhao <[email protected]>
1 parent cefd835 commit e667cac

File tree

2 files changed

+19
-1
lines changed

2 files changed

+19
-1
lines changed

src/main/scala/reactor/core/scala/publisher/SFlux.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -748,6 +748,24 @@ object SFlux {
748748
else new ReactiveSFlux[I](JFlux.merge(prefetch, sources: _*))
749749
}
750750

751+
/**
752+
* Merge data from provided [[Publisher]] sequences into an ordered merged sequence,
753+
* by picking the smallest values from each source (as defined by the provided
754+
* [[Comparator]]). This is not a [[SFlux.sort(Ordering)]], as it doesn't consider
755+
* the whole of each sequences.
756+
* <p>
757+
* Instead, this operator considers only one value from each source and picks the
758+
* smallest of all these values, then replenishes the slot for that picked source.
759+
* <p>
760+
* <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/master/reactor-core/src/main/java/reactor/core/publisher/doc-files/marbles/mergeOrdered.svg" alt="">
761+
*
762+
* @param prefetch the number of elements to prefetch from each source (avoiding too
763+
* many small requests to the source when picking)
764+
* @param comparator the [[Comparator]] to use to find the smallest value
765+
* @param sources [[Publisher]] sources to merge
766+
* @tparam I the merged type
767+
* @return a merged [[SFlux]] that , subscribing early but keeping the original ordering
768+
*/
751769
def mergeOrdered[I <: Comparable[I]](sources: Seq[Publisher[_ <: I]], prefetch: Int = Queues.SMALL_BUFFER_SIZE, comparator: Comparator[I] = Comparator.naturalOrder[I]()) =
752770
new ReactiveSFlux[I](JFlux.mergeOrdered(prefetch: Int, comparator, sources: _*))
753771

src/test/scala/reactor/core/scala/publisher/SFluxTest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,7 @@ class SFluxTest extends FreeSpec with Matchers with TableDrivenPropertyChecks wi
291291
val sFlux2 = SFlux.just[Integer](10, 30, 50, 70).delayElements(5 seconds).delaySubscription(2500 millisecond)
292292
SFlux.mergeOrdered(Seq(sFlux1, sFlux2), 5, Comparator.naturalOrder().reversed())
293293
}).thenAwait(30 seconds)
294-
.expectNext(10, 30, 50, 70, 1, 20, 40, 60, 80)//this is possibly a bug in reactor. Tracking https://github.com/reactor/reactor-core/issues/1918
294+
.expectNext(10, 30, 50, 70, 1, 20, 40, 60, 80)
295295
.verifyComplete()
296296
}
297297
}

0 commit comments

Comments
 (0)