Skip to content

Commit d7037b5

Browse files
committed
OperatorWeakBinding supports predicates now
1 parent 0ff4237 commit d7037b5

File tree

3 files changed

+178
-18
lines changed

3 files changed

+178
-18
lines changed

rxjava-contrib/rxjava-android/src/main/java/rx/android/observables/AndroidObservable.java

Lines changed: 51 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import static rx.android.schedulers.AndroidSchedulers.mainThread;
1919

2020
import rx.Observable;
21+
import rx.functions.Func1;
2122
import rx.operators.OperatorObserveFromAndroidComponent;
2223
import rx.operators.OperatorWeakBinding;
2324

@@ -40,7 +41,30 @@ public final class AndroidObservable {
4041
USES_SUPPORT_FRAGMENTS = supportFragmentsAvailable;
4142
}
4243

43-
private AndroidObservable() {}
44+
private static final Func1<Activity, Boolean> ACTIVITY_VALIDATOR = new Func1<Activity, Boolean>() {
45+
@Override
46+
public Boolean call(Activity activity) {
47+
return !activity.isFinishing();
48+
}
49+
};
50+
51+
private static final Func1<Fragment, Boolean> FRAGMENT_VALIDATOR = new Func1<Fragment, Boolean>() {
52+
@Override
53+
public Boolean call(Fragment fragment) {
54+
return fragment.isAdded();
55+
}
56+
};
57+
58+
private static final Func1<android.support.v4.app.Fragment, Boolean> FRAGMENTV4_VALIDATOR =
59+
new Func1<android.support.v4.app.Fragment, Boolean>() {
60+
@Override
61+
public Boolean call(android.support.v4.app.Fragment fragment) {
62+
return fragment.isAdded();
63+
}
64+
};
65+
66+
private AndroidObservable() {
67+
}
4468

