Skip to content

Commit 90d5978

Browse files
Merge pull request #938 from soundcloud/operator-weak-binding
OperatorWeakBinding (deprecates OperatorObserveFromAndroidComponent)
2 parents 557e18a + 7ba5be9 commit 90d5978

File tree

5 files changed

+283
-6
lines changed

5 files changed

+283
-6
lines changed

rxjava-contrib/rxjava-android/src/main/java/rx/android/observables/AndroidObservable.java

Lines changed: 72 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,13 @@
1515
*/
1616
package rx.android.observables;
1717

18+
import static rx.android.schedulers.AndroidSchedulers.mainThread;
19+
1820
import rx.Observable;
21+
import rx.functions.Func1;
1922
import rx.operators.OperatorObserveFromAndroidComponent;
23+
import rx.operators.OperatorWeakBinding;
24+
2025
import android.app.Activity;
2126
import android.app.Fragment;
2227
import android.os.Build;
@@ -36,7 +41,30 @@ public final class AndroidObservable {
3641
USES_SUPPORT_FRAGMENTS = supportFragmentsAvailable;
3742
}
3843

39-
private AndroidObservable() {}
44+
private static final Func1<Activity, Boolean> ACTIVITY_VALIDATOR = new Func1<Activity, Boolean>() {
45+
@Override
46+
public Boolean call(Activity activity) {
47+
return !activity.isFinishing();
48+
}
49+
};
50+
51+
private static final Func1<Fragment, Boolean> FRAGMENT_VALIDATOR = new Func1<Fragment, Boolean>() {
52+
@Override
53+
public Boolean call(Fragment fragment) {
54+
return fragment.isAdded();
55+
}
56+
};
57+
58+
private static final Func1<android.support.v4.app.Fragment, Boolean> FRAGMENTV4_VALIDATOR =
59+
new Func1<android.support.v4.app.Fragment, Boolean>() {
60+
@Override
61+
public Boolean call(android.support.v4.app.Fragment fragment) {
62+
return fragment.isAdded();
63+
}
64+
};
65+
66+
private AndroidObservable() {
67+
}
4068

