Skip to content

Commit ab94a0b

Browse files
committed
Merge remote-tracking branch 'origin/master' into docs
2 parents 00ca382 + 6756be3 commit ab94a0b

File tree

14 files changed

+1223
-296
lines changed

14 files changed

+1223
-296
lines changed

rxjava-contrib/rxjava-async-util/src/main/java/rx/util/async/Async.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,16 @@
2020
import java.util.concurrent.FutureTask;
2121

2222
import rx.Observable;
23+
import rx.Observer;
2324
import rx.Scheduler;
2425
import rx.Scheduler.Inner;
26+
import rx.Subscriber;
27+
import rx.Subscription;
2528
import rx.schedulers.Schedulers;
2629
import rx.subjects.AsyncSubject;
30+
import rx.subjects.PublishSubject;
31+
import rx.subjects.Subject;
32+
import rx.subscriptions.SerialSubscription;
2733
import rx.util.async.operators.Functionals;
2834
import rx.util.async.operators.OperationDeferFuture;
2935
import rx.util.async.operators.OperationForEachFuture;
@@ -1711,4 +1717,54 @@ public static <R> Observable<R> fromCallable(Callable<? extends R> callable, Sch
17111717
public static <R> Observable<R> fromRunnable(final Runnable run, final R result, Scheduler scheduler) {
17121718
return Observable.create(OperationFromFunctionals.fromRunnable(run, result)).subscribeOn(scheduler);
17131719
}
1720+
/**
1721+
* Runs the provided action on the given scheduler and allows propagation
1722+
* of multiple events to the observers of the returned StoppableObservable.
1723+
* The action is immediately executed and unobserved values will be lost.
1724+
* @param <T> the output value type
1725+
* @param scheduler the scheduler where the action is executed
1726+
* @param action the action to execute, receives an Observer where the events can be pumped
1727+
* and a Subscription which lets check for cancellation condition.
1728+
* @return an Observable which provides a Subscription interface to cancel the action
1729+
*/
1730+
public static <T> StoppableObservable<T> runAsync(Scheduler scheduler,
1731+
final Action2<? super Observer<? super T>, ? super Subscription> action) {
1732+
return runAsync(scheduler, PublishSubject.<T>create(), action);
1733+
}
1734+
/**
1735+
* Runs the provided action on the given scheduler and allows propagation
1736+
* of multiple events to the observers of the returned StoppableObservable.
1737+
* The action is immediately executed and unobserved values might be lost,
1738+
* depending on the subject type used.
1739+
* @param <T> the output value of the action
1740+
* @param <U> the output type of the observable sequence
1741+
* @param scheduler the scheduler where the action is executed
1742+
* @param subject the subject to use to distribute values emitted by the action
1743+
* @param action the action to execute, receives an Observer where the events can be pumped
1744+
* and a Subscription which lets check for cancellation condition.
1745+
* @return an Observable which provides a Subscription interface to cancel the action
1746+
*/
1747+
public static <T, U> StoppableObservable<U> runAsync(Scheduler scheduler,
1748+
final Subject<T, U> subject,
1749+
final Action2<? super Observer<? super T>, ? super Subscription> action) {
1750+
final SerialSubscription csub = new SerialSubscription();
1751+
1752+
StoppableObservable<U> co = new StoppableObservable<U>(new Observable.OnSubscribe<U>() {
1753+
@Override
1754+
public void call(Subscriber<? super U> t1) {
1755+
subject.subscribe(t1);
1756+
}
1757+
}, csub);
1758+
1759+
csub.set(scheduler.schedule(new Action1<Inner>() {
1760+
@Override
1761+
public void call(Inner t1) {
1762+
if (!csub.isUnsubscribed()) {
1763+
action.call(subject, csub);
1764+
}
1765+
}
1766+
}));
1767+
1768+
return co;
1769+
}
17141770
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.util.async;
17+
18+
import rx.Observable;
19+
import rx.Subscription;
20+
21+
/**
22+
* An Observable which provides a Subscription interface to signal a stop
23+
* condition to an asynchronous task.
24+
*/
25+
public class StoppableObservable<T> extends Observable<T> implements Subscription {
26+
private final Subscription token;
27+
public StoppableObservable(Observable.OnSubscribe<T> onSubscribe, Subscription token) {
28+
super(onSubscribe);
29+
this.token = token;
30+
}
31+
32+
@Override
33+
public boolean isUnsubscribed() {
34+
return token.isUnsubscribed();
35+
}
36+
37+
@Override
38+
public void unsubscribe() {
39+
token.unsubscribe();
40+
}
41+
}

rxjava-contrib/rxjava-async-util/src/test/java/rx/util/async/AsyncTest.java

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616

1717
package rx.util.async;
1818

19+
import java.util.concurrent.CountDownLatch;
1920
import static org.junit.Assert.*;
20-
import static org.mockito.Matchers.*;
2121
import static org.mockito.Mockito.*;
2222

2323
import java.util.concurrent.TimeUnit;
@@ -35,6 +35,7 @@
3535

3636
import rx.Observable;
3737
import rx.Observer;
38+
import rx.Subscription;
3839
import rx.observers.TestObserver;
3940
import rx.schedulers.Schedulers;
4041
import rx.schedulers.TestScheduler;
@@ -818,4 +819,47 @@ public String answer(InvocationOnMock invocation) throws Throwable {
818819
verify(func, times(1)).call();
819820
}
820821

822+
@Test
823+
public void testRunAsync() throws InterruptedException {
824+
final CountDownLatch cdl = new CountDownLatch(1);
825+
final CountDownLatch cdl2 = new CountDownLatch(1);
826+
Action2<Observer<? super Integer>, Subscription> action = new Action2<Observer<? super Integer>, Subscription>() {
827+
@Override
828+
public void call(Observer<? super Integer> t1, Subscription t2) {
829+
try {
830+
cdl.await();
831+
} catch (InterruptedException ex) {
832+
Thread.currentThread().interrupt();
833+
return;
834+
}
835+
for (int i = 0; i < 10 && !t2.isUnsubscribed(); i++) {
836+
t1.onNext(i);
837+
}
838+
t1.onCompleted();
839+
cdl2.countDown();
840+
}
841+
};
842+
843+
@SuppressWarnings("unchecked")
844+
Observer<Object> o = mock(Observer.class);
845+
InOrder inOrder = inOrder(o);
846+
847+
StoppableObservable<Integer> so = Async.<Integer>runAsync(Schedulers.io(), action);
848+
849+
so.subscribe(o);
850+
851+
cdl.countDown();
852+
853+
if (!cdl2.await(2, TimeUnit.SECONDS)) {
854+
fail("Didn't complete");
855+
}
856+
857+
for (int i = 0; i < 10; i++) {
858+
inOrder.verify(o).onNext(i);
859+
}
860+
inOrder.verify(o).onCompleted();
861+
inOrder.verifyNoMoreInteractions();
862+
verify(o, never()).onError(any(Throwable.class));
863+
864+
}
821865
}

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@
9797
import rx.operators.OperationToObservableFuture;
9898
import rx.operators.OperationUsing;
9999
import rx.operators.OperationWindow;
100-
import rx.operators.OperationZip;
100+
import rx.operators.OperatorZip;
101101
import rx.operators.OperatorCast;
102102
import rx.operators.OperatorFromIterable;
103103
import rx.operators.OperatorGroupBy;
@@ -108,6 +108,7 @@
108108
import rx.operators.OperatorTimestamp;
109109
import rx.operators.OperatorToObservableList;
110110
import rx.operators.OperatorToObservableSortedList;
111+
import rx.operators.OperatorZipIterable;
111112
import rx.plugins.RxJavaObservableExecutionHook;
112113
import rx.plugins.RxJavaPlugins;
113114
import rx.schedulers.Schedulers;
@@ -1611,11 +1612,9 @@ public final static Observable<Long> interval(long interval, TimeUnit unit, Sche
16111612
* the type of that item
16121613
* @return an Observable that emits {@code value} as a single item and then completes
16131614
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#wiki-just">RxJava Wiki: just()</a>
1614-
* @deprecated Use {@link #from(T)}
16151615
*/
1616-
@Deprecated
16171616
public final static <T> Observable<T> just(T value) {
1618-
return from(Arrays.asList((value)));
1617+
return from(Arrays.asList(value));
16191618
}
16201619

16211620
/**
@@ -3024,7 +3023,11 @@ public final static <R> Observable<R> when(Plan0<R> p1, Plan0<R> p2, Plan0<R> p3
30243023
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-zip">RxJava Wiki: zip()</a>
30253024
*/
30263025
public final static <R> Observable<R> zip(Iterable<? extends Observable<?>> ws, FuncN<? extends R> zipFunction) {
3027-
return create(OperationZip.zip(ws, zipFunction));
3026+
List<Observable<?>> os = new ArrayList<Observable<?>>();
3027+
for (Observable<?> o : ws) {
3028+
os.add(o);
3029+
}
3030+
return Observable.just(os.toArray(new Observable<?>[os.size()])).lift(new OperatorZip<R>(zipFunction));
30283031
}
30293032

30303033
/**
@@ -3053,12 +3056,14 @@ public final static <R> Observable<R> zip(Iterable<? extends Observable<?>> ws,
30533056
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-zip">RxJava Wiki: zip()</a>
30543057
*/
30553058
public final static <R> Observable<R> zip(Observable<? extends Observable<?>> ws, final FuncN<? extends R> zipFunction) {
3056-
return ws.toList().mergeMap(new Func1<List<? extends Observable<?>>, Observable<? extends R>>() {
3059+
return ws.toList().map(new Func1<List<? extends Observable<?>>, Observable<?>[]>() {
3060+
30573061
@Override
3058-
public final Observable<R> call(List<? extends Observable<?>> wsList) {
3059-
return create(OperationZip.zip(wsList, zipFunction));
3062+
public Observable<?>[] call(List<? extends Observable<?>> o) {
3063+
return o.toArray(new Observable<?>[o.size()]);
30603064
}
3061-
});
3065+
3066+
}).lift(new OperatorZip<R>(zipFunction));
30623067
}
30633068

30643069
/**
@@ -3084,8 +3089,8 @@ public final Observable<R> call(List<? extends Observable<?>> wsList) {
30843089
* @return an Observable that emits the zipped results
30853090
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-zip">RxJava Wiki: zip()</a>
30863091
*/
3087-
public final static <T1, T2, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Func2<? super T1, ? super T2, ? extends R> zipFunction) {
3088-
return create(OperationZip.zip(o1, o2, zipFunction));
3092+
public final static <T1, T2, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, final Func2<? super T1, ? super T2, ? extends R> zipFunction) {
3093+
return just(new Observable<?>[] { o1, o2 }).lift(new OperatorZip<R>(zipFunction));
30893094
}
30903095

30913096
/**
@@ -3115,7 +3120,7 @@ public final static <T1, T2, R> Observable<R> zip(Observable<? extends T1> o1, O
31153120
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-zip">RxJava Wiki: zip()</a>
31163121
*/
31173122
public final static <T1, T2, T3, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Func3<? super T1, ? super T2, ? super T3, ? extends R> zipFunction) {
3118-
return create(OperationZip.zip(o1, o2, o3, zipFunction));
3123+
return just(new Observable<?>[] { o1, o2, o3 }).lift(new OperatorZip<R>(zipFunction));
31193124
}
31203125

31213126
/**
@@ -3147,7 +3152,7 @@ public final static <T1, T2, T3, R> Observable<R> zip(Observable<? extends T1> o
31473152
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-zip">RxJava Wiki: zip()</a>
31483153
*/
31493154
public final static <T1, T2, T3, T4, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Func4<? super T1, ? super T2, ? super T3, ? super T4, ? extends R> zipFunction) {
3150-
return create(OperationZip.zip(o1, o2, o3, o4, zipFunction));
3155+
return just(new Observable<?>[] { o1, o2, o3, o4 }).lift(new OperatorZip<R>(zipFunction));
31513156
}
31523157

