Skip to content

Commit 0f92fdd

Browse files
committed
RunAsync method for outputting multiple values
1 parent dc15e2b commit 0f92fdd

File tree

3 files changed

+142
-1
lines changed

3 files changed

+142
-1
lines changed

rxjava-contrib/rxjava-async-util/src/main/java/rx/util/async/Async.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,16 @@
2020
import java.util.concurrent.FutureTask;
2121

2222
import rx.Observable;
23+
import rx.Observer;
2324
import rx.Scheduler;
2425
import rx.Scheduler.Inner;
26+
import rx.Subscriber;
27+
import rx.Subscription;
2528
import rx.schedulers.Schedulers;
2629
import rx.subjects.AsyncSubject;
30+
import rx.subjects.PublishSubject;
31+
import rx.subjects.Subject;
32+
import rx.subscriptions.SerialSubscription;
2733
import rx.util.async.operators.Functionals;
2834
import rx.util.async.operators.OperationDeferFuture;
2935
import rx.util.async.operators.OperationForEachFuture;
@@ -1711,4 +1717,54 @@ public static <R> Observable<R> fromCallable(Callable<? extends R> callable, Sch
17111717
public static <R> Observable<R> fromRunnable(final Runnable run, final R result, Scheduler scheduler) {
17121718
return Observable.create(OperationFromFunctionals.fromRunnable(run, result)).subscribeOn(scheduler);
17131719
}
1720+
/**
1721+
* Runs the provided action on the given scheduler and allows propagation
1722+
* of multiple events to the observers of the returned StoppableObservable.
1723+
* The action is immediately executed and unobserved values will be lost.
1724+
* @param <T> the output value type
1725+
* @param scheduler the scheduler where the action is executed
1726+
* @param action the action to execute, receives an Observer where the events can be pumped
1727+
* and a Subscription which lets check for cancellation condition.
1728+
* @return an Observable which provides a Subscription interface to cancel the action
1729+
*/
1730+
public static <T> StoppableObservable<T> runAsync(Scheduler scheduler,
1731+
final Action2<? super Observer<? super T>, ? super Subscription> action) {
1732+
return runAsync(scheduler, PublishSubject.<T>create(), action);
1733+
}
1734+
/**
1735+
* Runs the provided action on the given scheduler and allows propagation
1736+
* of multiple events to the observers of the returned StoppableObservable.
1737+
* The action is immediately executed and unobserved values might be lost,
1738+
* depending on the subject type used.
1739+
* @param <T> the output value of the action
1740+
* @param <U> the output type of the observable sequence
1741+
* @param scheduler the scheduler where the action is executed
1742+
* @param subject the subject to use to distribute values emitted by the action
1743+
* @param action the action to execute, receives an Observer where the events can be pumped
1744+
* and a Subscription which lets check for cancellation condition.
1745+
* @return an Observable which provides a Subscription interface to cancel the action
1746+
*/
1747+
public static <T, U> StoppableObservable<U> runAsync(Scheduler scheduler,
1748+
final Subject<T, U> subject,
1749+
final Action2<? super Observer<? super T>, ? super Subscription> action) {
1750+
final SerialSubscription csub = new SerialSubscription();
1751+
1752+
StoppableObservable<U> co = new StoppableObservable<U>(new Observable.OnSubscribe<U>() {
1753+
@Override
1754+
public void call(Subscriber<? super U> t1) {
1755+
subject.subscribe(t1);
1756+
}
1757+
}, csub);
1758+
1759+
csub.set(scheduler.schedule(new Action1<Inner>() {
1760+
@Override
1761+
public void call(Inner t1) {
1762+
if (!csub.isUnsubscribed()) {
1763+
action.call(subject, csub);
1764+
}
1765+
}
1766+
}));
1767+
1768+
return co;
1769+
}
17141770
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.util.async;
17+
18+
import rx.Observable;
19+
import rx.Subscription;
20+
21+
/**
22+
* An Observable which provides a Subscription interface to signal a stop
23+
* condition to an asynchronous task.
24+
*/
25+
public class StoppableObservable<T> extends Observable<T> implements Subscription {
26+
private final Subscription token;
27+
public StoppableObservable(Observable.OnSubscribe<T> onSubscribe, Subscription token) {
28+
super(onSubscribe);
29+
this.token = token;
30+
}
31+
32+
@Override
33+
public boolean isUnsubscribed() {
34+
return token.isUnsubscribed();
35+
}
36+
37+
@Override
38+
public void unsubscribe() {
39+
token.unsubscribe();
40+
}
41+
}

rxjava-contrib/rxjava-async-util/src/test/java/rx/util/async/AsyncTest.java

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616

1717
package rx.util.async;
1818

19+
import java.util.concurrent.CountDownLatch;
1920
import static org.junit.Assert.*;
20-
import static org.mockito.Matchers.*;
2121
import static org.mockito.Mockito.*;
2222

2323
import java.util.concurrent.TimeUnit;
@@ -35,6 +35,7 @@
3535

3636
import rx.Observable;
3737
import rx.Observer;
38+
import rx.Subscription;
3839
import rx.observers.TestObserver;
3940
import rx.schedulers.Schedulers;
4041
import rx.schedulers.TestScheduler;
@@ -818,4 +819,47 @@ public String answer(InvocationOnMock invocation) throws Throwable {
818819
verify(func, times(1)).call();
819820
}
820821

822+
@Test
823+
public void testRunAsync() throws InterruptedException {
824+
final CountDownLatch cdl = new CountDownLatch(1);
825+
final CountDownLatch cdl2 = new CountDownLatch(1);
826+
Action2<Observer<? super Integer>, Subscription> action = new Action2<Observer<? super Integer>, Subscription>() {
827+
@Override
828+
public void call(Observer<? super Integer> t1, Subscription t2) {
829+
try {
830+
cdl.await();
831+
} catch (InterruptedException ex) {
832+
Thread.currentThread().interrupt();
833+
return;
834+
}
835+
for (int i = 0; i < 10 && !t2.isUnsubscribed(); i++) {
836+
t1.onNext(i);
837+
}
838+
t1.onCompleted();
839+
cdl2.countDown();
840+
}
841+
};
842+
843+
@SuppressWarnings("unchecked")
844+
Observer<Object> o = mock(Observer.class);
845+
InOrder inOrder = inOrder(o);
846+
847+
StoppableObservable<Integer> so = Async.<Integer>runAsync(Schedulers.io(), action);
848+
849+
so.subscribe(o);
850+
851+
cdl.countDown();
852+
853+
if (!cdl2.await(2, TimeUnit.SECONDS)) {
854+
fail("Didn't complete");
855+
}
856+
857+
for (int i = 0; i < 10; i++) {
858+
inOrder.verify(o).onNext(i);
859+
}
860+
inOrder.verify(o).onCompleted();
861+
inOrder.verifyNoMoreInteractions();
862+
verify(o, never()).onError(any(Throwable.class));
863+
864+
}
821865
}

0 commit comments

Comments
 (0)