4169
/**
4270
* Transforms a source observable to be attached to the given Activity, in such a way that notifications will always
@@ -57,7 +85,9 @@ private AndroidObservable() {}
5785
* @param sourceObservable the observable sequence to observe from the given Activity
5886
* @param <T>
5987
* @return a new observable sequence that will emit notifications on the main UI thread
88+
* @deprecated Use {@link #bindActivity(android.app.Activity, rx.Observable)} instead
6089
*/
90+
@Deprecated
6191
public static <T> Observable<T> fromActivity(Activity activity, Observable<T> sourceObservable) {
6292
Assertions.assertUiThread();
6393
return OperatorObserveFromAndroidComponent.observeFromAndroidComponent(sourceObservable, activity);
@@ -86,7 +116,9 @@ public static <T> Observable<T> fromActivity(Activity activity, Observable<T> so
86116
* @param sourceObservable the observable sequence to observe from the given fragment
87117
* @param <T>
88118
* @return a new observable sequence that will emit notifications on the main UI thread
119+
* @deprecated Use {@link #bindFragment(Object, rx.Observable)} instead
89120
*/
121+
@Deprecated
90122
public static <T> Observable<T> fromFragment(Object fragment, Observable<T> sourceObservable) {
91123
Assertions.assertUiThread();
92124
if (USES_SUPPORT_FRAGMENTS && fragment instanceof android.support.v4.app.Fragment) {
@@ -97,4 +129,43 @@ public static <T> Observable<T> fromFragment(Object fragment, Observable<T> sour
97129
throw new IllegalArgumentException("Target fragment is neither a native nor support library Fragment");
98130
}
99131
}
132+
133+
/**
134+
* Binds the given source sequence to the life-cycle of an activity.
135+
* <p/>
136+
* This helper will schedule the given sequence to be observed on the main UI thread and ensure
137+
* that no notifications will be forwarded to the activity in case it gets destroyed by the Android runtime
138+
* or garbage collected by the VM.
139+
*
140+
* @param activity the activity to bind the source sequence to
141+
* @param source the source sequence
142+
*/
143+
public static <T> Observable<T> bindActivity(Activity activity, Observable<T> source) {
144+
Assertions.assertUiThread();
145+
return source.observeOn(mainThread()).lift(new OperatorWeakBinding<T, Activity>(activity, ACTIVITY_VALIDATOR));
146+
}
147+
148+
/**
149+
* Binds the given source sequence to the life-cycle of a fragment (native or support-v4).
150+
* <p/>
151+
* This helper will schedule the given sequence to be observed on the main UI thread and ensure
152+
* that no notifications will be forwarded to the fragment in case it gets detached from its
153+
* activity or garbage collected by the VM.
154+
*
155+
* @param fragment the fragment to bind the source sequence to
156+
* @param source the source sequence
157+
*/
158+
public static <T> Observable<T> bindFragment(Object fragment, Observable<T> cachedSequence) {
159+
Assertions.assertUiThread();
160+
final Observable<T> source = cachedSequence.observeOn(mainThread());
161+
if (USES_SUPPORT_FRAGMENTS && fragment instanceof android.support.v4.app.Fragment) {
162+
android.support.v4.app.Fragment f = (android.support.v4.app.Fragment) fragment;
163+
return source.lift(new OperatorWeakBinding<T, android.support.v4.app.Fragment>(f, FRAGMENTV4_VALIDATOR));
164+
} else if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.HONEYCOMB && fragment instanceof Fragment) {
165+
Fragment f = (Fragment) fragment;
166+
return source.lift(new OperatorWeakBinding<T, Fragment>(f, FRAGMENT_VALIDATOR));
167+
} else {
168+
throw new IllegalArgumentException("Target fragment is neither a native nor support library Fragment");
169+
}
170+
}
100171
}

rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperatorObserveFromAndroidComponent.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import android.app.Activity;
2626
import android.util.Log;
2727

28+
@Deprecated
2829
public class OperatorObserveFromAndroidComponent {
2930

3031
public static <T> Observable<T> observeFromAndroidComponent(Observable<T> source, android.app.Fragment fragment) {
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
package rx.operators;
2+
3+
import rx.Observable;
4+
import rx.Subscriber;
5+
import rx.functions.Func1;
6+
import rx.functions.Functions;
7+
8+
import android.util.Log;
9+
10+
import java.lang.ref.WeakReference;
11+
12+
/**
13+
* Ties a source sequence to the life-cycle of the given target object, and/or the subscriber
14+
* using weak references. When either object is gone, this operator automatically unsubscribes
15+
* from the source sequence.
16+
* <p/>
17+
* You can also pass in an optional predicate function, which whenever it evaluates to false
18+
* on the target object, will also result in the operator unsubscribing from the sequence.
19+
*
20+
* @param <T> the type of the objects emitted to a subscriber
21+
* @param <R> the type of the target object to bind to
22+
*/
23+
public final class OperatorWeakBinding<T, R> implements Observable.Operator<T, T> {
24+
25+
private static final String LOG_TAG = "WeakBinding";
26+
27+
final WeakReference<R> boundRef;
28+
private final Func1<? super R, Boolean> predicate;
29+
30+
public OperatorWeakBinding(R bound, Func1<? super R, Boolean> predicate) {
31+
boundRef = new WeakReference<R>(bound);
32+
this.predicate = predicate;
33+
}
34+
35+
public OperatorWeakBinding(R bound) {
36+
boundRef = new WeakReference<R>(bound);
37+
this.predicate = Functions.alwaysTrue();
38+
}
39+
40+
@Override
41+
public Subscriber<? super T> call(final Subscriber<? super T> child) {
42+
return new WeakSubscriber(child);
43+
}
44+
45+
final class WeakSubscriber extends Subscriber<T> {
46+
47+
final WeakReference<Subscriber<? super T>> subscriberRef;
48+
49+
private WeakSubscriber(Subscriber<? super T> source) {
50+
super(source);
51+
subscriberRef = new WeakReference<Subscriber<? super T>>(source);
52+
}
53+
54+
@Override
55+
public void onCompleted() {
56+
final Subscriber<? super T> sub = subscriberRef.get();
57+
if (shouldForwardNotification(sub)) {
58+
sub.onCompleted();
59+
} else {
60+
handleLostBinding(sub, "onCompleted");
61+
}
62+
}
63+
64+
@Override
65+
public void onError(Throwable e) {
66+
final Subscriber<? super T> sub = subscriberRef.get();
67+
if (shouldForwardNotification(sub)) {
68+
sub.onError(e);
69+
} else {
70+
handleLostBinding(sub, "onError");
71+
}
72+
}
73+
74+
@Override
75+
public void onNext(T t) {
76+
final Subscriber<? super T> sub = subscriberRef.get();
77+
if (shouldForwardNotification(sub)) {
78+
sub.onNext(t);
79+
} else {
80+
handleLostBinding(sub, "onNext");
81+
}
82+
}
83+
84+
private boolean shouldForwardNotification(Subscriber<? super T> sub) {
85+
final R target = boundRef.get();
86+
return sub != null && target != null && predicate.call(target);
87+
}
88+
89+
private void handleLostBinding(Subscriber<? super T> sub, String context) {
90+
if (sub == null) {
91+
log("subscriber gone; skipping " + context);
92+
} else {
93+
final R r = boundRef.get();
94+
if (r != null) {
95+
// the predicate failed to validate
96+
log("bound component has become invalid; skipping " + context);
97+
} else {
98+
log("bound component gone; skipping " + context);
99+
}
100+
}
101+
log("unsubscribing...");
102+
unsubscribe();
103+
}
104+
105+
private void log(String message) {
106+
if (Log.isLoggable(LOG_TAG, Log.DEBUG)) {
107+
Log.d(LOG_TAG, message);
108+
}
109+
}
110+
}
111+
}

rxjava-contrib/rxjava-android/src/test/java/rx/android/observables/AndroidObservableTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,29 +69,29 @@ public void setup() {
6969

7070
@Test
7171
public void itSupportsFragmentsFromTheSupportV4Library() {
72-
AndroidObservable.fromFragment(supportFragment, Observable.just("success")).subscribe(new TestObserver<String>(observer));
72+
AndroidObservable.bindFragment(supportFragment, Observable.just("success")).subscribe(new TestObserver<String>(observer));
7373
verify(observer).onNext("success");
7474
verify(observer).onCompleted();
7575
}
7676

7777
@Test
7878
public void itSupportsNativeFragments() {
79-
AndroidObservable.fromFragment(fragment, Observable.just("success")).subscribe(new TestObserver<String>(observer));
79+
AndroidObservable.bindFragment(fragment, Observable.just("success")).subscribe(new TestObserver<String>(observer));
8080
verify(observer).onNext("success");
8181
verify(observer).onCompleted();
8282
}
8383

8484
@Test(expected = IllegalArgumentException.class)
8585
public void itThrowsIfObjectPassedIsNotAFragment() {
86-
AndroidObservable.fromFragment("not a fragment", Observable.never());
86+
AndroidObservable.bindFragment("not a fragment", Observable.never());
8787
}
8888

8989
@Test(expected = IllegalStateException.class)
9090
public void itThrowsIfObserverCallsFromFragmentFromBackgroundThread() throws Throwable {
9191
final Future<Object> future = Executors.newSingleThreadExecutor().submit(new Callable<Object>() {
9292
@Override
9393
public Object call() throws Exception {
94-
AndroidObservable.fromFragment(fragment, Observable.empty());
94+
AndroidObservable.bindFragment(fragment, Observable.empty());
9595
return null;
9696
}
9797
});
@@ -107,7 +107,7 @@ public void itThrowsIfObserverCallsFromActivityFromBackgroundThread() throws Thr
107107
final Future<Object> future = Executors.newSingleThreadExecutor().submit(new Callable<Object>() {
108108
@Override
109109
public Object call() throws Exception {
110-
AndroidObservable.fromActivity(activity, Observable.empty());
110+
AndroidObservable.bindActivity(activity, Observable.empty());
111111
return null;
112112
}
113113
});
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
package rx.operators;
2+
3+
import static org.junit.Assert.assertEquals;
4+
5+
import org.junit.Before;
6+
import org.junit.Test;
7+
import org.junit.runner.RunWith;
8+
import org.mockito.MockitoAnnotations;
9+
import org.robolectric.RobolectricTestRunner;
10+
import rx.functions.Functions;
11+
import rx.observers.TestSubscriber;
12+
13+
import java.util.Arrays;
14+
15+
@RunWith(RobolectricTestRunner.class)
16+
public class OperatorWeakBindingTest {
17+
18+
private TestSubscriber<String> subscriber = new TestSubscriber<String>();
19+
20+
@Before
21+
public void setUp() throws Exception {
22+
MockitoAnnotations.initMocks(this);
23+
}
24+
25+
@Test
26+
public void shouldForwardAllNotificationsWhenSubscriberAndTargetAlive() {
27+
OperatorWeakBinding<String, Object> op = new OperatorWeakBinding<String, Object>(new Object());
28+
OperatorWeakBinding.WeakSubscriber weakSub = (OperatorWeakBinding.WeakSubscriber) op.call(subscriber);
29+
weakSub.onNext("one");
30+
weakSub.onNext("two");
31+
weakSub.onCompleted();
32+
weakSub.onError(new Exception());
33+
34+
subscriber.assertReceivedOnNext(Arrays.asList("one", "two"));
35+
assertEquals(1, subscriber.getOnCompletedEvents().size());
36+
assertEquals(1, subscriber.getOnErrorEvents().size());
37+
}
38+
39+
@Test
40+
public void shouldUnsubscribeFromSourceSequenceWhenSubscriberReleased() {
41+
OperatorWeakBinding<String, Object> op = new OperatorWeakBinding<String, Object>(new Object());
42+
43+
OperatorWeakBinding.WeakSubscriber weakSub = (OperatorWeakBinding.WeakSubscriber) op.call(subscriber);
44+
weakSub.onNext("one");
45+
weakSub.subscriberRef.clear();
46+
weakSub.onNext("two");
47+
weakSub.onCompleted();
48+
weakSub.onError(new Exception());
49+
50+
subscriber.assertReceivedOnNext(Arrays.asList("one"));
51+
assertEquals(0, subscriber.getOnCompletedEvents().size());
52+
assertEquals(0, subscriber.getOnErrorEvents().size());
53+
}
54+
55+
@Test
56+
public void shouldUnsubscribeFromSourceSequenceWhenTargetObjectReleased() {
57+
OperatorWeakBinding<String, Object> op = new OperatorWeakBinding<String, Object>(new Object());
58+
59+
OperatorWeakBinding.WeakSubscriber weakSub = (OperatorWeakBinding.WeakSubscriber) op.call(subscriber);
60+
weakSub.onNext("one");
61+
op.boundRef.clear();
62+
weakSub.onNext("two");
63+
weakSub.onCompleted();
64+
weakSub.onError(new Exception());
65+
66+
subscriber.assertReceivedOnNext(Arrays.asList("one"));
67+
assertEquals(0, subscriber.getOnCompletedEvents().size());
68+
assertEquals(0, subscriber.getOnErrorEvents().size());
69+
}
70+
71+
@Test
72+
public void shouldUnsubscribeFromSourceSequenceWhenPredicateFailsToPass() {
73+
OperatorWeakBinding<String, Object> op = new OperatorWeakBinding<String, Object>(
74+
new Object(), Functions.alwaysFalse());
75+
76+
OperatorWeakBinding.WeakSubscriber weakSub = (OperatorWeakBinding.WeakSubscriber) op.call(subscriber);
77+
weakSub.onNext("one");
78+
weakSub.onNext("two");
79+
weakSub.onCompleted();
80+
weakSub.onError(new Exception());
81+
82+
assertEquals(0, subscriber.getOnNextEvents().size());
83+
assertEquals(0, subscriber.getOnCompletedEvents().size());
84+
assertEquals(0, subscriber.getOnErrorEvents().size());
85+
}
86+
87+
@Test
88+
public void unsubscribeWillUnsubscribeFromWrappedSubscriber() {
89+
OperatorWeakBinding<String, Object> op = new OperatorWeakBinding<String, Object>(new Object());
90+
91+
op.call(subscriber).unsubscribe();
92+
subscriber.assertUnsubscribed();
93+
}
94+
}

0 commit comments

Comments
 (0)