Skip to content
This repository was archived by the owner on Mar 2, 2022. It is now read-only.

Commit 60638ca

Browse files
committed
Added Flux.index()
1 parent 408562b commit 60638ca

File tree

2 files changed

+19
-0
lines changed

2 files changed

+19
-0
lines changed

src/main/scala/reactor/core/scala/publisher/Flux.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1779,6 +1779,17 @@ class Flux[T] private[publisher](private[publisher] val jFlux: JFlux[T]) extends
17791779
// TODO: How to test???
17801780
final def hide() = Flux(jFlux.hide())
17811781

1782+
/**
1783+
* Keep information about the order in which source values were received by
1784+
* indexing them with a 0-based incrementing long, returning a [[Flux]]
1785+
* of [[scala.Tuple2[index, value]]
1786+
*
1787+
* @return an indexed [[Flux]] with each source value combined with its 0-based index.
1788+
*/
1789+
final def index(): Flux[(Long, T)] = Flux(jFlux.index()).map(tupleTwo2ScalaTuple2) map {
1790+
case (jLong: JLong, t: T) => (Long2long(jLong), t)
1791+
}
1792+
17821793
/**
17831794
* Ignores onNext signals (dropping them) and only reacts on termination.
17841795
*

src/test/scala/reactor/core/scala/publisher/FluxTest.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,14 @@ class FluxTest extends FreeSpec with Matchers with TableDrivenPropertyChecks {
309309
.verifyComplete()
310310
}
311311
}
312+
313+
".index should return tuple with the index" in {
314+
val flux = Flux.just("a", "b", "c").index()
315+
StepVerifier.create(flux)
316+
.expectNext((0l, "a"), (1l, "b"), (2l, "c"))
317+
.verifyComplete()
318+
}
319+
312320
".interval" - {
313321
"without delay should produce flux of Long starting from 0 every provided timespan immediately" in {
314322
StepVerifier.withVirtualTime(() => Flux.interval(1 second).take(5))

0 commit comments

Comments
 (0)