Skip to content

Commit 27b81ef

Browse files
committed
Debounce with selector
1 parent fe438ca commit 27b81ef

File tree

3 files changed

+260
-1
lines changed

3 files changed

+260
-1
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2266,6 +2266,20 @@ public Observable<T> debounce(long timeout, TimeUnit unit) {
22662266
return create(OperationDebounce.debounce(this, timeout, unit));
22672267
}
22682268

2269+
/**
2270+
* Create an Observable that ignores elements from this observable
2271+
* sequence which are followed by another value within a computed
2272+
* debounce duration.
2273+
* @param <U> the debounce value type (ignored)
2274+
* @param debounceSelector function to retrieve a sequence indicating the throttle duration for each given element.
2275+
* @return an Observable that ignores elements from this observable
2276+
* sequence which are followed by another value within a computed
2277+
* debounce duration
2278+
*/
2279+
public <U> Observable<T> debounce(Func1<? super T, ? extends Observable<U>> debounceSelector) {
2280+
return create(OperationDebounce.debounceSelector(this, debounceSelector));
2281+
}
2282+
22692283
/**
22702284
* Drops items emitted by an Observable that are followed by newer items
22712285
* before a timeout value expires. The timer resets on each emission.

rxjava-core/src/main/java/rx/operators/OperationDebounce.java

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import rx.Scheduler;
2525
import rx.Subscription;
2626
import rx.schedulers.Schedulers;
27+
import rx.subscriptions.CompositeSubscription;
28+
import rx.subscriptions.SerialSubscription;
2729
import rx.util.functions.Action0;
2830
import rx.util.functions.Func1;
2931

@@ -151,4 +153,162 @@ public void call() {
151153
}
152154
}
153155
}
156+
157+
/**
158+
* Delay the emission via another observable if no new source appears in the meantime.
159+
*/
160+
public static <T, U> OnSubscribeFunc<T> debounceSelector(
161+
Observable<? extends T> source,
162+
Func1<? super T, ? extends Observable<U>> debounceSelector) {
163+
return new DebounceSelector<T, U>(source, debounceSelector);
164+
}
165+
166+
/**
167+
* Delay the emission via another observable if no new source appears in the meantime.
168+
*/
169+
private static final class DebounceSelector<T, U> implements OnSubscribeFunc<T> {
170+
final Observable<? extends T> source;
171+
final Func1<? super T, ? extends Observable<U>> debounceSelector;
172+
173+
public DebounceSelector(Observable<? extends T> source, Func1<? super T, ? extends Observable<U>> debounceSelector) {
174+
this.source = source;
175+
this.debounceSelector = debounceSelector;
176+
}
177+
178+
@Override
179+
public Subscription onSubscribe(Observer<? super T> t1) {
180+
CompositeSubscription csub = new CompositeSubscription();
181+
182+
csub.add(source.subscribe(new SourceObserver<T, U>(t1, debounceSelector, csub)));
183+
184+
return csub;
185+
}
186+
187+
/** Observe the source. */
188+
private static final class SourceObserver<T, U> implements Observer<T> {
189+
final Observer<? super T> observer;
190+
final Func1<? super T, ? extends Observable<U>> debounceSelector;
191+
final CompositeSubscription cancel;
192+
final SerialSubscription ssub = new SerialSubscription();
193+
long index;
194+
T value;
195+
boolean hasValue;
196+
final Object guard;
197+
198+
public SourceObserver(
199+
Observer<? super T> observer,
200+
Func1<? super T, ? extends Observable<U>> debounceSelector,
201+
CompositeSubscription cancel) {
202+
this.observer = observer;
203+
this.debounceSelector = debounceSelector;
204+
this.cancel = cancel;
205+
this.cancel.add(ssub);
206+
this.guard = new Object();
207+
}
208+
209+
@Override
210+
public void onNext(T args) {
211+
Observable<U> o;
212+
try {
213+
o = debounceSelector.call(args);
214+
} catch (Throwable t) {
215+
synchronized (guard) {
216+
observer.onError(t);
217+
}
218+
cancel.unsubscribe();
219+
return;
220+
}
221+
long currentIndex;
222+
synchronized (guard) {
223+
hasValue = true;
224+
value = args;
225+
currentIndex = ++index;
226+
}
227+
228+
SerialSubscription osub = new SerialSubscription();
229+
ssub.set(osub);
230+
231+
osub.set(o.subscribe(new DebounceObserver<T, U>(this, osub, args, currentIndex)));
232+
}
233+
234+
@Override
235+
public void onError(Throwable e) {
236+
ssub.unsubscribe();
237+
try {
238+
synchronized (guard) {
239+
observer.onError(e);
240+
hasValue = false;
241+
value = null;
242+
index++;
243+
}
244+
} finally {
245+
cancel.unsubscribe();
246+
}
247+
}
248+
249+
@Override
250+
public void onCompleted() {
251+
ssub.unsubscribe();
252+
try {
253+
synchronized (guard) {
254+
if (hasValue) {
255+
try {
256+
observer.onNext(value);
257+
} catch (Throwable t) {
258+
observer.onError(t);
259+
return;
260+
}
261+
}
262+
observer.onCompleted();
263+
hasValue = false;
264+
value = null;
265+
index++;
266+
}
267+
} finally {
268+
cancel.unsubscribe();
269+
}
270+
}
271+
}
272+
/**
273+
* The debounce observer.
274+
*/
275+
private static final class DebounceObserver<T, U> implements Observer<U> {
276+
final SourceObserver<T, U> parent;
277+
final Subscription cancel;
278+
final T value;
279+
final long currentIndex;
280+
281+
public DebounceObserver(SourceObserver<T, U> parent, Subscription cancel, T value, long currentIndex) {
282+
this.parent = parent;
283+
this.cancel = cancel;
284+
this.value = value;
285+
this.currentIndex = currentIndex;
286+
}
287+
288+
@Override
289+
public void onNext(U args) {
290+
onCompleted();
291+
}
292+
293+
@Override
294+
public void onError(Throwable e) {
295+
synchronized (parent.guard) {
296+
parent.observer.onError(e);
297+
}
298+
parent.cancel.unsubscribe();
299+
}
300+
301+
@Override
302+
public void onCompleted() {
303+
synchronized (parent.guard) {
304+
if (parent.hasValue && parent.index == currentIndex) {
305+
parent.observer.onNext(value);
306+
}
307+
parent.hasValue = false;
308+
}
309+
cancel.unsubscribe();
310+
}
311+
312+
}
313+
}
154314
}