31533158
/**
@@ -3181,7 +3186,7 @@ public final static <T1, T2, T3, T4, R> Observable<R> zip(Observable<? extends T
31813186
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-zip">RxJava Wiki: zip()</a>
31823187
*/
31833188
public final static <T1, T2, T3, T4, T5, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Func5<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? extends R> zipFunction) {
3184-
return create(OperationZip.zip(o1, o2, o3, o4, o5, zipFunction));
3189+
return just(new Observable<?>[] { o1, o2, o3, o4, o5 }).lift(new OperatorZip<R>(zipFunction));
31853190
}
31863191

31873192
/**
@@ -3217,7 +3222,7 @@ public final static <T1, T2, T3, T4, T5, R> Observable<R> zip(Observable<? exten
32173222
*/
32183223
public final static <T1, T2, T3, T4, T5, T6, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6,
32193224
Func6<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? extends R> zipFunction) {
3220-
return create(OperationZip.zip(o1, o2, o3, o4, o5, o6, zipFunction));
3225+
return just(new Observable<?>[] { o1, o2, o3, o4, o5, o6 }).lift(new OperatorZip<R>(zipFunction));
32213226
}
32223227

32233228
/**
@@ -3255,7 +3260,7 @@ public final static <T1, T2, T3, T4, T5, T6, R> Observable<R> zip(Observable<? e
32553260
*/
32563261
public final static <T1, T2, T3, T4, T5, T6, T7, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Observable<? extends T7> o7,
32573262
Func7<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? extends R> zipFunction) {
3258-
return create(OperationZip.zip(o1, o2, o3, o4, o5, o6, o7, zipFunction));
3263+
return just(new Observable<?>[] { o1, o2, o3, o4, o5, o6, o7 }).lift(new OperatorZip<R>(zipFunction));
32593264
}
32603265

