Skip to content

Commit 60ac55f

Browse files
Bind implementation of Map, Cast, Timestamp
1 parent 71baf0e commit 60ac55f

File tree

10 files changed

+174
-244
lines changed

10 files changed

+174
-244
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
import rx.operators.OperationAverage;
4040
import rx.operators.OperationBuffer;
4141
import rx.operators.OperationCache;
42-
import rx.operators.OperationCast;
42+
import rx.operators.OperatorCast;
4343
import rx.operators.OperationCombineLatest;
4444
import rx.operators.OperationConcat;
4545
import rx.operators.OperationDebounce;
@@ -60,7 +60,7 @@
6060
import rx.operators.OperationInterval;
6161
import rx.operators.OperationJoin;
6262
import rx.operators.OperationJoinPatterns;
63-
import rx.operators.OperationMap;
63+
import rx.operators.OperatorMap;
6464
import rx.operators.OperationMaterialize;
6565
import rx.operators.OperationMerge;
6666
import rx.operators.OperationMergeDelayError;
@@ -95,7 +95,7 @@
9595
import rx.operators.OperationTimeInterval;
9696
import rx.operators.OperationTimeout;
9797
import rx.operators.OperationTimer;
98-
import rx.operators.OperationTimestamp;
98+
import rx.operators.OperatorTimestamp;
9999
import rx.operators.OperationToMap;
100100
import rx.operators.OperationToMultimap;
101101
import rx.operators.OperationToObservableFuture;
@@ -4043,7 +4043,7 @@ public final Observable<T> cache() {
40434043
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211842.aspx">MSDN: Observable.Cast</a>
40444044
*/
40454045
public final <R> Observable<R> cast(final Class<R> klass) {
4046-
return create(OperationCast.cast(this, klass));
4046+
return bind(new OperatorCast<T, R>(klass));
40474047
}
40484048

40494049
/**
@@ -5061,7 +5061,7 @@ public final Long call(Long t1, T t2) {
50615061
* @see <a href="http://msdn.microsoft.com/en-us/library/hh244306.aspx">MSDN: Observable.Select</a>
50625062
*/
50635063
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
5064-
return create(OperationMap.map(this, func));
5064+
return bind(new OperatorMap<T, R>(func));
50655065
}
50665066

50675067
/**
@@ -5089,6 +5089,7 @@ public final <R> Observable<R> mapMany(Func1<? super T, ? extends Observable<? e
50895089
}
50905090

50915091
/**
5092+
<<<<<<< HEAD
50925093
* Returns an Observable that applies the specified function to each item emitted by an
50935094
* Observable and emits the results of these function applications.
50945095
* <p>
@@ -5111,6 +5112,10 @@ public final <R> Observable<R> mapWithIndex(Func2<? super T, Integer, ? extends
51115112
/**
51125113
* Turns all of the emissions and notifications from a source Observable into emissions marked
51135114
* with their original types within {@link Notification} objects.
5115+
=======
5116+
* Turns all of the emissions and notifications from a source Observable
5117+
* into emissions marked with their original types within {@link Notification} objects.
5118+
>>>>>>> Bind implementation of Map, Cast, Timestamp
51145119
* <p>
51155120
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/materialize.png">
51165121
*
@@ -8157,7 +8162,7 @@ public final Observable<T> timeout(long timeout, TimeUnit timeUnit, Scheduler sc
81578162
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229003.aspx">MSDN: Observable.Timestamp</a>
81588163
*/
81598164
public final Observable<Timestamped<T>> timestamp() {
8160-
return create(OperationTimestamp.timestamp(this));
8165+
return timestamp(Schedulers.immediate());
81618166
}
81628167

81638168
/**
@@ -8174,7 +8179,7 @@ public final Observable<Timestamped<T>> timestamp() {
81748179
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229003.aspx">MSDN: Observable.Timestamp</a>
81758180
*/
81768181
public final Observable<Timestamped<T>> timestamp(Scheduler scheduler) {
8177-
return create(OperationTimestamp.timestamp(this, scheduler));
8182+
return bind(new OperatorTimestamp<T>(scheduler));
81788183
}
81798184

81808185
/**

rxjava-core/src/main/java/rx/operators/OperationCast.java

Lines changed: 0 additions & 35 deletions
This file was deleted.

rxjava-core/src/main/java/rx/operators/OperationMap.java

Lines changed: 0 additions & 121 deletions
This file was deleted.
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import rx.Observable.OperatorSubscription;
19+
import rx.Observer;
20+
import rx.util.functions.Func2;
21+
22+
/**
23+
* Converts the elements of an observable sequence to the specified type.
24+
*/
25+
public class OperatorCast<T, R> implements Func2<Observer<? super R>, OperatorSubscription, Observer<? super T>> {
26+
27+
private final Class<R> castClass;
28+
29+
public OperatorCast(Class<R> castClass) {
30+
this.castClass = castClass;
31+
}
32+
33+
@Override
34+
public Observer<? super T> call(final Observer<? super R> o, OperatorSubscription os) {
35+
return new Observer<T>() {
36+
37+
@Override
38+
public void onCompleted() {
39+
o.onCompleted();
40+
}
41+
42+
@Override
43+
public void onError(Throwable e) {
44+
o.onError(e);
45+
}
46+
47+
@Override
48+
public void onNext(T t) {
49+
o.onNext(castClass.cast(t));
50+
}
51+
};
52+
}
53+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import rx.Observable.OperatorSubscription;
19+
import rx.Observer;
20+
import rx.util.functions.Func1;
21+
import rx.util.functions.Func2;
22+
23+
/**
24+
* Applies a function of your choosing to every item emitted by an Observable, and returns this
25+
* transformation as a new Observable.
26+
* <p>
27+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/map.png">
28+
*/
29+
public final class OperatorMap<T, R> implements Func2<Observer<? super R>, OperatorSubscription, Observer<? super T>> {
30+
31+
private final Func1<? super T, ? extends R> transformer;
32+
33+
public OperatorMap(Func1<? super T, ? extends R> transformer) {
34+
this.transformer = transformer;
35+
}
36+
37+
@Override
38+
public Observer<? super T> call(final Observer<? super R> o, OperatorSubscription os) {
39+
return new Observer<T>() {
40+
41+
@Override
42+
public void onCompleted() {
43+
o.onCompleted();
44+
}
45+
46+
@Override
47+
public void onError(Throwable e) {
48+
o.onError(e);
49+
}
50+
51+
@Override
52+
public void onNext(T t) {
53+
try {
54+
o.onNext(transformer.call(t));
55+
} catch (Throwable e) {
56+
onError(e);
57+
}
58+
}
59+
60+
};
61+
}
62+
63+
}

0 commit comments

Comments
 (0)