Skip to content

Commit 60af29e

Browse files
Subscriber and Observer
-> Restore Observer interface -> Subscriber implements Observer, Subscription
1 parent 47a0e1b commit 60af29e

File tree

108 files changed

+1696
-1386
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

108 files changed

+1696
-1386
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 137 additions & 117 deletions
Large diffs are not rendered by default.
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx;
17+
18+
/**
19+
* Provides a mechanism for receiving push-based notifications.
20+
* <p>
21+
* After an Observer calls an {@link Observable}'s <code>Observable.subscribe</code> method, the {@link Observable} calls the
22+
* Observer's <code>onNext</code> method to provide notifications. A well-behaved {@link Observable} will call an Observer's
23+
* <code>onCompleted</code> closure exactly once or the Observer's <code>onError</code> closure exactly once.
24+
* <p>
25+
* For more information see the <a href="https://github.com/Netflix/RxJava/wiki/Observable">RxJava Wiki</a>
26+
*
27+
* @param <T>
28+
*/
29+
public interface Observer<T> {
30+
31+
/**
32+
* Notifies the Observer that the {@link Observable} has finished sending push-based notifications.
33+
* <p>
34+
* The {@link Observable} will not call this closure if it calls <code>onError</code>.
35+
*/
36+
public abstract void onCompleted();
37+
38+
/**
39+
* Notifies the Observer that the {@link Observable} has experienced an error condition.
40+
* <p>
41+
* If the {@link Observable} calls this closure, it will not thereafter call <code>onNext</code> or <code>onCompleted</code>.
42+
*
43+
* @param e
44+
*/
45+
public abstract void onError(Throwable e);
46+
47+
/**
48+
* Provides the Observer with new data.
49+
* <p>
50+
* The {@link Observable} calls this closure 1 or more times, unless it calls <code>onError</code> in which case this closure may never be called.
51+
* <p>
52+
* The {@link Observable} will not call this closure again after it calls either <code>onCompleted</code> or <code>onError</code>.
53+
*
54+
* @param args
55+
*/
56+
public abstract void onNext(T t);
57+
58+
}

rxjava-core/src/main/java/rx/Subscriber.java

