Skip to content

Commit 71baf0e

Browse files
Bind implementation of fromIterable, toList, toSortedList
1 parent 2ac7ec4 commit 71baf0e

13 files changed

+496
-369
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,6 @@
8888
import rx.operators.OperationSum;
8989
import rx.operators.OperationSwitch;
9090
import rx.operators.OperationSynchronize;
91-
import rx.operators.OperatorTake;
9291
import rx.operators.OperationTakeLast;
9392
import rx.operators.OperationTakeUntil;
9493
import rx.operators.OperationTakeWhile;
@@ -100,13 +99,14 @@
10099
import rx.operators.OperationToMap;
101100
import rx.operators.OperationToMultimap;
102101
import rx.operators.OperationToObservableFuture;
103-
import rx.operators.OperationToObservableIterable;
104-
import rx.operators.OperationToObservableList;
105-
import rx.operators.OperationToObservableSortedList;
106102
import rx.operators.OperationUsing;
107103
import rx.operators.OperationWindow;
108104
import rx.operators.OperationZip;
105+
import rx.operators.OperatorFromIterable;
106+
import rx.operators.OperatorTake;
109107
import rx.operators.OperatorTakeTimed;
108+
import rx.operators.OperatorToObservableList;
109+
import rx.operators.OperatorToObservableSortedList;
110110
import rx.operators.SafeObservableSubscription;
111111
import rx.operators.SafeObserver;
112112
import rx.plugins.RxJavaObservableExecutionHook;
@@ -159,6 +159,22 @@
159159
*/
160160
public class Observable<T> {
161161

162+
final Action2<Observer<? super T>, OperatorSubscription> f;
163+
164+
/**
165+
* Observable with Function to execute when subscribed to.
166+
* <p>
167+
* NOTE: Use {@link #create(OnSubscribeFunc)} to create an Observable
168+
* instead of this constructor unless you specifically have a need for
169+
* inheritance.
170+
*
171+
* @param onSubscribe
172+
* {@link OnSubscribeFunc} to be executed when {@link #subscribe(Observer)} is called
173+
*/
174+
protected Observable(Action2<Observer<? super T>, OperatorSubscription> f) {
175+
this.f = f;
176+
}
177+
162178
/**
163179
<<<<<<< HEAD
164180
* Function interface for work to be performed when an Observable is subscribed to via
@@ -198,11 +214,10 @@ public void add(Subscription s) {
198214

199215
}
200216

201-
final Action2<Observer<? super T>, OperatorSubscription> f;
202-
203217
private final static RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();
204218

205219
/**
220+
<<<<<<< HEAD
206221
* Observable with Function to execute when subscribed to.
207222
* <p>
208223
* <em>Note:</em> Use {@link #create(OnSubscribeFunc)} to create an Observable, instead of this
@@ -219,6 +234,8 @@ protected Observable(Action2<Observer<? super T>, OperatorSubscription> f) {
219234
<<<<<<< HEAD
220235
* Mirror the one Observable in an Iterable of several Observables that first emits an item.
221236
=======
237+
=======
238+
>>>>>>> Bind implementation of fromIterable, toList, toSortedList
222239
* Creates an Observable that will execute the given function when an {@link Observer} subscribes to it.
223240
* <p>
224241
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/create.png">
@@ -257,7 +274,7 @@ public void call(Observer<? super T> o, OperatorSubscription s) {
257274

258275
});
259276
}
260-
277+
261278
public <R> Observable<R> bind(final Func2<Observer<? super R>, OperatorSubscription, Observer<? super T>> bind) {
262279
return new Observable<R>(new Action2<Observer<? super R>, OperatorSubscription>() {
263280

@@ -1277,7 +1294,7 @@ public final static <T> Observable<T> from(Future<? extends T> future, Scheduler
12771294
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#from">RxJava Wiki: from()</a>
12781295
*/
12791296
public final static <T> Observable<T> from(Iterable<? extends T> iterable) {
1280-
return from(iterable, Schedulers.immediate());
1297+
return create(new OperatorFromIterable<T>(iterable));
12811298
}
12821299

12831300
/**
@@ -1299,7 +1316,7 @@ public final static <T> Observable<T> from(Iterable<? extends T> iterable) {
12991316
* @see <a href="http://msdn.microsoft.com/en-us/library/hh212140.aspx">MSDN: Observable.ToObservable</a>
13001317
*/
13011318
public final static <T> Observable<T> from(Iterable<? extends T> iterable, Scheduler scheduler) {
1302-
return create(OperationToObservableIterable.toObservableIterable(iterable, scheduler));
1319+
return create(new OperatorFromIterable<T>(iterable)).subscribeOn(scheduler);
13031320
}
13041321

13051322
/**
@@ -1644,8 +1661,9 @@ public final static <T> Observable<T> from(T t1, T t2, T t3, T t4, T t5, T t6, T
16441661
* @return an Observable that emits each item in the source Array
16451662
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#from">RxJava Wiki: from()</a>
16461663
*/
1647-
public final static <T> Observable<T> from(T[] items) {
1648-
return from(Arrays.asList(items));
1664+
@SafeVarargs
1665+
public final static <T> Observable<T> from(T... t1) {
1666+
return from(Arrays.asList(t1));
16491667
}
16501668

16511669
/**
@@ -8190,7 +8208,7 @@ public final BlockingObservable<T> toBlockingObservable() {
81908208
* @see <a href="https://github.com/Netflix/RxJava/wiki/Mathematical-and-Aggregate-Operators#tolist">RxJava Wiki: toList()</a>
81918209
*/
81928210
public final Observable<List<T>> toList() {
8193-
return create(OperationToObservableList.toObservableList(this));
8211+
return bind(new OperatorToObservableList<T>());
81948212
}
81958213

81968214
/**
@@ -8364,7 +8382,7 @@ public final <K, V> Observable<Map<K, Collection<V>>> toMultimap(Func1<? super T
83648382
* @see <a href="https://github.com/Netflix/RxJava/wiki/Mathematical-and-Aggregate-Operators#tosortedlist">RxJava Wiki: toSortedList()</a>
83658383
*/
83668384
public final Observable<List<T>> toSortedList() {
8367-
return create(OperationToObservableSortedList.toSortedList(this));
8385+
return bind(new OperatorToObservableSortedList<T>());
83688386
}
83698387

83708388
/**
@@ -8381,7 +8399,7 @@ public final Observable<List<T>> toSortedList() {
83818399
* @see <a href="https://github.com/Netflix/RxJava/wiki/Mathematical-and-Aggregate-Operators#tosortedlist">RxJava Wiki: toSortedList()</a>
83828400
*/
83838401
public final Observable<List<T>> toSortedList(Func2<? super T, ? super T, Integer> sortFunction) {
8384-
return create(OperationToObservableSortedList.toSortedList(this, sortFunction));
8402+
return bind(new OperatorToObservableSortedList<T>(sortFunction));
83858403
}
83868404

83878405
/**

rxjava-core/src/main/java/rx/operators/OperationToObservableIterable.java

Lines changed: 0 additions & 102 deletions
This file was deleted.

rxjava-core/src/main/java/rx/operators/OperationToObservableList.java

Lines changed: 0 additions & 82 deletions
This file was deleted.

0 commit comments

Comments
 (0)