Skip to content

Commit 4fc1b92

Browse files
Merge pull request #1132 from benjchristensen/merge-prs
Manual Merge of Several PRs
2 parents 95e0636 + b604a32 commit 4fc1b92

File tree

119 files changed

+4521
-5598
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

119 files changed

+4521
-5598
lines changed

rxjava-contrib/rxjava-async-util/src/main/java/rx/util/async/Async.java

Lines changed: 21 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/**
2-
* Copyright 2013 Netflix, Inc.
2+
* Copyright 2014 Netflix, Inc.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
55
* use this file except in compliance with the License. You may obtain a copy of
@@ -54,10 +54,10 @@
5454
import rx.subjects.Subject;
5555
import rx.subscriptions.SerialSubscription;
5656
import rx.util.async.operators.Functionals;
57-
import rx.util.async.operators.OperationDeferFuture;
58-
import rx.util.async.operators.OperationForEachFuture;
59-
import rx.util.async.operators.OperationFromFunctionals;
60-
import rx.util.async.operators.OperationStartFuture;
57+
import rx.util.async.operators.OperatorDeferFuture;
58+
import rx.util.async.operators.OperatorForEachFuture;
59+
import rx.util.async.operators.OperatorFromFunctionals;
60+
import rx.util.async.operators.OperatorStartFuture;
6161

6262
/**
6363
* Utility methods to convert functions and actions into asynchronous operations
@@ -1377,7 +1377,7 @@ public static <R> FuncN<Observable<R>> asyncFunc(final FuncN<? extends R> func,
13771377
* @see <a href="https://github.com/Netflix/RxJava/wiki/Async-Operators#startfuture">RxJava Wiki: startFuture()</a>
13781378
*/
13791379
public static <T> Observable<T> startFuture(Func0<? extends Future<? extends T>> functionAsync) {
1380-
return OperationStartFuture.startFuture(functionAsync);
1380+
return OperatorStartFuture.startFuture(functionAsync);
13811381
}
13821382

13831383
/**
@@ -1395,7 +1395,7 @@ public static <T> Observable<T> startFuture(Func0<? extends Future<? extends T>>
13951395
*/
13961396
public static <T> Observable<T> startFuture(Func0<? extends Future<? extends T>> functionAsync,
13971397
Scheduler scheduler) {
1398-
return OperationStartFuture.startFuture(functionAsync, scheduler);
1398+
return OperatorStartFuture.startFuture(functionAsync, scheduler);
13991399
}
14001400

14011401
/**
@@ -1416,7 +1416,7 @@ public static <T> Observable<T> startFuture(Func0<? extends Future<? extends T>>
14161416
* @see <a href="https://github.com/Netflix/RxJava/wiki/Async-Operators#deferfuture">RxJava Wiki: deferFuture()</a>
14171417
*/
14181418
public static <T> Observable<T> deferFuture(Func0<? extends Future<? extends Observable<? extends T>>> observableFactoryAsync) {
1419-
return OperationDeferFuture.deferFuture(observableFactoryAsync);
1419+
return OperatorDeferFuture.deferFuture(observableFactoryAsync);
14201420
}
14211421

14221422
/**
@@ -1437,7 +1437,7 @@ public static <T> Observable<T> deferFuture(Func0<? extends Future<? extends Obs
14371437
public static <T> Observable<T> deferFuture(
14381438
Func0<? extends Future<? extends Observable<? extends T>>> observableFactoryAsync,
14391439
Scheduler scheduler) {
1440-
return OperationDeferFuture.deferFuture(observableFactoryAsync, scheduler);
1440+
return OperatorDeferFuture.deferFuture(observableFactoryAsync, scheduler);
14411441
}
14421442

14431443
/**
@@ -1453,13 +1453,13 @@ public static <T> Observable<T> deferFuture(
14531453
* @param source the source Observable
14541454
* @param onNext the action to call with each emitted element
14551455
* @return the Future representing the entire for-each operation
1456-
* @see #forEachFuture(rx.functions.Action1, rx.Scheduler)
1456+
* @see #forEachFuture(rx.Observable, rx.functions.Action1, rx.Scheduler)
14571457
* @see <a href="https://github.com/Netflix/RxJava/wiki/Async-Operators#foreachfuture">RxJava Wiki: forEachFuture()</a>
14581458
*/
14591459
public static <T> FutureTask<Void> forEachFuture(
14601460
Observable<? extends T> source,
14611461
Action1<? super T> onNext) {
1462-
return OperationForEachFuture.forEachFuture(source, onNext);
1462+
return OperatorForEachFuture.forEachFuture(source, onNext);
14631463
}
14641464