rxjava-core/src/test/java/rx/operators/OperationDebounceTest.java

Lines changed: 86 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
*/
1616
package rx.operators;
1717

18-
import static org.mockito.Matchers.*;
1918
import static org.mockito.Mockito.*;
2019

2120
import java.util.concurrent.TimeUnit;
@@ -28,8 +27,10 @@
2827
import rx.Observer;
2928
import rx.Subscription;
3029
import rx.schedulers.TestScheduler;
30+
import rx.subjects.PublishSubject;
3131
import rx.subscriptions.Subscriptions;
3232
import rx.util.functions.Action0;
33+
import rx.util.functions.Func1;
3334

3435
public class OperationDebounceTest {
3536

@@ -158,4 +159,88 @@ public void call() {
158159
@SuppressWarnings("serial")
159160
private class TestException extends Exception {
160161
}
162+
163+
@Test
164+
public void debounceSelectorNormal1() {
165+
PublishSubject<Integer> source = PublishSubject.create();
166+
final PublishSubject<Integer> debouncer = PublishSubject.create();
167+
Func1<Integer, Observable<Integer>> debounceSel = new Func1<Integer, Observable<Integer>>() {
168+
169+
@Override
170+
public Observable<Integer> call(Integer t1) {
171+
return debouncer;
172+
}
173+
};
174+
175+
@SuppressWarnings("unchecked")
176+
Observer<Object> o = mock(Observer.class);
177+
InOrder inOrder = inOrder(o);
178+
179+
source.debounce(debounceSel).subscribe(o);
180+
181+
source.onNext(1);
182+
debouncer.onNext(1);
183+
184+
source.onNext(2);
185+
source.onNext(3);
186+
source.onNext(4);
187+
188+
debouncer.onNext(2);
189+
190+
source.onNext(5);
191+
source.onCompleted();
192+
193+
inOrder.verify(o).onNext(1);
194+
inOrder.verify(o).onNext(4);
195+
inOrder.verify(o).onNext(5);
196+
inOrder.verify(o).onCompleted();
197+
198+
verify(o, never()).onError(any(Throwable.class));
199+
}
200+
201+
@Test
202+
public void debounceSelectorFuncThrows() {
203+
PublishSubject<Integer> source = PublishSubject.create();
204+
Func1<Integer, Observable<Integer>> debounceSel = new Func1<Integer, Observable<Integer>>() {
205+
206+
@Override
207+
public Observable<Integer> call(Integer t1) {
208+
throw new OperationReduceTest.CustomException();
209+
}
210+
};
211+
212+
@SuppressWarnings("unchecked")
213+
Observer<Object> o = mock(Observer.class);
214+
215+
source.debounce(debounceSel).subscribe(o);
216+
217+
source.onNext(1);
218+
219+
verify(o, never()).onNext(any());
220+
verify(o, never()).onCompleted();
221+
verify(o).onError(any(OperationReduceTest.CustomException.class));
222+
}
223+
224+
@Test
225+
public void debounceSelectorObservableThrows() {
226+
PublishSubject<Integer> source = PublishSubject.create();
227+
Func1<Integer, Observable<Integer>> debounceSel = new Func1<Integer, Observable<Integer>>() {
228+
229+
@Override
230+
public Observable<Integer> call(Integer t1) {
231+
return Observable.error(new OperationReduceTest.CustomException());
232+
}
233+
};
234+
235+
@SuppressWarnings("unchecked")
236+
Observer<Object> o = mock(Observer.class);
237+
238+
source.debounce(debounceSel).subscribe(o);
239+
240+
source.onNext(1);
241+
242+
verify(o, never()).onNext(any());
243+
verify(o, never()).onCompleted();
244+
verify(o).onError(any(OperationReduceTest.CustomException.class));
245+
}
161246
}

0 commit comments

Comments
 (0)