This repository was archived by the owner on Mar 2, 2022. It is now read-only.
File tree Expand file tree Collapse file tree 1 file changed +25
-0
lines changed
src/main/scala/reactor/core/scala/publisher Expand file tree Collapse file tree 1 file changed +25
-0
lines changed Original file line number Diff line number Diff line change @@ -669,8 +669,33 @@ trait SFlux[T] extends SFluxLike[T, SFlux] with MapablePublisher[T] with ScalaCo
669669 */
670670 final def retryWhen (retry : Retry ): SFlux [T ] = coreFlux.retryWhen(retry).asScala
671671
672+ /**
673+ * Sample this [[SFlux ]] by periodically emitting an item corresponding to that
674+ * [[SFlux ]] latest emitted value within the periodical time window.
675+ * Note that if some elements are emitted quicker than the timespan just before source
676+ * completion, the last of these elements will be emitted along with the onComplete
677+ * signal.
678+ *
679+ * <p>
680+ * <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/master/reactor-core/src/main/java/reactor/core/publisher/doc-files/marbles/sampleAtRegularInterval.svg" alt="">
681+ *
682+ * @reactor.discard This operator discards elements that are not part of the sampling.
683+ * @param timespan the duration of the window after which to emit the latest observed item
684+ * @return a [[SFlux ]] sampled to the last item seen over each periodic window
685+ */
672686 final def sample (timespan : Duration ): SFlux [T ] = coreFlux.sample(timespan).asScala
673687
688+ /**
689+ * Repeatedly take a value from this [[SFlux ]] then skip the values that follow
690+ * within a given duration.
691+ *
692+ * <p>
693+ * <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/master/reactor-core/src/main/java/reactor/core/publisher/doc-files/marbles/sampleFirstAtRegularInterval.svg" alt="">
694+ *
695+ * @reactor.discard This operator discards elements that are not part of the sampling.
696+ * @param timespan the duration during which to skip values after each sample
697+ * @return a [[SFlux ]] sampled to the first item of each duration-based window
698+ */
674699 final def sampleFirst (timespan : Duration ): SFlux [T ] = coreFlux.sampleFirst(timespan).asScala
675700
676701 final def scan (accumulator : (T , T ) => T ): SFlux [T ] = coreFlux.scan(accumulator).asScala
You can’t perform that action at this time.
0 commit comments