@@ -15536,6 +15536,158 @@ public final Flowable<T> throttleLast(long intervalDuration, TimeUnit unit, Sche
1553615536 return sample(intervalDuration, unit, scheduler);
1553715537 }
1553815538
15539+ /**
15540+ * Throttles items from the upstream {@code Flowable} by first emitting the next
15541+ * item from upstream, then periodically emitting the latest item (if any) when
15542+ * the specified timeout elapses between them.
15543+ * <p>
15544+ * <img width="640" height="325" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleLatest.png" alt="">
15545+ * <p>
15546+ * Unlike the option with {@link #throttleLatest(long, TimeUnit, boolean)}, the very last item being held back
15547+ * (if any) is not emitted when the upstream completes.
15548+ * <p>
15549+ * If no items were emitted from the upstream during this timeout phase, the next
15550+ * upstream item is emitted immediately and the timeout window starts from then.
15551+ * <dl>
15552+ * <dt><b>Backpressure:</b></dt>
15553+ * <dd>This operator does not support backpressure as it uses time to control data flow.
15554+ * If the downstream is not ready to receive items, a
15555+ * {@link io.reactivex.exceptions.MissingBackpressureException MissingBackpressureException}
15556+ * will be signaled.</dd>
15557+ * <dt><b>Scheduler:</b></dt>
15558+ * <dd>{@code throttleLatest} operates by default on the {@code computation} {@link Scheduler}.</dd>
15559+ * </dl>
15560+ * @param timeout the time to wait after an item emission towards the downstream
15561+ * before trying to emit the latest item from upstream again
15562+ * @param unit the time unit
15563+ * @return the new Flowable instance
15564+ * @since 2.1.14 - experimental
15565+ * @see #throttleLatest(long, TimeUnit, boolean)
15566+ * @see #throttleLatest(long, TimeUnit, Scheduler)
15567+ */
15568+ @Experimental
15569+ @CheckReturnValue
15570+ @BackpressureSupport(BackpressureKind.ERROR)
15571+ @SchedulerSupport(SchedulerSupport.COMPUTATION)
15572+ public final Flowable<T> throttleLatest(long timeout, TimeUnit unit) {
15573+ return throttleLatest(timeout, unit, Schedulers.computation(), false);
15574+ }
15575+
15576+ /**
15577+ * Throttles items from the upstream {@code Flowable} by first emitting the next
15578+ * item from upstream, then periodically emitting the latest item (if any) when
15579+ * the specified timeout elapses between them.
15580+ * <p>
15581+ * <img width="640" height="325" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleLatest.e.png" alt="">
15582+ * <p>
15583+ * If no items were emitted from the upstream during this timeout phase, the next
15584+ * upstream item is emitted immediately and the timeout window starts from then.
15585+ * <dl>
15586+ * <dt><b>Backpressure:</b></dt>
15587+ * <dd>This operator does not support backpressure as it uses time to control data flow.
15588+ * If the downstream is not ready to receive items, a
15589+ * {@link io.reactivex.exceptions.MissingBackpressureException MissingBackpressureException}
15590+ * will be signaled.</dd>
15591+ * <dt><b>Scheduler:</b></dt>
15592+ * <dd>{@code throttleLatest} operates by default on the {@code computation} {@link Scheduler}.</dd>
15593+ * </dl>
15594+ * @param timeout the time to wait after an item emission towards the downstream
15595+ * before trying to emit the latest item from upstream again
15596+ * @param unit the time unit
15597+ * @param emitLast If {@code true}, the very last item from the upstream will be emitted
15598+ * immediately when the upstream completes, regardless if there is
15599+ * a timeout window active or not. If {@code false}, the very last
15600+ * upstream item is ignored and the flow terminates.
15601+ * @return the new Flowable instance
15602+ * @since 2.1.14 - experimental
15603+ * @see #throttleLatest(long, TimeUnit, Scheduler, boolean)
15604+ */
15605+ @Experimental
15606+ @CheckReturnValue
15607+ @BackpressureSupport(BackpressureKind.ERROR)
15608+ @SchedulerSupport(SchedulerSupport.COMPUTATION)
15609+ public final Flowable<T> throttleLatest(long timeout, TimeUnit unit, boolean emitLast) {
15610+ return throttleLatest(timeout, unit, Schedulers.computation(), emitLast);
15611+ }
15612+
15613+ /**
15614+ * Throttles items from the upstream {@code Flowable} by first emitting the next
15615+ * item from upstream, then periodically emitting the latest item (if any) when
15616+ * the specified timeout elapses between them.
15617+ * <p>
15618+ * <img width="640" height="325" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleLatest.s.png" alt="">
15619+ * <p>
15620+ * Unlike the option with {@link #throttleLatest(long, TimeUnit, Scheduler, boolean)}, the very last item being held back
15621+ * (if any) is not emitted when the upstream completes.
15622+ * <p>
15623+ * If no items were emitted from the upstream during this timeout phase, the next
15624+ * upstream item is emitted immediately and the timeout window starts from then.
15625+ * <dl>
15626+ * <dt><b>Backpressure:</b></dt>
15627+ * <dd>This operator does not support backpressure as it uses time to control data flow.
15628+ * If the downstream is not ready to receive items, a
15629+ * {@link io.reactivex.exceptions.MissingBackpressureException MissingBackpressureException}
15630+ * will be signaled.</dd>
15631+ * <dt><b>Scheduler:</b></dt>
15632+ * <dd>You specify which {@link Scheduler} this operator will use.</dd>
15633+ * </dl>
15634+ * @param timeout the time to wait after an item emission towards the downstream
15635+ * before trying to emit the latest item from upstream again
15636+ * @param unit the time unit
15637+ * @param scheduler the {@link Scheduler} where the timed wait and latest item
15638+ * emission will be performed
15639+ * @return the new Flowable instance
15640+ * @since 2.1.14 - experimental
15641+ * @see #throttleLatest(long, TimeUnit, Scheduler, boolean)
15642+ */
15643+ @Experimental
15644+ @CheckReturnValue
15645+ @BackpressureSupport(BackpressureKind.ERROR)
15646+ @SchedulerSupport(SchedulerSupport.CUSTOM)
15647+ public final Flowable<T> throttleLatest(long timeout, TimeUnit unit, Scheduler scheduler) {
15648+ return throttleLatest(timeout, unit, scheduler, false);
15649+ }
15650+
15651+ /**
15652+ * Throttles items from the upstream {@code Flowable} by first emitting the next
15653+ * item from upstream, then periodically emitting the latest item (if any) when
15654+ * the specified timeout elapses between them.
15655+ * <p>
15656+ * <img width="640" height="325" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleLatest.se.png" alt="">
15657+ * <p>
15658+ * If no items were emitted from the upstream during this timeout phase, the next
15659+ * upstream item is emitted immediately and the timeout window starts from then.
15660+ * <dl>
15661+ * <dt><b>Backpressure:</b></dt>
15662+ * <dd>This operator does not support backpressure as it uses time to control data flow.
15663+ * If the downstream is not ready to receive items, a
15664+ * {@link io.reactivex.exceptions.MissingBackpressureException MissingBackpressureException}
15665+ * will be signaled.</dd>
15666+ * <dt><b>Scheduler:</b></dt>
15667+ * <dd>You specify which {@link Scheduler} this operator will use.</dd>
15668+ * </dl>
15669+ * @param timeout the time to wait after an item emission towards the downstream
15670+ * before trying to emit the latest item from upstream again
15671+ * @param unit the time unit
15672+ * @param scheduler the {@link Scheduler} where the timed wait and latest item
15673+ * emission will be performed
15674+ * @param emitLast If {@code true}, the very last item from the upstream will be emitted
15675+ * immediately when the upstream completes, regardless if there is
15676+ * a timeout window active or not. If {@code false}, the very last
15677+ * upstream item is ignored and the flow terminates.
15678+ * @return the new Flowable instance
15679+ * @since 2.1.14 - experimental
15680+ */
15681+ @Experimental
15682+ @CheckReturnValue
15683+ @BackpressureSupport(BackpressureKind.ERROR)
15684+ @SchedulerSupport(SchedulerSupport.CUSTOM)
15685+ public final Flowable<T> throttleLatest(long timeout, TimeUnit unit, Scheduler scheduler, boolean emitLast) {
15686+ ObjectHelper.requireNonNull(unit, "unit is null");
15687+ ObjectHelper.requireNonNull(scheduler, "scheduler is null");
15688+ return RxJavaPlugins.onAssembly(new FlowableThrottleLatest<T>(this, timeout, unit, scheduler, emitLast));
15689+ }
15690+
1553915691 /**
1554015692 * Returns a Flowable that only emits those items emitted by the source Publisher that are not followed
1554115693 * by another emitted item within a specified time window.
0 commit comments