Skip to content
This repository was archived by the owner on Mar 2, 2022. It is now read-only.

Commit 71df374

Browse files
committed
Added Flux.index(indexMapper)
1 parent 60638ca commit 71df374

File tree

2 files changed

+31
-5
lines changed

2 files changed

+31
-5
lines changed

src/main/scala/reactor/core/scala/publisher/Flux.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1790,6 +1790,24 @@ class Flux[T] private[publisher](private[publisher] val jFlux: JFlux[T]) extends
17901790
case (jLong: JLong, t: T) => (Long2long(jLong), t)
17911791
}
17921792

1793+
/**
1794+
* Keep information about the order in which source values were received by
1795+
* indexing them internally with a 0-based incrementing long then combining this
1796+
* information with the source value into a `I` using the provided [[Function2]] ,
1797+
* returning a [[Flux[I]]].
1798+
* <p>
1799+
* Typical usage would be to produce a [[scala.Tuple2]] similar to [[Flux.index()]], but
1800+
* 1-based instead of 0-based:
1801+
* <p>
1802+
* `index((i, v) => (i+1, v))`
1803+
*
1804+
* @param indexMapper the [[Function2]] to use to combine elements and their index.
1805+
* @return an indexed [[Flux]] with each source value combined with its computed index.
1806+
*/
1807+
final def index[I](indexMapper: (Long, T) => I) = Flux(jFlux.index[I](new BiFunction[JLong, T, I] {
1808+
override def apply(t: JLong, u: T) = indexMapper(Long2long(t), u)
1809+
}))
1810+
17931811
/**
17941812
* Ignores onNext signals (dropping them) and only reacts on termination.
17951813
*

src/test/scala/reactor/core/scala/publisher/FluxTest.scala

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -310,11 +310,19 @@ class FluxTest extends FreeSpec with Matchers with TableDrivenPropertyChecks {
310310
}
311311
}
312312

313-
".index should return tuple with the index" in {
314-
val flux = Flux.just("a", "b", "c").index()
315-
StepVerifier.create(flux)
316-
.expectNext((0l, "a"), (1l, "b"), (2l, "c"))
317-
.verifyComplete()
313+
".index" - {
314+
"should return tuple with the index" in {
315+
val flux = Flux.just("a", "b", "c").index()
316+
StepVerifier.create(flux)
317+
.expectNext((0l, "a"), (1l, "b"), (2l, "c"))
318+
.verifyComplete()
319+
}
320+
"with index mapper should return the mapped value" in {
321+
val flux = Flux.just("a", "b", "c").index((i, v) => s"$i-$v")
322+
StepVerifier.create(flux)
323+
.expectNext("0-a", "1-b", "2-c")
324+
.verifyComplete()
325+
}
318326
}
319327

320328
".interval" - {

0 commit comments

Comments
 (0)