4569
/**
4670
* Transforms a source observable to be attached to the given Activity, in such a way that notifications will always
@@ -61,6 +85,7 @@ private AndroidObservable() {}
6185
* @param sourceObservable the observable sequence to observe from the given Activity
6286
* @param <T>
6387
* @return a new observable sequence that will emit notifications on the main UI thread
88+
* @deprecated Use {@link #bindActivity(android.app.Activity, rx.Observable)} instead
6489
*/
6590
@Deprecated
6691
public static <T> Observable<T> fromActivity(Activity activity, Observable<T> sourceObservable) {
@@ -91,6 +116,7 @@ public static <T> Observable<T> fromActivity(Activity activity, Observable<T> so
91116
* @param sourceObservable the observable sequence to observe from the given fragment
92117
* @param <T>
93118
* @return a new observable sequence that will emit notifications on the main UI thread
119+
* @deprecated Use {@link #bindFragment(Object, rx.Observable)} instead
94120
*/
95121
@Deprecated
96122
public static <T> Observable<T> fromFragment(Object fragment, Observable<T> sourceObservable) {
@@ -104,18 +130,38 @@ public static <T> Observable<T> fromFragment(Object fragment, Observable<T> sour
104130
}
105131
}
106132

107-
public static <T> Observable<T> bindActivity(Activity activity, Observable<T> cachedSequence) {
108-
return cachedSequence.observeOn(mainThread()).lift(new OperatorWeakBinding<T, Activity>(activity));
133+
/**
134+
* Binds the given source sequence to the life-cycle of an activity.
135+
* <p/>
136+
* This helper will schedule the given sequence to be observed on the main UI thread and ensure
137+
* that no notifications will be forwarded to the activity in case it gets destroyed by the Android runtime
138+
* or garbage collected by the VM.
139+
*
140+
* @param activity the activity to bind the source sequence to
141+
* @param source the source sequence
142+
*/
143+
public static <T> Observable<T> bindActivity(Activity activity, Observable<T> source) {
144+
return source.observeOn(mainThread()).lift(new OperatorWeakBinding<T, Activity>(activity, ACTIVITY_VALIDATOR));
109145
}
110146

147+
/**
148+
* Binds the given source sequence to the life-cycle of a fragment (native or support-v4).
149+
* <p/>
150+
* This helper will schedule the given sequence to be observed on the main UI thread and ensure
151+
* that no notifications will be forwarded to the fragment in case it gets detached from its
152+
* activity or garbage collected by the VM.
153+
*
154+
* @param fragment the fragment to bind the source sequence to
155+
* @param source the source sequence
156+
*/
111157
public static <T> Observable<T> bindFragment(Object fragment, Observable<T> cachedSequence) {
112158
Observable<T> source = cachedSequence.observeOn(mainThread());
113159
if (USES_SUPPORT_FRAGMENTS && fragment instanceof android.support.v4.app.Fragment) {
114160
android.support.v4.app.Fragment f = (android.support.v4.app.Fragment) fragment;
115-
return source.lift(new OperatorWeakBinding<T, android.support.v4.app.Fragment>(f));
161+
return source.lift(new OperatorWeakBinding<T, android.support.v4.app.Fragment>(f, FRAGMENTV4_VALIDATOR));
116162
} else if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.HONEYCOMB && fragment instanceof Fragment) {
117163
Fragment f = (Fragment) fragment;
118-
return source.lift(new OperatorWeakBinding<T, Fragment>(f));
164+
return source.lift(new OperatorWeakBinding<T, Fragment>(f, FRAGMENT_VALIDATOR));
119165
} else {
120166
throw new IllegalArgumentException("Target fragment is neither a native nor support library Fragment");
121167
}

rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperatorWeakBinding.java

Lines changed: 41 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,40 +2,58 @@
22

33
import rx.Observable;
44
import rx.Subscriber;
5+
import rx.functions.Func1;
6+
import rx.functions.Functions;
57

68
import android.util.Log;
79

810
import java.lang.ref.WeakReference;
911

12+
/**
13+
* Ties a source sequence to the life-cycle of the given target object, and/or the subscriber
14+
* using weak references. When either object is gone, this operator automatically unsubscribes
15+
* from the source sequence.
16+
* <p/>
17+
* You can also pass in an optional predicate function, which whenever it evaluates to false
18+
* on the target object, will also result in the operator unsubscribing from the sequence.
19+
*
20+
* @param <T> the type of the objects emitted to a subscriber
21+
* @param <R> the type of the target object to bind to
22+
*/
1023
public final class OperatorWeakBinding<T, R> implements Observable.Operator<T, T> {
1124

1225
private static final String LOG_TAG = "WeakBinding";
1326

14-
private final WeakReference<R> boundRef;
27+
final WeakReference<R> boundRef;
28+
private final Func1<? super R, Boolean> predicate;
29+
30+
public OperatorWeakBinding(R bound, Func1<? super R, Boolean> predicate) {
31+
boundRef = new WeakReference<R>(bound);
32+
this.predicate = predicate;
33+
}
1534

1635
public OperatorWeakBinding(R bound) {
1736
boundRef = new WeakReference<R>(bound);
37+
this.predicate = Functions.alwaysTrue();
1838
}
1939

2040
@Override
2141
public Subscriber<? super T> call(final Subscriber<? super T> child) {
22-
return new WeakSubscriber<T, R>(child, boundRef);
42+
return new WeakSubscriber(child);
2343
}
2444

25-
private static final class WeakSubscriber<T, R> extends Subscriber<T> {
45+
final class WeakSubscriber extends Subscriber<T> {
2646

27-
private final WeakReference<Subscriber<? super T>> subscriberRef;
28-
private final WeakReference<R> boundRef;
47+
final WeakReference<Subscriber<? super T>> subscriberRef;
2948

30-
private WeakSubscriber(Subscriber<? super T> op, WeakReference<R> boundRef) {
31-
subscriberRef = new WeakReference<Subscriber<? super T>>(op);
32-
this.boundRef = boundRef;
49+
private WeakSubscriber(Subscriber<? super T> source) {
50+
subscriberRef = new WeakReference<Subscriber<? super T>>(source);
3351
}
3452

3553
@Override
3654
public void onCompleted() {
3755
Subscriber<? super T> sub = subscriberRef.get();
38-
if (sub != null && boundRef.get() != null) {
56+
if (shouldForwardNotification(sub)) {
3957
sub.onCompleted();
4058
} else {
4159
handleLostBinding(sub, "onCompleted");
@@ -45,7 +63,7 @@ public void onCompleted() {
4563
@Override
4664
public void onError(Throwable e) {
4765
Subscriber<? super T> sub = subscriberRef.get();
48-
if (sub != null && boundRef.get() != null) {
66+
if (shouldForwardNotification(sub)) {
4967
sub.onError(e);
5068
} else {
5169
handleLostBinding(sub, "onError");
@@ -55,21 +73,31 @@ public void onError(Throwable e) {
5573
@Override
5674
public void onNext(T t) {
5775
Subscriber<? super T> sub = subscriberRef.get();
58-
if (sub != null && boundRef.get() != null) {
76+
if (shouldForwardNotification(sub)) {
5977
sub.onNext(t);
6078
} else {
6179
handleLostBinding(sub, "onNext");
6280
}
6381
}
6482

83+
private boolean shouldForwardNotification(Subscriber<? super T> sub) {
84+
final R target = boundRef.get();
85+
return sub != null && target != null && predicate.call(target);
86+
}
87+
6588
private void handleLostBinding(Subscriber<? super T> sub, String context) {
6689
if (sub == null) {
6790
Log.d(LOG_TAG, "subscriber gone; skipping " + context);
6891
} else {
69-
Log.d(LOG_TAG, "bound component gone; skipping " + context);
92+
final R r = boundRef.get();
93+
if (r != null) { // the predicate failed to validate
94+
Log.d(LOG_TAG, "bound component has become invalid; skipping " + context);
95+
} else {
96+
Log.d(LOG_TAG, "bound component gone; skipping " + context);
97+
}
7098
}
99+
Log.d(LOG_TAG, "unsubscribing...");
71100
unsubscribe();
72101
}
73-
74102
}
75103
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package rx.operators;
2+
3+
import static org.mockito.Matchers.any;
4+
import static org.mockito.Mockito.verify;
5+
import static org.mockito.Mockito.verifyNoMoreInteractions;
6+
import static org.mockito.Mockito.verifyZeroInteractions;
7+
8+
import org.junit.Before;
9+
import org.junit.Test;
10+
import org.junit.runner.RunWith;
11+
import org.mockito.Mock;
12+
import org.mockito.MockitoAnnotations;
13+
import org.robolectric.RobolectricTestRunner;
14+
import rx.Subscriber;
15+
import rx.functions.Functions;
16+
17+
@RunWith(RobolectricTestRunner.class)
18+
public class OperatorWeakBindingTest {
19+
20+
@Mock
21+
private Subscriber<String> subscriber;
22+
23+
@Before
24+
public void setUp() throws Exception {
25+
MockitoAnnotations.initMocks(this);
26+
}
27+
28+
@Test
29+
public void shouldForwardAllNotificationsWhenSubscriberAndTargetAlive() {
30+
OperatorWeakBinding<String, Object> op = new OperatorWeakBinding<String, Object>(new Object());
31+
OperatorWeakBinding.WeakSubscriber weakSub = (OperatorWeakBinding.WeakSubscriber) op.call(subscriber);
32+
weakSub.onNext("one");
33+
weakSub.onNext("two");
34+
weakSub.onCompleted();
35+
weakSub.onError(new Exception());
36+
37+
verify(subscriber).onNext("one");
38+
verify(subscriber).onNext("two");
39+
verify(subscriber).onCompleted();
40+
verify(subscriber).onError(any(Exception.class));
41+
}
42+
43+
@Test
44+
public void shouldUnsubscribeFromSourceSequenceWhenSubscriberReleased() {
45+
OperatorWeakBinding<String, Object> op = new OperatorWeakBinding<String, Object>(new Object());
46+
47+
OperatorWeakBinding.WeakSubscriber weakSub = (OperatorWeakBinding.WeakSubscriber) op.call(subscriber);
48+
weakSub.onNext("one");
49+
weakSub.subscriberRef.clear();
50+
weakSub.onNext("two");
51+
weakSub.onCompleted();
52+
weakSub.onError(new Exception());
53+
54+
verify(subscriber).onNext("one");
55+
verifyNoMoreInteractions(subscriber);
56+
}
57+
58+
@Test
59+
public void shouldUnsubscribeFromSourceSequenceWhenTargetObjectReleased() {
60+
OperatorWeakBinding<String, Object> op = new OperatorWeakBinding<String, Object>(new Object());
61+
62+
OperatorWeakBinding.WeakSubscriber weakSub = (OperatorWeakBinding.WeakSubscriber) op.call(subscriber);
63+
weakSub.onNext("one");
64+
op.boundRef.clear();
65+
weakSub.onNext("two");
66+
weakSub.onCompleted();
67+
weakSub.onError(new Exception());
68+
69+
verify(subscriber).onNext("one");
70+
verifyNoMoreInteractions(subscriber);
71+
}
72+
73+
@Test
74+
public void shouldUnsubscribeFromSourceSequenceWhenPredicateFailsToPass() {
75+
OperatorWeakBinding<String, Object> op = new OperatorWeakBinding<String, Object>(
76+
new Object(), Functions.alwaysFalse());
77+
78+
OperatorWeakBinding.WeakSubscriber weakSub = (OperatorWeakBinding.WeakSubscriber) op.call(subscriber);
79+
weakSub.onNext("one");
80+
weakSub.onNext("two");
81+
weakSub.onCompleted();
82+
weakSub.onError(new Exception());
83+
84+
verifyZeroInteractions(subscriber);
85+
}
86+
}

0 commit comments

Comments
 (0)