14651465

@@ -1477,14 +1477,14 @@ public static <T> FutureTask<Void> forEachFuture(
14771477
* @param onNext the action to call with each emitted element
14781478
* @param onError the action to call when an exception is emitted
14791479
* @return the Future representing the entire for-each operation
1480-
* @see #forEachFuture(rx.functions.Action1, rx.functions.Action1, rx.Scheduler)
1480+
* @see #forEachFuture(rx.Observable, rx.functions.Action1, rx.functions.Action1, rx.Scheduler)
14811481
* @see <a href="https://github.com/Netflix/RxJava/wiki/Async-Operators#foreachfuture">RxJava Wiki: forEachFuture()</a>
14821482
*/
14831483
public static <T> FutureTask<Void> forEachFuture(
14841484
Observable<? extends T> source,
14851485
Action1<? super T> onNext,
14861486
Action1<? super Throwable> onError) {
1487-
return OperationForEachFuture.forEachFuture(source, onNext, onError);
1487+
return OperatorForEachFuture.forEachFuture(source, onNext, onError);
14881488
}
14891489

14901490

@@ -1503,15 +1503,15 @@ public static <T> FutureTask<Void> forEachFuture(
15031503
* @param onError the action to call when an exception is emitted
15041504
* @param onCompleted the action to call when the source completes
15051505
* @return the Future representing the entire for-each operation
1506-
* @see #forEachFuture(rx.functions.Action1, rx.functions.Action1, rx.functions.Action0, rx.Scheduler)
1506+
* @see #forEachFuture(rx.Observable, rx.functions.Action1, rx.functions.Action1, rx.functions.Action0, rx.Scheduler)
15071507
* @see <a href="https://github.com/Netflix/RxJava/wiki/Async-Operators#foreachfuture">RxJava Wiki: forEachFuture()</a>
15081508
*/
15091509
public static <T> FutureTask<Void> forEachFuture(
15101510
Observable<? extends T> source,
15111511
Action1<? super T> onNext,
15121512
Action1<? super Throwable> onError,
15131513
Action0 onCompleted) {
1514-
return OperationForEachFuture.forEachFuture(source, onNext, onError, onCompleted);
1514+
return OperatorForEachFuture.forEachFuture(source, onNext, onError, onCompleted);
15151515
}
15161516

15171517

@@ -1534,7 +1534,7 @@ public static <T> FutureTask<Void> forEachFuture(
15341534
Observable<? extends T> source,
15351535
Action1<? super T> onNext,
15361536
Scheduler scheduler) {
1537-
FutureTask<Void> task = OperationForEachFuture.forEachFuture(source, onNext);
1537+
FutureTask<Void> task = OperatorForEachFuture.forEachFuture(source, onNext);
15381538
final Worker inner = scheduler.createWorker();
15391539
inner.schedule(Functionals.fromRunnable(task, inner));
15401540
return task;
@@ -1562,7 +1562,7 @@ public static <T> FutureTask<Void> forEachFuture(
15621562
Action1<? super T> onNext,
15631563
Action1<? super Throwable> onError,
15641564
Scheduler scheduler) {
1565-
FutureTask<Void> task = OperationForEachFuture.forEachFuture(source, onNext, onError);
1565+
FutureTask<Void> task = OperatorForEachFuture.forEachFuture(source, onNext, onError);
15661566
final Worker inner = scheduler.createWorker();
15671567
inner.schedule(Functionals.fromRunnable(task, inner));
15681568
return task;
@@ -1592,7 +1592,7 @@ public static <T> FutureTask<Void> forEachFuture(
15921592
Action1<? super Throwable> onError,
15931593
Action0 onCompleted,
15941594
Scheduler scheduler) {
1595-
FutureTask<Void> task = OperationForEachFuture.forEachFuture(source, onNext, onError, onCompleted);
1595+
FutureTask<Void> task = OperatorForEachFuture.forEachFuture(source, onNext, onError, onCompleted);
15961596
final Worker inner = scheduler.createWorker();
15971597
inner.schedule(Functionals.fromRunnable(task, inner));
15981598
return task;
@@ -1617,30 +1617,6 @@ public static <R> Observable<R> fromAction(Action0 action, R result) {
16171617
return fromAction(action, result, Schedulers.computation());
16181618
}
16191619

1620-
/**
1621-
* Return an Observable that calls the given function and emits its
1622-
* result when an Observer subscribes.
1623-
* <p>
1624-
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/fromFunc0.png">
1625-
* <p>
1626-
* The function is called on the default thread pool for computation.
1627-
*
1628-
* @param <R> the return type
1629-
* @param function the function to call on each subscription
1630-
* @return an Observable that calls the given function and emits its
1631-
* result when an Observer subscribes
1632-
* @see #start(rx.functions.Func0)
1633-
* @see #fromCallable(java.util.concurrent.Callable)
1634-
* @see <a href="https://github.com/Netflix/RxJava/wiki/Async-Operators#fromfunc0">RxJava Wiki: fromFunc0()</a>
1635-
*
1636-
* @deprecated Unnecessary now that Func0 extends Callable. Just call
1637-
* {@link #fromCallable(Callable)} instead.
1638-
*/
1639-
@Deprecated
1640-
public static <R> Observable<R> fromFunc0(Func0<? extends R> function) {
1641-
return fromCallable(function);
1642-
}
1643-
16441620
/**
16451621
* Return an Observable that calls the given Callable and emits its
16461622
* result or Exception when an Observer subscribes.
@@ -1654,7 +1630,6 @@ public static <R> Observable<R> fromFunc0(Func0<? extends R> function) {
16541630
* @return an Observable that calls the given Callable and emits its
16551631
* result or Exception when an Observer subscribes
16561632
* @see #start(rx.functions.Func0)
1657-
* @see #fromFunc0(rx.functions.Func0)
16581633
* @see <a href="https://github.com/Netflix/RxJava/wiki/Async-Operators#fromcallable">RxJava Wiki: fromCallable()</a>
16591634
*/
16601635
public static <R> Observable<R> fromCallable(Callable<? extends R> callable) {
@@ -1696,33 +1671,9 @@ public static <R> Observable<R> fromRunnable(final Runnable run, final R result)
16961671
* @see <a href="https://github.com/Netflix/RxJava/wiki/Async-Operators#fromaction">RxJava Wiki: fromAction()</a>
16971672
*/
16981673
public static <R> Observable<R> fromAction(Action0 action, R result, Scheduler scheduler) {
1699-
return Observable.create(OperationFromFunctionals.fromAction(action, result)).subscribeOn(scheduler);
1674+
return Observable.create(OperatorFromFunctionals.fromAction(action, result)).subscribeOn(scheduler);
17001675
}
17011676

1702-
/**
1703-
* Return an Observable that calls the given function and emits its
1704-
* result when an Observer subscribes.
1705-
* <p>
1706-
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/fromFunc0.s.png">
1707-
*
1708-
* @param <R> the return type
1709-
* @param function the function to call on each subscription
1710-
* @param scheduler the scheduler where the function is called and the
1711-
* result is emitted
1712-
* @return an Observable that calls the given function and emits its
1713-
* result when an Observer subscribes
1714-
* @see #start(rx.functions.Func0)
1715-
* @see #fromCallable(java.util.concurrent.Callable)
1716-
* @see <a href="https://github.com/Netflix/RxJava/wiki/Async-Operators#fromfunc0">RxJava Wiki: fromFunc0()</a>
1717-
*
1718-
* @deprecated Unnecessary now that Func0 extends Callable. Just call
1719-
* {@link #fromCallable(Callable, Scheduler)} instead.
1720-
*/
1721-
@Deprecated
1722-
public static <R> Observable<R> fromFunc0(Func0<? extends R> function, Scheduler scheduler) {
1723-
return fromCallable(function, scheduler);
1724-
}
1725-
17261677
/**
17271678
* Return an Observable that calls the given Callable and emits its
17281679
* result or Exception when an Observer subscribes.
@@ -1736,11 +1687,10 @@ public static <R> Observable<R> fromFunc0(Func0<? extends R> function, Scheduler
17361687
* @return an Observable that calls the given Callable and emits its
17371688
* result or Exception when an Observer subscribes
17381689
* @see #start(rx.functions.Func0)
1739-
* @see #fromFunc0(rx.functions.Func0)
17401690
* @see <a href="https://github.com/Netflix/RxJava/wiki/Async-Operators#fromcallable">RxJava Wiki: fromCallable()</a>
17411691
*/
17421692
public static <R> Observable<R> fromCallable(Callable<? extends R> callable, Scheduler scheduler) {
1743-
return Observable.create(OperationFromFunctionals.fromCallable(callable)).subscribeOn(scheduler);
1693+
return Observable.create(OperatorFromFunctionals.fromCallable(callable)).subscribeOn(scheduler);
17441694
}
17451695

17461696
/**
@@ -1759,7 +1709,7 @@ public static <R> Observable<R> fromCallable(Callable<? extends R> callable, Sch
17591709
* @see <a href="https://github.com/Netflix/RxJava/wiki/Async-Operators#fromrunnable">RxJava Wiki: fromRunnable()</a>
17601710
*/
17611711
public static <R> Observable<R> fromRunnable(final Runnable run, final R result, Scheduler scheduler) {
1762-
return Observable.create(OperationFromFunctionals.fromRunnable(run, result)).subscribeOn(scheduler);
1712+
return Observable.create(OperatorFromFunctionals.fromRunnable(run, result)).subscribeOn(scheduler);
17631713
}
17641714
/**
17651715
* Runs the provided action on the given scheduler and allows propagation

rxjava-contrib/rxjava-async-util/src/main/java/rx/util/async/operators/Functionals.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/**
2-
* Copyright 2013 Netflix, Inc.
2+
* Copyright 2014 Netflix, Inc.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.

rxjava-contrib/rxjava-async-util/src/main/java/rx/util/async/operators/LatchedObserver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/**
2-
* Copyright 2013 Netflix, Inc.
2+
* Copyright 2014 Netflix, Inc.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.

rxjava-contrib/rxjava-async-util/src/main/java/rx/util/async/operators/OperationDeferFuture.java renamed to rxjava-contrib/rxjava-async-util/src/main/java/rx/util/async/operators/OperatorDeferFuture.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/**
2-
* Copyright 2013 Netflix, Inc.
2+
* Copyright 2014 Netflix, Inc.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -24,9 +24,9 @@
2424
/**
2525
* Defer the execution of a factory method which produces an observable sequence.
2626
*/
27-
public final class OperationDeferFuture {
27+
public final class OperatorDeferFuture {
2828
/** Utility class. */
29-
private OperationDeferFuture() { throw new IllegalStateException("No instances!"); }
29+
private OperatorDeferFuture() { throw new IllegalStateException("No instances!"); }
3030

3131
/**
3232
* Returns an observable sequence that starts the specified asynchronous
@@ -49,7 +49,7 @@ public DeferFutureFunc0(Func0<? extends Future<? extends Observable<? extends T>
4949

5050
@Override
5151
public Observable<T> call() {
52-
return Observable.merge(OperationStartFuture.startFuture(observableFactoryAsync));
52+
return Observable.merge(OperatorStartFuture.startFuture(observableFactoryAsync));
5353
}
5454

5555
}
@@ -81,7 +81,7 @@ public DeferFutureFunc0Scheduled(Func0<? extends Future<? extends Observable<? e
8181

8282
@Override
8383
public Observable<T> call() {
84-
return Observable.merge(OperationStartFuture.startFuture(observableFactoryAsync, scheduler));
84+
return Observable.merge(OperatorStartFuture.startFuture(observableFactoryAsync, scheduler));
8585
}
8686

8787
}

rxjava-contrib/rxjava-async-util/src/main/java/rx/util/async/operators/OperationForEachFuture.java renamed to rxjava-contrib/rxjava-async-util/src/main/java/rx/util/async/operators/OperatorForEachFuture.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/**
2-
* Copyright 2013 Netflix, Inc.
2+
* Copyright 2014 Netflix, Inc.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -29,9 +29,9 @@
2929
* <p>
3030
* Remark: the cancellation token version's behavior is in doubt, so left out.
3131
*/
32-
public final class OperationForEachFuture {
32+
public final class OperatorForEachFuture {
3333
/** Utility class. */
34-
private OperationForEachFuture() { throw new IllegalStateException("No instances!"); }
34+
private OperatorForEachFuture() { throw new IllegalStateException("No instances!"); }
3535

3636
/**
3737
* Subscribes to the given source and calls the callback for each emitted item,
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/**
2-
* Copyright 2013 Netflix, Inc.
2+
* Copyright 2014 Netflix, Inc.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,48 +17,35 @@
1717

1818
import java.util.concurrent.Callable;
1919

20-
import rx.Observable.OnSubscribeFunc;
21-
import rx.Observer;
22-
import rx.Subscription;
20+
import rx.Observable.OnSubscribe;
21+
import rx.Subscriber;
2322
import rx.functions.Action0;
2423
import rx.functions.Actions;
2524
import rx.functions.Func0;
26-
import rx.subscriptions.Subscriptions;
2725

2826
/**
2927
* Operators that invoke a function or action if
3028
* an observer subscribes.
3129
* Asynchrony can be achieved by using subscribeOn or observeOn.
3230
*/
33-
public final class OperationFromFunctionals {
31+
public final class OperatorFromFunctionals {
3432
/** Utility class. */
35-
private OperationFromFunctionals() { throw new IllegalStateException("No instances!"); }
33+
private OperatorFromFunctionals() { throw new IllegalStateException("No instances!"); }
3634

3735
/** Subscriber function that invokes an action and returns the given result. */
38-
public static <R> OnSubscribeFunc<R> fromAction(Action0 action, R result) {
36+
public static <R> OnSubscribe<R> fromAction(Action0 action, R result) {
3937
return new InvokeAsync<R>(Actions.toFunc(action, result));
4038
}
4139

42-
/**
43-
* Subscriber function that invokes a function and returns its value.
44-
*
45-
* @deprecated Unnecessary now that Func0 extends Callable. Just call
46-
* {@link #fromCallable(Callable)} instead.
47-
*/
48-
@Deprecated
49-
public static <R> OnSubscribeFunc<R> fromFunc0(Func0<? extends R> function) {
50-
return fromCallable(function);
51-
}
52-
5340
/**
5441
* Subscriber function that invokes the callable and returns its value or
5542
* propagates its checked exception.
5643
*/
57-
public static <R> OnSubscribeFunc<R> fromCallable(Callable<? extends R> callable) {
44+
public static <R> OnSubscribe<R> fromCallable(Callable<? extends R> callable) {
5845
return new InvokeAsync<R>(callable);
5946
}
6047
/** Subscriber function that invokes a runnable and returns the given result. */
61-
public static <R> OnSubscribeFunc<R> fromRunnable(final Runnable run, final R result) {
48+
public static <R> OnSubscribe<R> fromRunnable(final Runnable run, final R result) {
6249
return new InvokeAsync<R>(new Func0<R>() {
6350
@Override
6451
public R call() {
@@ -72,7 +59,7 @@ public R call() {
7259
* Invokes a java.util.concurrent.Callable when an observer subscribes.
7360
* @param <R> the return type
7461
*/
75-
static final class InvokeAsync<R> implements OnSubscribeFunc<R> {
62+
static final class InvokeAsync<R> implements OnSubscribe<R> {
7663
final Callable<? extends R> callable;
7764
public InvokeAsync(Callable<? extends R> callable) {
7865
if (callable == null) {
@@ -81,16 +68,14 @@ public InvokeAsync(Callable<? extends R> callable) {
8168
this.callable = callable;
8269
}
8370
@Override
84-
public Subscription onSubscribe(Observer<? super R> t1) {
85-
Subscription s = Subscriptions.empty();
71+
public void call(Subscriber<? super R> t1) {
8672
try {
8773
t1.onNext(callable.call());
8874
} catch (Throwable t) {
8975
t1.onError(t);
90-
return s;
76+
return;
9177
}
9278
t1.onCompleted();
93-
return s;
9479
}
9580
}
9681
}

0 commit comments

Comments
 (0)