Skip to content

Commit 1d1066b

Browse files
Added Observable.bind
- new create signature - new bind operator - new subscribe overload - OperationReplay is failing unit tests, all others are passing
1 parent f17e934 commit 1d1066b

14 files changed

+285
-81
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 202 additions & 34 deletions
Large diffs are not rendered by default.

rxjava-core/src/main/java/rx/observables/ConnectableObservable.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import rx.Observer;
2020
import rx.Subscription;
2121
import rx.operators.OperationRefCount;
22+
import rx.util.functions.Action2;
2223

2324
/**
2425
* A ConnectableObservable resembles an ordinary {@link Observable}, except that it does not begin
@@ -37,7 +38,7 @@
3738

3839
public abstract class ConnectableObservable<T> extends Observable<T> {
3940

40-
protected ConnectableObservable(OnSubscribeFunc<T> onSubscribe) {
41+
protected ConnectableObservable(Action2<Observer<? super T>, OperatorSubscription> onSubscribe) {
4142
super(onSubscribe);
4243
}
4344

rxjava-core/src/main/java/rx/observables/GroupedObservable.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
package rx.observables;
1717

1818
import rx.Observable;
19+
import rx.Observer;
20+
import rx.util.functions.Action2;
1921
import rx.util.functions.Func1;
2022

2123
/**
@@ -31,7 +33,7 @@
3133
public class GroupedObservable<K, T> extends Observable<T> {
3234
private final K key;
3335

34-
public GroupedObservable(K key, OnSubscribeFunc<T> onSubscribe) {
36+
public GroupedObservable(K key, Action2<Observer<? super T>, OperatorSubscription> onSubscribe) {
3537
super(onSubscribe);
3638
this.key = key;
3739
}

rxjava-core/src/main/java/rx/operators/OperationGroupBy.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,11 @@
2222

2323
import rx.Observable;
2424
import rx.Observable.OnSubscribeFunc;
25+
import rx.Observable.OperatorSubscription;
2526
import rx.Observer;
2627
import rx.Subscription;
2728
import rx.observables.GroupedObservable;
29+
import rx.util.functions.Action2;
2830
import rx.util.functions.Func1;
2931
import rx.util.functions.Functions;
3032

@@ -162,18 +164,16 @@ private static class GroupedSubject<K, T> extends GroupedObservable<K, T> implem
162164

163165
static <K, T> GroupedSubject<K, T> create(final K key, final GroupBy<K, T> parent) {
164166
final AtomicReference<Observer<? super T>> subscribedObserver = new AtomicReference<Observer<? super T>>(OperationGroupBy.<T> emptyObserver());
165-
return new GroupedSubject<K, T>(key, new OnSubscribeFunc<T>() {
166-
167-
private final SafeObservableSubscription subscription = new SafeObservableSubscription();
167+
return new GroupedSubject<K, T>(key, new Action2<Observer<? super T>, OperatorSubscription>() {
168168

169169
@Override
170-
public Subscription onSubscribe(Observer<? super T> observer) {
170+
public void call(Observer<? super T> observer, OperatorSubscription os) {
171171
// register Observer
172172
subscribedObserver.set(observer);
173173

174174
parent.subscribeKey(key);
175175

176-
return subscription.wrap(new Subscription() {
176+
os.add(new Subscription() {
177177
@Override
178178
public void unsubscribe() {
179179
// we remove the Observer so we stop emitting further events (they will be ignored if parent continues to send)
@@ -188,7 +188,7 @@ public void unsubscribe() {
188188

189189
private final AtomicReference<Observer<? super T>> subscribedObserver;
190190

191-
public GroupedSubject(K key, OnSubscribeFunc<T> onSubscribe, AtomicReference<Observer<? super T>> subscribedObserver) {
191+
public GroupedSubject(K key, Action2<Observer<? super T>, OperatorSubscription> onSubscribe, AtomicReference<Observer<? super T>> subscribedObserver) {
192192
super(key, onSubscribe);
193193
this.subscribedObserver = subscribedObserver;
194194
}

rxjava-core/src/main/java/rx/operators/OperationGroupByUntil.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import rx.Observable;
2424
import rx.Observable.OnSubscribeFunc;
25+
import rx.Observable.OperatorSubscription;
2526
import rx.Observer;
2627
import rx.Subscription;
2728
import rx.observables.GroupedObservable;
@@ -30,6 +31,8 @@
3031
import rx.subscriptions.CompositeSubscription;
3132
import rx.subscriptions.SerialSubscription;
3233
import rx.subscriptions.Subscriptions;
34+
import rx.util.functions.Action2;
35+
import rx.util.functions.Func0;
3336
import rx.util.functions.Func1;
3437

3538
/**
@@ -212,10 +215,16 @@ public static class GroupSubject<K, V> extends GroupedObservable<K, V> implement
212215
protected final Subject<V, V> publish;
213216

214217
public GroupSubject(K key, final Subject<V, V> publish) {
215-
super(key, new OnSubscribeFunc<V>() {
218+
super(key, new Action2<Observer<? super V>, OperatorSubscription>() {
216219
@Override
217-
public Subscription onSubscribe(Observer<? super V> o) {
218-
return publish.subscribe(o);
220+
public void call(Observer<? super V> o, final OperatorSubscription os) {
221+
publish.subscribe(o, new Func0<OperatorSubscription>() {
222+
223+
@Override
224+
public OperatorSubscription call() {
225+
return os;
226+
}
227+
});
219228
}
220229
});
221230
this.publish = publish;

rxjava-core/src/main/java/rx/operators/OperationMulticast.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import rx.subjects.Subject;
2424
import rx.subscriptions.CompositeSubscription;
2525
import rx.subscriptions.Subscriptions;
26+
import rx.util.functions.Action2;
2627
import rx.util.functions.Func0;
2728
import rx.util.functions.Func1;
2829

@@ -40,10 +41,15 @@ private static class MulticastConnectableObservable<T, R> extends ConnectableObs
4041
private Subscription subscription;
4142

4243
public MulticastConnectableObservable(Observable<? extends T> source, final Subject<? super T, ? extends R> subject) {
43-
super(new OnSubscribeFunc<R>() {
44+
super(new Action2<Observer<? super R>, OperatorSubscription>() {
4445
@Override
45-
public Subscription onSubscribe(Observer<? super R> observer) {
46-
return subject.subscribe(observer);
46+
public void call(Observer<? super R> observer, final OperatorSubscription os) {
47+
subject.subscribe(observer, new Func0<OperatorSubscription>() {
48+
49+
@Override
50+
public rx.Observable.OperatorSubscription call() {
51+
return os;
52+
}});
4753
}
4854
});
4955
this.source = source;

rxjava-core/src/main/java/rx/operators/OperationReplay.java

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,15 @@
2626
import java.util.concurrent.locks.ReentrantLock;
2727

2828
import rx.Observable;
29-
import rx.Observable.OnSubscribeFunc;
29+
import rx.Observable.OperatorSubscription;
3030
import rx.Observer;
3131
import rx.Scheduler;
3232
import rx.Subscription;
3333
import rx.subjects.Subject;
34-
import rx.subscriptions.Subscriptions;
3534
import rx.util.Timestamped;
3635
import rx.util.functions.Action0;
36+
import rx.util.functions.Action2;
37+
import rx.util.functions.Func0;
3738
import rx.util.functions.Func1;
3839
import rx.util.functions.Functions;
3940

@@ -137,11 +138,18 @@ public void call() {
137138
/**
138139
* Return an OnSubscribeFunc which delegates the subscription to the given observable.
139140
*/
140-
public static <T> OnSubscribeFunc<T> subscriberOf(final Observable<T> target) {
141-
return new OnSubscribeFunc<T>() {
141+
public static <T> Action2<Observer<? super T>, OperatorSubscription> subscriberOf(final Observable<T> target) {
142+
return new Action2<Observer<? super T>, OperatorSubscription>() {
142143
@Override
143-
public Subscription onSubscribe(Observer<? super T> t1) {
144-
return target.subscribe(t1);
144+
public void call(Observer<? super T> t1, final OperatorSubscription os) {
145+
target.subscribe(t1, new Func0<OperatorSubscription>() {
146+
147+
@Override
148+
public OperatorSubscription call() {
149+
return os;
150+
}
151+
152+
});
145153
}
146154
};
147155
}
@@ -154,7 +162,7 @@ public static final class MappingSubject<T, R> extends Subject<T, R> {
154162
private final Subject<R, R> subject;
155163
private final Func1<T, R> selector;
156164

157-
public MappingSubject(OnSubscribeFunc<R> func, Subject<R, R> subject, Func1<T, R> selector) {
165+
public MappingSubject(Action2<Observer<? super R>, OperatorSubscription> func, Subject<R, R> subject, Func1<T, R> selector) {
158166
super(func);
159167
this.subject = subject;
160168
this.selector = selector;
@@ -184,7 +192,7 @@ public static final class SubjectWrapper<T> extends Subject<T, T> {
184192
/** The wrapped subject. */
185193
final Subject<T, T> subject;
186194

187-
public SubjectWrapper(OnSubscribeFunc<T> func, Subject<T, T> subject) {
195+
public SubjectWrapper(Action2<Observer<? super T>, OperatorSubscription> func, Subject<T, T> subject) {
188196
super(func);
189197
this.subject = subject;
190198
}
@@ -708,7 +716,7 @@ public static <T> CustomReplaySubject<T, T, T> create(int maxSize) {
708716
protected final Func1<? super TInput, ? extends TIntermediate> intermediateSelector;
709717

710718
private CustomReplaySubject(
711-
Observable.OnSubscribeFunc<TResult> onSubscribe,
719+
Action2<Observer<? super TResult>, OperatorSubscription> onSubscribe,
712720
ReplayState<TIntermediate, TResult> state,
713721
Func1<? super TInput, ? extends TIntermediate> intermediateSelector) {
714722
super(onSubscribe);
@@ -782,7 +790,7 @@ protected void replayValues() {
782790
* the value type of the observers subscribing to this subject
783791
*/
784792
protected static final class CustomReplaySubjectSubscribeFunc<TIntermediate, TResult>
785-
implements Observable.OnSubscribeFunc<TResult> {
793+
implements Action2<Observer<? super TResult>, OperatorSubscription> {
786794

787795
private final ReplayState<TIntermediate, TResult> state;
788796

@@ -791,14 +799,14 @@ protected CustomReplaySubjectSubscribeFunc(ReplayState<TIntermediate, TResult> s
791799
}
792800

793801
@Override
794-
public Subscription onSubscribe(Observer<? super TResult> t1) {
802+
public void call(Observer<? super TResult> t1, OperatorSubscription os) {
795803
VirtualList<TIntermediate> values;
796804
Throwable error;
797805
state.lock();
798806
try {
799807
if (!state.done) {
800808
state.onSubscription.call();
801-
return state.addReplayer(t1);
809+
os.add(state.addReplayer(t1));
802810
}
803811
values = state.values;
804812
error = state.error;
@@ -811,15 +819,15 @@ public Subscription onSubscribe(Observer<? super TResult> t1) {
811819
t1.onNext(state.resultSelector.call(values.get(i)));
812820
} catch (Throwable t) {
813821
t1.onError(t);
814-
return Subscriptions.empty();
822+
return;
815823
}
816824
}
817825
if (error != null) {
818826
t1.onError(error);
819827
} else {
820828
t1.onCompleted();
821829
}
822-
return Subscriptions.empty();
830+
return;
823831
}
824832
}
825833
}

rxjava-core/src/main/java/rx/plugins/RxJavaObservableExecutionHook.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@
1717

1818
import rx.Observable;
1919
import rx.Observable.OnSubscribeFunc;
20+
import rx.Observable.OperatorSubscription;
2021
import rx.Observer;
2122
import rx.Subscription;
23+
import rx.util.functions.Action2;
2224
import rx.util.functions.Func1;
2325

2426
/**
@@ -52,6 +54,11 @@ public <T> OnSubscribeFunc<T> onSubscribeStart(Observable<? extends T> observabl
5254
// pass-thru by default
5355
return onSubscribe;
5456
}
57+
58+
public <T> Action2<Observer<? super T>, OperatorSubscription> onSubscribeStart(Observable<? extends T> observableInstance, final Action2<Observer<? super T>, OperatorSubscription> onSubscribe) {
59+
// pass-thru by default
60+
return onSubscribe;
61+
}
5562

5663
/**
5764
* Invoked after successful execution of {@link Observable#subscribe(rx.Observer)} with returned {@link Subscription}.

rxjava-core/src/main/java/rx/subjects/AsyncSubject.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import rx.Observer;
2323
import rx.subjects.SubjectSubscriptionManager.SubjectObserver;
2424
import rx.util.functions.Action1;
25+
import rx.util.functions.Action2;
2526

2627
/**
2728
* Subject that publishes only the last event to each {@link Observer} that has subscribed when the
@@ -58,7 +59,7 @@ public static <T> AsyncSubject<T> create() {
5859
final SubjectSubscriptionManager<T> subscriptionManager = new SubjectSubscriptionManager<T>();
5960
final AtomicReference<Notification<T>> lastNotification = new AtomicReference<Notification<T>>(new Notification<T>());
6061

61-
OnSubscribeFunc<T> onSubscribe = subscriptionManager.getOnSubscribeFunc(
62+
Action2<Observer<? super T>, OperatorSubscription> onSubscribe = subscriptionManager.getOnSubscribeFunc(
6263
/**
6364
* This function executes at beginning of subscription.
6465
*
@@ -97,7 +98,7 @@ protected static <T> void emitValueToObserver(Notification<T> n, Observer<? supe
9798
private final SubjectSubscriptionManager<T> subscriptionManager;
9899
final AtomicReference<Notification<T>> lastNotification;
99100

100-
protected AsyncSubject(OnSubscribeFunc<T> onSubscribe, SubjectSubscriptionManager<T> subscriptionManager, AtomicReference<Notification<T>> lastNotification) {
101+
protected AsyncSubject(Action2<Observer<? super T>, OperatorSubscription> onSubscribe, SubjectSubscriptionManager<T> subscriptionManager, AtomicReference<Notification<T>> lastNotification) {
101102
super(onSubscribe);
102103
this.subscriptionManager = subscriptionManager;
103104
this.lastNotification = lastNotification;

rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import rx.Observer;
2323
import rx.subjects.SubjectSubscriptionManager.SubjectObserver;
2424
import rx.util.functions.Action1;
25+
import rx.util.functions.Action2;
2526

2627
/**
2728
* Subject that publishes the most recent and all subsequent events to each subscribed {@link Observer}.
@@ -90,7 +91,7 @@ public static <T> BehaviorSubject<T> create(T defaultValue) {
9091
// set a default value so subscriptions will immediately receive this until a new notification is received
9192
final AtomicReference<Notification<T>> lastNotification = new AtomicReference<Notification<T>>(new Notification<T>(defaultValue));
9293

93-
OnSubscribeFunc<T> onSubscribe = subscriptionManager.getOnSubscribeFunc(
94+
Action2<Observer<? super T>, OperatorSubscription> onSubscribe = subscriptionManager.getOnSubscribeFunc(
9495
/**
9596
* This function executes at beginning of subscription.
9697
*
@@ -132,7 +133,7 @@ public void call(SubjectObserver<? super T> o) {
132133
private final SubjectSubscriptionManager<T> subscriptionManager;
133134
final AtomicReference<Notification<T>> lastNotification;
134135

135-
protected BehaviorSubject(OnSubscribeFunc<T> onSubscribe, SubjectSubscriptionManager<T> subscriptionManager, AtomicReference<Notification<T>> lastNotification) {
136+
protected BehaviorSubject(Action2<Observer<? super T>, OperatorSubscription> onSubscribe, SubjectSubscriptionManager<T> subscriptionManager, AtomicReference<Notification<T>> lastNotification) {
136137
super(onSubscribe);
137138
this.subscriptionManager = subscriptionManager;
138139
this.lastNotification = lastNotification;

0 commit comments

Comments
 (0)