Lines changed: 21 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,15 @@
2020
/**
2121
* Provides a mechanism for receiving push-based notifications.
2222
* <p>
23-
* After an Observer calls an {@link Observable}'s <code>Observable.subscribe</code> method, the {@link Observable} calls the
24-
* Observer's <code>onNext</code> method to provide notifications. A well-behaved {@link Observable} will call an Observer's
23+
* After an Observer calls an {@link Observable}'s <code>Observable.subscribe</code> method, the {@link Observable} calls the
24+
* Observer's <code>onNext</code> method to provide notifications. A well-behaved {@link Observable} will call an Observer's
2525
* <code>onCompleted</code> closure exactly once or the Observer's <code>onError</code> closure exactly once.
2626
* <p>
2727
* For more information see the <a href="https://github.com/Netflix/RxJava/wiki/Observable">RxJava Wiki</a>
2828
*
2929
* @param <T>
3030
*/
31-
public abstract class Subscriber<T> implements Subscription {
31+
public abstract class Subscriber<T> implements Observer<T>, Subscription {
3232

3333
private final CompositeSubscription cs;
3434

@@ -47,32 +47,26 @@ protected Subscriber(Subscriber<?> op) {
4747
this(op.cs);
4848
}
4949

50-
/**
51-
* Notifies the Observer that the {@link Observable} has finished sending push-based notifications.
52-
* <p>
53-
* The {@link Observable} will not call this closure if it calls <code>onError</code>.
54-
*/
55-
public abstract void onCompleted();
50+
public static <T> Subscriber<T> from(final Observer<? super T> o) {
51+
return new Subscriber<T>() {
5652

57-
/**
58-
* Notifies the Observer that the {@link Observable} has experienced an error condition.
59-
* <p>
60-
* If the {@link Observable} calls this closure, it will not thereafter call <code>onNext</code> or <code>onCompleted</code>.
61-
*
62-
* @param e
63-
*/
64-
public abstract void onError(Throwable e);
53+
@Override
54+
public void onCompleted() {
55+
o.onCompleted();
56+
}
6557

66-
/**
67-
* Provides the Observer with new data.
68-
* <p>
69-
* The {@link Observable} calls this closure 1 or more times, unless it calls <code>onError</code> in which case this closure may never be called.
70-
* <p>
71-
* The {@link Observable} will not call this closure again after it calls either <code>onCompleted</code> or <code>onError</code>.
72-
*
73-
* @param args
74-
*/
75-
public abstract void onNext(T t);
58+
@Override
59+
public void onError(Throwable e) {
60+
o.onError(e);
61+
}
62+
63+
@Override
64+
public void onNext(T t) {
65+
o.onNext(t);
66+
}
67+
68+
};
69+
}
7670

7771
/**
7872
* Used to register an unsubscribe callback.

rxjava-core/src/main/java/rx/joins/JoinObserver1.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import rx.Notification;
2525
import rx.Observable;
2626
import rx.Subscriber;
27-
import rx.observers.SafeObserver;
27+
import rx.observers.SafeSubscriber;
2828
import rx.operators.SafeObservableSubscription;
2929
import rx.util.functions.Action1;
3030

@@ -38,14 +38,14 @@ public final class JoinObserver1<T> extends Subscriber<Notification<T>> implemen
3838
private final List<ActivePlan0> activePlans;
3939
private final Queue<Notification<T>> queue;
4040
private final AtomicBoolean subscribed = new AtomicBoolean(false);
41-
private final SafeObserver<Notification<T>> safeObserver;
41+
private final SafeSubscriber<Notification<T>> safeObserver;
4242

4343
public JoinObserver1(Observable<T> source, Action1<Throwable> onError) {
4444
this.source = source;
4545
this.onError = onError;
4646
queue = new LinkedList<Notification<T>>();
4747
activePlans = new ArrayList<ActivePlan0>();
48-
safeObserver = new SafeObserver<Notification<T>>(new InnerObserver());
48+
safeObserver = new SafeSubscriber<Notification<T>>(new InnerObserver());
4949
// add this subscription so it gets unsubscribed when the parent does
5050
add(safeObserver);
5151
}

rxjava-core/src/main/java/rx/observables/BlockingObservable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import rx.Observable;
2424
import rx.Subscriber;
2525
import rx.Subscription;
26-
import rx.observers.SafeObserver;
26+
import rx.observers.SafeSubscriber;
2727
import rx.operators.OperationLatest;
2828
import rx.operators.OperationMostRecent;
2929
import rx.operators.OperationNext;
@@ -73,7 +73,7 @@ public static <T> BlockingObservable<T> from(final Observable<? extends T> o) {
7373
* "Guideline 6.4: Protect calls to user code from within an operator"
7474
*/
7575
private Subscription protectivelyWrapAndSubscribe(Subscriber<? super T> observer) {
76-
return o.subscribe(new SafeObserver<T>(observer));
76+
return o.subscribe(new SafeSubscriber<T>(observer));
7777
}
7878

7979
/**

rxjava-core/src/main/java/rx/observers/EmptyObserver.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
package rx.observers;
22

3-
import rx.Subscriber;
3+
import rx.Observer;
4+
45

56
/**
67
* Observer that does nothing... including swallowing errors.
78
*/
8-
public class EmptyObserver<T> extends Subscriber<T> {
9+
public class EmptyObserver<T> implements Observer<T> {
910

1011
@Override
1112
public void onCompleted() {

rxjava-core/src/main/java/rx/observers/Observers.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package rx.observers;
22

3-
import rx.Subscriber;
3+
import rx.Observer;
44
import rx.util.OnErrorNotImplementedException;
55
import rx.util.functions.Action0;
66
import rx.util.functions.Action1;
@@ -10,8 +10,8 @@ public class Observers {
1010
/**
1111
* Create an empty Observer that ignores all events.
1212
*/
13-
public static final <T> Subscriber<T> create() {
14-
return new Subscriber<T>() {
13+
public static final <T> Observer<T> create() {
14+
return new Observer<T>() {
1515

1616
@Override
1717
public final void onCompleted() {
@@ -34,12 +34,12 @@ public final void onNext(T args) {
3434
/**
3535
* Create an Observer that receives `onNext` and ignores `onError` and `onCompleted`.
3636
*/
37-
public static final <T> Subscriber<T> create(final Action1<? super T> onNext) {
37+
public static final <T> Observer<T> create(final Action1<? super T> onNext) {
3838
if (onNext == null) {
3939
throw new IllegalArgumentException("onNext can not be null");
4040
}
4141

42-
return new Subscriber<T>() {
42+
return new Observer<T>() {
4343

4444
@Override
4545
public final void onCompleted() {
@@ -63,15 +63,15 @@ public final void onNext(T args) {
6363
* Create an Observer that receives `onNext` and `onError` and ignores `onCompleted`.
6464
*
6565
*/
66-
public static final <T> Subscriber<T> create(final Action1<? super T> onNext, final Action1<Throwable> onError) {
66+
public static final <T> Observer<T> create(final Action1<? super T> onNext, final Action1<Throwable> onError) {
6767
if (onNext == null) {
6868
throw new IllegalArgumentException("onNext can not be null");
6969
}
7070
if (onError == null) {
7171
throw new IllegalArgumentException("onError can not be null");
7272
}
7373

74-
return new Subscriber<T>() {
74+
return new Observer<T>() {
7575

7676
@Override
7777
public final void onCompleted() {
@@ -95,7 +95,7 @@ public final void onNext(T args) {
9595
* Create an Observer that receives `onNext`, `onError` and `onCompleted`.
9696
*
9797
*/
98-
public static final <T> Subscriber<T> create(final Action1<? super T> onNext, final Action1<Throwable> onError, final Action0 onComplete) {
98+
public static final <T> Observer<T> create(final Action1<? super T> onNext, final Action1<Throwable> onError, final Action0 onComplete) {
9999
if (onNext == null) {
100100
throw new IllegalArgumentException("onNext can not be null");
101101
}
@@ -106,7 +106,7 @@ public static final <T> Subscriber<T> create(final Action1<? super T> onNext, fi
106106
throw new IllegalArgumentException("onComplete can not be null");
107107
}
108108

109-
return new Subscriber<T>() {
109+
return new Observer<T>() {
110110

111111
@Override
112112
public final void onCompleted() {

rxjava-core/src/main/java/rx/observers/SafeObserver.java renamed to rxjava-core/src/main/java/rx/observers/SafeSubscriber.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,12 +58,12 @@
5858
*
5959
* @param <T>
6060
*/
61-
public class SafeObserver<T> extends Subscriber<T> {
61+
public class SafeSubscriber<T> extends Subscriber<T> {
6262

6363
private final Subscriber<? super T> actual;
6464
private final AtomicBoolean isFinished = new AtomicBoolean(false);
6565

66-
public SafeObserver(Subscriber<? super T> actual) {
66+
public SafeSubscriber(Subscriber<? super T> actual) {
6767
super(actual);
6868
this.actual = actual;
6969
}

rxjava-core/src/main/java/rx/observers/SynchronizedObserver.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package rx.observers;
1717

18+
import rx.Observer;
1819
import rx.Subscriber;
1920
import rx.operators.SafeObservableSubscription;
2021

@@ -30,7 +31,7 @@
3031
*
3132
* @param <T>
3233
*/
33-
public final class SynchronizedObserver<T> extends Subscriber<T> {
34+
public final class SynchronizedObserver<T> implements Observer<T> {
3435

3536
/**
3637
* Intrinsic synchronized locking with double-check short-circuiting was chosen after testing several other implementations.
@@ -52,25 +53,31 @@ public final class SynchronizedObserver<T> extends Subscriber<T> {
5253
private volatile boolean finished = false;
5354
private volatile Object lock;
5455

55-
public SynchronizedObserver(Subscriber<? super T> Observer, SafeObservableSubscription subscription) {
56-
this.observer = Observer;
56+
public SynchronizedObserver(Subscriber<? super T> subscriber, SafeObservableSubscription subscription) {
57+
this.observer = subscriber;
5758
this.subscription = subscription;
5859
this.lock = this;
5960
}
6061

61-
public SynchronizedObserver(Subscriber<? super T> Observer, SafeObservableSubscription subscription, Object lock) {
62-
this.observer = Observer;
62+
public SynchronizedObserver(Subscriber<? super T> subscriber, SafeObservableSubscription subscription, Object lock) {
63+
this.observer = subscriber;
6364
this.subscription = subscription;
6465
this.lock = lock;
6566
}
67+
68+
public SynchronizedObserver(Subscriber<? super T> subscriber, Object lock) {
69+
this.observer = subscriber;
70+
this.subscription = new SafeObservableSubscription(subscriber);
71+
this.lock = lock;
72+
}
6673

6774
/**
6875
* Used when synchronizing an Observer without access to the subscription.
6976
*
7077
* @param Observer
7178
*/
72-
public SynchronizedObserver(Subscriber<? super T> Observer) {
73-
this(Observer, new SafeObservableSubscription());
79+
public SynchronizedObserver(Subscriber<? super T> subscriber) {
80+
this(subscriber, new SafeObservableSubscription());
7481
}
7582

7683
public void onNext(T arg) {
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.observers;
17+
18+
import rx.Observer;
19+
import rx.Subscriber;
20+
import rx.operators.SafeObservableSubscription;
21+
22+
/**
23+
* A thread-safe Observer for transitioning states in operators.
24+
* <p>
25+
* Execution rules are:
26+
* <ul>
27+
* <li>Allow only single-threaded, synchronous, ordered execution of onNext, onCompleted, onError</li>
28+
* <li>Once an onComplete or onError are performed, no further calls can be executed</li>
29+
* <li>If unsubscribe is called, this means we call completed() and don't allow any further onNext calls.</li>
30+
* </ul>
31+
*
32+
* @param <T>
33+
*/
34+
public final class SynchronizedSubscriber<T> extends Subscriber<T> {
35+
36+
private final Observer<? super T> observer;
37+
38+
public SynchronizedSubscriber(Subscriber<? super T> subscriber, Object lock) {
39+
super(subscriber);
40+
SafeObservableSubscription s = new SafeObservableSubscription();
41+
subscriber.add(s);
42+
this.observer = new SynchronizedObserver<T>(subscriber, s, lock);
43+
}
44+
45+
/**
46+
* Used when synchronizing an Subscriber without access to the subscription.
47+
*
48+
* @param Observer
49+
*/
50+
public SynchronizedSubscriber(Subscriber<? super T> subscriber) {
51+
this(subscriber, new Object());
52+
}
53+
54+
@Override
55+
public void onCompleted() {
56+
observer.onCompleted();
57+
}
58+
59+
@Override
60+
public void onError(Throwable e) {
61+
observer.onError(e);
62+
}
63+
64+
@Override
65+
public void onNext(T t) {
66+
observer.onNext(t);
67+
}
68+
69+
}

0 commit comments

Comments
 (0)