32613266
/**
@@ -3295,7 +3300,7 @@ public final static <T1, T2, T3, T4, T5, T6, T7, R> Observable<R> zip(Observable
32953300
*/
32963301
public final static <T1, T2, T3, T4, T5, T6, T7, T8, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Observable<? extends T7> o7, Observable<? extends T8> o8,
32973302
Func8<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? super T8, ? extends R> zipFunction) {
3298-
return create(OperationZip.zip(o1, o2, o3, o4, o5, o6, o7, o8, zipFunction));
3303+
return just(new Observable<?>[] { o1, o2, o3, o4, o5, o6, o7, o8 }).lift(new OperatorZip<R>(zipFunction));
32993304
}
33003305

33013306
/**
@@ -3337,7 +3342,7 @@ public final static <T1, T2, T3, T4, T5, T6, T7, T8, R> Observable<R> zip(Observ
33373342
*/
33383343
public final static <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Observable<? extends T7> o7, Observable<? extends T8> o8,
33393344
Observable<? extends T9> o9, Func9<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? super T8, ? super T9, ? extends R> zipFunction) {
3340-
return create(OperationZip.zip(o1, o2, o3, o4, o5, o6, o7, o8, o9, zipFunction));
3345+
return just(new Observable<?>[] { o1, o2, o3, o4, o5, o6, o7, o8, o9 }).lift(new OperatorZip<R>(zipFunction));
33413346
}
33423347

