Skip to content

Commit c298d5e

Browse files
committed
A number of improvements to OperatorObserveFromAndroidComponent
- move the UI thread assert out of the operator and into the helpers; this way, we don't fail the observer anymore with an exception, but the caller. - do not loop unsubscribe through the main thread anymore. This unnecessarily defers releasing the references, and might in fact be processed only after Android creates the component after a rotation change. I had to make the references volatile for this to work. - immediately unsubscribe in case we detect the componentRef has become invalid. This solves the problem that dangling observers would continue to listen to notifications with no observer alive anymore. refs: #754 #899
1 parent cbbf514 commit c298d5e

File tree

9 files changed

+75
-49
lines changed

9 files changed

+75
-49
lines changed

rxjava-contrib/rxjava-android/src/main/java/rx/android/observables/AndroidObservable.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ private AndroidObservable() {}
5959
* @return a new observable sequence that will emit notifications on the main UI thread
6060
*/
6161
public static <T> Observable<T> fromActivity(Activity activity, Observable<T> sourceObservable) {
62+
Assertions.assertUiThread();
6263
return OperatorObserveFromAndroidComponent.observeFromAndroidComponent(sourceObservable, activity);
6364
}
6465

@@ -87,6 +88,7 @@ public static <T> Observable<T> fromActivity(Activity activity, Observable<T> so
8788
* @return a new observable sequence that will emit notifications on the main UI thread
8889
*/
8990
public static <T> Observable<T> fromFragment(Object fragment, Observable<T> sourceObservable) {
91+
Assertions.assertUiThread();
9092
if (USES_SUPPORT_FRAGMENTS && fragment instanceof android.support.v4.app.Fragment) {
9193
return OperatorObserveFromAndroidComponent.observeFromAndroidComponent(sourceObservable, (android.support.v4.app.Fragment) fragment);
9294
} else if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.HONEYCOMB && fragment instanceof Fragment) {
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package rx.android.observables;
2+
3+
import android.os.Looper;
4+
5+
public class Assertions {
6+
public static void assertUiThread() {
7+
if (Looper.getMainLooper() != Looper.myLooper()) {
8+
throw new IllegalStateException("Observers must subscribe from the main UI thread, but was " + Thread.currentThread());
9+
}
10+
}
11+
}

rxjava-contrib/rxjava-android/src/main/java/rx/android/observables/ViewObservable.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,15 @@
1515
*/
1616
package rx.android.observables;
1717

18-
import android.os.Looper;
19-
import android.view.View;
20-
import android.widget.CompoundButton;
21-
import android.widget.EditText;
2218
import rx.Observable;
2319
import rx.operators.OperatorCompoundButtonInput;
2420
import rx.operators.OperatorEditTextInput;
2521
import rx.operators.OperatorViewClick;
2622

23+
import android.view.View;
24+
import android.widget.CompoundButton;
25+
import android.widget.EditText;
26+
2727
public class ViewObservable {
2828

2929
public static Observable<View> clicks(final View view, final boolean emitInitialValue) {
@@ -38,10 +38,5 @@ public static Observable<Boolean> input(final CompoundButton button, final boole
3838
return Observable.create(new OperatorCompoundButtonInput(button, emitInitialValue));
3939
}
4040

41-
public static void assertUiThread() {
42-
if (Looper.getMainLooper() != Looper.myLooper()) {
43-
throw new IllegalStateException("Observers must subscribe from the main UI thread, but was " + Thread.currentThread());
44-
}
45-
}
4641
}
4742

rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperatorCompoundButtonInput.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import rx.Observable;
2424
import rx.Subscriber;
2525
import rx.Subscription;
26-
import rx.android.observables.ViewObservable;
26+
import rx.android.observables.Assertions;
2727
import rx.android.subscriptions.AndroidSubscriptions;
2828
import rx.functions.Action0;
2929
import android.view.View;
@@ -40,7 +40,7 @@ public OperatorCompoundButtonInput(final CompoundButton button, final boolean em
4040

4141
@Override
4242
public void call(final Subscriber<? super Boolean> observer) {
43-
ViewObservable.assertUiThread();
43+
Assertions.assertUiThread();
4444
final CompositeOnCheckedChangeListener composite = CachedListeners.getFromViewOrCreate(button);
4545

4646
final CompoundButton.OnCheckedChangeListener listener = new CompoundButton.OnCheckedChangeListener() {

rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperatorEditTextInput.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import rx.Observable;
1919
import rx.Subscriber;
2020
import rx.Subscription;
21-
import rx.android.observables.ViewObservable;
21+
import rx.android.observables.Assertions;
2222
import rx.android.subscriptions.AndroidSubscriptions;
2323
import rx.functions.Action0;
2424
import android.text.Editable;
@@ -36,7 +36,7 @@ public OperatorEditTextInput(final EditText input, final boolean emitInitialValu
3636

3737
@Override
3838
public void call(final Subscriber<? super String> observer) {
39-
ViewObservable.assertUiThread();
39+
Assertions.assertUiThread();
4040
final TextWatcher watcher = new SimpleTextWatcher() {
4141
@Override
4242
public void afterTextChanged(final Editable editable) {

rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperatorObserveFromAndroidComponent.java

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@
1919
import rx.Observer;
2020
import rx.Subscriber;
2121
import rx.android.schedulers.AndroidSchedulers;
22-
import rx.android.subscriptions.AndroidSubscriptions;
2322
import rx.functions.Action0;
23+
import rx.subscriptions.Subscriptions;
24+
2425
import android.app.Activity;
25-
import android.os.Looper;
2626
import android.util.Log;
2727

2828
public class OperatorObserveFromAndroidComponent {
@@ -44,8 +44,8 @@ private static class OnSubscribeBase<T, AndroidComponent> implements Observable.
4444
private static final String LOG_TAG = "AndroidObserver";
4545

4646
private final Observable<T> source;
47-
private AndroidComponent componentRef;
48-
private Observer<? super T> observerRef;
47+
private volatile AndroidComponent componentRef;
48+
private volatile Observer<? super T> observerRef;
4949

5050
private OnSubscribeBase(Observable<T> source, AndroidComponent component) {
5151
this.source = source;
@@ -54,9 +54,9 @@ private OnSubscribeBase(Observable<T> source, AndroidComponent component) {
5454

5555
private void log(String message) {
5656
if (Log.isLoggable(LOG_TAG, Log.DEBUG)) {
57-
Log.d(LOG_TAG, "componentRef = " + componentRef);
58-
Log.d(LOG_TAG, "observerRef = " + observerRef);
59-
Log.d(LOG_TAG, message);
57+
String thread = Thread.currentThread().getName();
58+
Log.d(LOG_TAG, "[" + thread + "] componentRef = " + componentRef + "; observerRef = " + observerRef);
59+
Log.d(LOG_TAG, "[" + thread + "]" + message);
6060
}
6161
}
6262

@@ -65,15 +65,15 @@ protected boolean isComponentValid(AndroidComponent component) {
6565
}
6666

6767
@Override
68-
public void call(Subscriber<? super T> subscriber) {
69-
assertUiThread();
68+
public void call(final Subscriber<? super T> subscriber) {
7069
observerRef = subscriber;
7170
source.observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber<T>(subscriber) {
7271
@Override
7372
public void onCompleted() {
7473
if (componentRef != null && isComponentValid(componentRef)) {
7574
observerRef.onCompleted();
7675
} else {
76+
unsubscribe();
7777
log("onComplete: target component released or detached; dropping message");
7878
}
7979
}
@@ -83,6 +83,7 @@ public void onError(Throwable e) {
8383
if (componentRef != null && isComponentValid(componentRef)) {
8484
observerRef.onError(e);
8585
} else {
86+
unsubscribe();
8687
log("onError: target component released or detached; dropping message");
8788
}
8889
}
@@ -92,11 +93,12 @@ public void onNext(T args) {
9293
if (componentRef != null && isComponentValid(componentRef)) {
9394
observerRef.onNext(args);
9495
} else {
96+
unsubscribe();
9597
log("onNext: target component released or detached; dropping message");
9698
}
9799
}
98100
});
99-
subscriber.add(AndroidSubscriptions.unsubscribeInUiThread(new Action0() {
101+
subscriber.add(Subscriptions.create(new Action0() {
100102
@Override
101103
public void call() {
102104
log("unsubscribing from source sequence");
@@ -109,12 +111,6 @@ private void releaseReferences() {
109111
observerRef = null;
110112
componentRef = null;
111113
}
112-
113-
private void assertUiThread() {
114-
if (Looper.getMainLooper() != Looper.myLooper()) {
115-
throw new IllegalStateException("Observers must subscribe from the main UI thread, but was " + Thread.currentThread());
116-
}
117-
}
118114
}
119115

120116
private static final class OnSubscribeFragment<T> extends OnSubscribeBase<T, android.app.Fragment> {

rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperatorViewClick.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import rx.Observable;
2424
import rx.Subscriber;
2525
import rx.Subscription;
26-
import rx.android.observables.ViewObservable;
26+
import rx.android.observables.Assertions;
2727
import rx.android.subscriptions.AndroidSubscriptions;
2828
import rx.functions.Action0;
2929
import android.view.View;
@@ -39,7 +39,7 @@ public OperatorViewClick(final View view, final boolean emitInitialValue) {
3939

4040
@Override
4141
public void call(final Subscriber<? super View> observer) {
42-
ViewObservable.assertUiThread();
42+
Assertions.assertUiThread();
4343
final CompositeOnClickListener composite = CachedListeners.getFromViewOrCreate(view);
4444

4545
final View.OnClickListener listener = new View.OnClickListener() {

rxjava-contrib/rxjava-android/src/test/java/rx/android/observables/AndroidObservableTest.java

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
*/
1616
package rx.android.observables;
1717

18-
import static org.mockito.Mockito.*;
18+
import static org.mockito.Mockito.verify;
1919

2020
import org.junit.Before;
2121
import org.junit.Test;
@@ -25,14 +25,20 @@
2525
import org.robolectric.Robolectric;
2626
import org.robolectric.RobolectricTestRunner;
2727
import org.robolectric.annotation.Config;
28-
2928
import rx.Observable;
3029
import rx.Observer;
3130
import rx.observers.TestObserver;
31+
3232
import android.app.Activity;
3333
import android.app.Fragment;
3434
import android.support.v4.app.FragmentActivity;
3535

36+
import java.util.concurrent.Callable;
37+
import java.util.concurrent.ExecutionException;
38+
import java.util.concurrent.Executors;
39+
import java.util.concurrent.Future;
40+
import java.util.concurrent.TimeUnit;
41+
3642

3743
@RunWith(RobolectricTestRunner.class)
3844
@Config(manifest = Config.NONE)
@@ -79,4 +85,36 @@ public void itSupportsNativeFragments() {
7985
public void itThrowsIfObjectPassedIsNotAFragment() {
8086
AndroidObservable.fromFragment("not a fragment", Observable.never());
8187
}
88+
89+
@Test(expected = IllegalStateException.class)
90+
public void itThrowsIfObserverCallsFromFragmentFromBackgroundThread() throws Throwable {
91+
final Future<Object> future = Executors.newSingleThreadExecutor().submit(new Callable<Object>() {
92+
@Override
93+
public Object call() throws Exception {
94+
AndroidObservable.fromFragment(fragment, Observable.empty());
95+
return null;
96+
}
97+
});
98+
try {
99+
future.get(1, TimeUnit.SECONDS);
100+
} catch (ExecutionException e) {
101+
throw e.getCause();
102+
}
103+
}
104+
105+
@Test(expected = IllegalStateException.class)
106+
public void itThrowsIfObserverCallsFromActivityFromBackgroundThread() throws Throwable {
107+
final Future<Object> future = Executors.newSingleThreadExecutor().submit(new Callable<Object>() {
108+
@Override
109+
public Object call() throws Exception {
110+
AndroidObservable.fromActivity(activity, Observable.empty());
111+
return null;
112+
}
113+
});
114+
try {
115+
future.get(1, TimeUnit.SECONDS);
116+
} catch (ExecutionException e) {
117+
throw e.getCause();
118+
}
119+
}
82120
}

rxjava-contrib/rxjava-android/src/test/java/rx/android/operators/OperatorObserveFromAndroidComponentTest.java

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -70,22 +70,6 @@ public void setupMocks() {
7070
when(mockFragment.isAdded()).thenReturn(true);
7171
}
7272

73-
@Test
74-
public void itThrowsIfObserverSubscribesFromBackgroundThread() throws Exception {
75-
final Observable<Integer> testObservable = Observable.from(1);
76-
final Future<Object> future = Executors.newSingleThreadExecutor().submit(new Callable<Object>() {
77-
@Override
78-
public Object call() throws Exception {
79-
OperatorObserveFromAndroidComponent.observeFromAndroidComponent(
80-
testObservable, mockFragment).subscribe(mockObserver);
81-
return null;
82-
}
83-
});
84-
future.get(1, TimeUnit.SECONDS);
85-
verify(mockObserver).onError(any(IllegalStateException.class));
86-
verifyNoMoreInteractions(mockObserver);
87-
}
88-
8973
// TODO needs to be fixed, see comments inline below
9074
@Ignore
9175
public void itObservesTheSourceSequenceOnTheMainUIThread() {

0 commit comments

Comments
 (0)