33433348
/**
@@ -8372,7 +8377,7 @@ public final <U> Observable<Observable<T>> window(Observable<U> boundary) {
83728377
* @return an Observable that pairs up values from the source Observable and the {@code other} Iterable sequence and emits the results of {@code zipFunction} applied to these pairs
83738378
*/
83748379
public final <T2, R> Observable<R> zip(Iterable<? extends T2> other, Func2<? super T, ? super T2, ? extends R> zipFunction) {
8375-
return create(OperationZip.zipIterable(this, other, zipFunction));
8380+
return lift(new OperatorZipIterable<T, T2, R>(other, zipFunction));
83768381
}
83778382

83788383
/**

rxjava-core/src/main/java/rx/Subscriber.java

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -47,27 +47,6 @@ protected Subscriber(Subscriber<?> op) {
4747
this(op.cs);
4848
}
4949

50-
public static <T> Subscriber<T> from(final Observer<? super T> o) {
51-
return new Subscriber<T>() {
52-
53-
@Override
54-
public void onCompleted() {
55-
o.onCompleted();
56-
}
57-
58-
@Override
59-
public void onError(Throwable e) {
60-
o.onError(e);
61-
}
62-
63-
@Override
64-
public void onNext(T t) {
65-
o.onNext(t);
66-
}
67-
68-
};
69-
}
70-
7150
/**
7251
* Used to register an unsubscribe callback.
7352
*/

0 commit comments

Comments
 (0)