Skip to content

Commit 26f36ec

Browse files
Merge pull request #796 from benjchristensen/subscription-isunsubscribed
Add Subscription.isUnsubscribed()
2 parents 025de00 + c07e459 commit 26f36ec

35 files changed

+255
-183
lines changed

language-adaptors/rxjava-groovy/src/examples/groovy/rx/lang/groovy/examples/RxExamples.groovy

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import rx.Observable;
1919
import rx.Observer;
2020
import rx.Subscription;
2121
import rx.subscriptions.Subscriptions;
22+
import rx.util.functions.Action0
2223
import rx.util.functions.Func1;
2324

2425
// --------------------------------------------------
@@ -123,13 +124,13 @@ def customObservableNonBlocking() {
123124
});
124125
t.start();
125126

126-
return new Subscription() {
127-
public void unsubscribe() {
127+
return Subscriptions.create(new Action0() {
128+
public void call() {
128129
// Ask the thread to stop doing work.
129130
// For this simple example it just interrupts.
130131
t.interrupt();
131132
}
132-
};
133+
});
133134
};
134135
});
135136
}

language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -530,12 +530,7 @@ def class ObservableTests {
530530
observer.onNext("hello_" + count);
531531
observer.onCompleted();
532532

533-
return new Subscription() {
534-
535-
public void unsubscribe() {
536-
// unregister ... will never be called here since we are executing synchronously
537-
}
538-
};
533+
return Subscriptions.empty();
539534
}
540535
}
541536
}

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subscription.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ trait Subscription {
4444
private [scala] val unsubscribed = new AtomicBoolean(false)
4545
private [scala] val asJavaSubscription: rx.Subscription = new rx.Subscription {
4646
override def unsubscribe() { unsubscribed.compareAndSet(false, true) }
47+
override def isUnsubscribed(): Boolean = { unsubscribed.get() }
4748
}
4849

4950

@@ -81,6 +82,7 @@ object Subscription {
8182
def apply(u: => Unit): Subscription = new Subscription() {
8283
override val asJavaSubscription = new rx.Subscription {
8384
override def unsubscribe() { if(unsubscribed.compareAndSet(false, true)) { u } }
85+
override def isUnsubscribed(): Boolean = { unsubscribed.get() }
8486
}
8587
}
8688

rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperationObserveFromAndroidComponent.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import rx.Observer;
2020
import rx.Subscription;
2121
import rx.android.schedulers.AndroidSchedulers;
22+
import rx.subscriptions.Subscriptions;
23+
import rx.util.functions.Action0;
2224
import android.app.Activity;
2325
import android.os.Looper;
2426
import android.util.Log;
@@ -94,14 +96,14 @@ public void onNext(T args) {
9496
}
9597
}
9698
});
97-
return new Subscription() {
99+
return Subscriptions.create(new Action0() {
98100
@Override
99-
public void unsubscribe() {
101+
public void call() {
100102
log("unsubscribing from source sequence");
101103
releaseReferences();
102104
sourceSub.unsubscribe();
103105
}
104-
};
106+
});
105107
}
106108

107109
private void releaseReferences() {

rxjava-core/src/main/java/rx/Subscription.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,5 +32,7 @@ public interface Subscription {
3232
* This allows unregistering an {@link Subscriber} before it has finished receiving all events (ie. before onCompleted is called).
3333
*/
3434
public void unsubscribe();
35+
36+
public boolean isUnsubscribed();
3537

3638
}

rxjava-core/src/main/java/rx/operators/ChunkedOperation.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ protected static class TimeAndSizeBasedChunks<T, C> extends Chunks<T, C> impleme
156156
private final long maxTime;
157157
private final TimeUnit unit;
158158
private final int maxSize;
159+
private volatile boolean unsubscribed = false;
159160

160161
public TimeAndSizeBasedChunks(Observer<? super C> observer, Func0<? extends Chunk<T, C>> chunkMaker, int maxSize, long maxTime, TimeUnit unit, Scheduler scheduler) {
161162
super(observer, chunkMaker);
@@ -210,10 +211,16 @@ public void pushValue(T value) {
210211

211212
@Override
212213
public void unsubscribe() {
214+
unsubscribed = true;
213215
for (Subscription s : subscriptions.values()) {
214216
s.unsubscribe();
215217
}
216218
}
219+
220+
@Override
221+
public boolean isUnsubscribed() {
222+
return unsubscribed;
223+
}
217224
}
218225

219226
/**
@@ -232,6 +239,7 @@ protected static class TimeBasedChunks<T, C> extends OverlappingChunks<T, C> imp
232239
private final Scheduler scheduler;
233240
private final long time;
234241
private final TimeUnit unit;
242+
private volatile boolean unsubscribed = false;
235243

236244
public TimeBasedChunks(Observer<? super C> observer, Func0<? extends Chunk<T, C>> chunkMaker, long time, TimeUnit unit, Scheduler scheduler) {
237245
super(observer, chunkMaker);
@@ -260,11 +268,17 @@ public void emitChunk(Chunk<T, C> chunk) {
260268

261269
@Override
262270
public void unsubscribe() {
271+
unsubscribed = true;
263272
for (Subscription s : subscriptions.values()) {
264273
s.unsubscribe();
265274
}
266275
}
267276

277+
@Override
278+
public boolean isUnsubscribed() {
279+
return unsubscribed;
280+
}
281+
268282
}
269283

270284
/**

rxjava-core/src/main/java/rx/operators/OperationBuffer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -393,6 +393,11 @@ public void unsubscribe() {
393393
cc0.stop();
394394
}
395395
}
396+
397+
@Override
398+
public boolean isUnsubscribed() {
399+
return done.get();
400+
}
396401
}
397402

398403
/**

rxjava-core/src/main/java/rx/operators/OperationConcat.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import rx.Observable.OnSubscribeFunc;
2424
import rx.Observer;
2525
import rx.Subscription;
26+
import rx.subscriptions.Subscriptions;
27+
import rx.util.functions.Action0;
2628

2729
/**
2830
* Returns an Observable that emits the items emitted by two or more Observables, one after the
@@ -153,9 +155,9 @@ public void onCompleted() {
153155
}
154156
}));
155157

156-
return new Subscription() {
158+
return Subscriptions.create(new Action0() {
157159
@Override
158-
public void unsubscribe() {
160+
public void call() {
159161
Subscription q;
160162
synchronized (nextSequences) {
161163
q = innerSubscription;
@@ -165,7 +167,7 @@ public void unsubscribe() {
165167
}
166168
outerSubscription.unsubscribe();
167169
}
168-
};
170+
});
169171
}
170172
}
171173
}

rxjava-core/src/main/java/rx/operators/OperationGroupJoin.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,11 @@ public void init() {
9999
public void unsubscribe() {
100100
cancel.unsubscribe();
101101
}
102+
103+
@Override
104+
public boolean isUnsubscribed() {
105+
return cancel.isUnsubscribed();
106+
}
102107

103108
void groupsOnCompleted() {
104109
List<Observer<T2>> list = new ArrayList<Observer<T2>>(leftMap.values());
@@ -299,6 +304,7 @@ public void onNext(D2 args) {
299304
onCompleted();
300305
}
301306
}
307+
302308
}
303309

304310
/**

rxjava-core/src/main/java/rx/operators/OperationMergeDelayError.java

Lines changed: 16 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,11 @@
2525
import rx.Observer;
2626
import rx.Subscription;
2727
import rx.observers.SynchronizedObserver;
28+
import rx.subscriptions.BooleanSubscription;
2829
import rx.subscriptions.CompositeSubscription;
30+
import rx.subscriptions.Subscriptions;
2931
import rx.util.CompositeException;
32+
import rx.util.functions.Action0;
3033

3134
/**
3235
* This behaves like {@link OperatorMerge} except that if any of the merged Observables notify of
@@ -69,60 +72,46 @@ public Subscription onSubscribe(Observer<? super T> observer) {
6972

7073
public static <T> OnSubscribeFunc<T> mergeDelayError(final Observable<? extends T>... sequences) {
7174
return mergeDelayError(Observable.create(new OnSubscribeFunc<Observable<? extends T>>() {
72-
private volatile boolean unsubscribed = false;
75+
private final BooleanSubscription s = new BooleanSubscription();
7376

7477
@Override
7578
public Subscription onSubscribe(Observer<? super Observable<? extends T>> observer) {
7679
for (Observable<? extends T> o : sequences) {
77-
if (!unsubscribed) {
80+
if (!s.isUnsubscribed()) {
7881
observer.onNext(o);
7982
} else {
8083
// break out of the loop if we are unsubscribed
8184
break;
8285
}
8386
}
84-
if (!unsubscribed) {
87+
if (!s.isUnsubscribed()) {
8588
observer.onCompleted();
8689
}
87-
return new Subscription() {
88-
89-
@Override
90-
public void unsubscribe() {
91-
unsubscribed = true;
92-
}
93-
94-
};
90+
return s;
9591
}
9692
}));
9793
}
9894

9995
public static <T> OnSubscribeFunc<T> mergeDelayError(final List<? extends Observable<? extends T>> sequences) {
10096
return mergeDelayError(Observable.create(new OnSubscribeFunc<Observable<? extends T>>() {
10197

102-
private volatile boolean unsubscribed = false;
98+
private final BooleanSubscription s = new BooleanSubscription();
10399

104100
@Override
105101
public Subscription onSubscribe(Observer<? super Observable<? extends T>> observer) {
106102
for (Observable<? extends T> o : sequences) {
107-
if (!unsubscribed) {
103+
if (!s.isUnsubscribed()) {
108104
observer.onNext(o);
109105
} else {
110106
// break out of the loop if we are unsubscribed
111107
break;
112108
}
113109
}
114-
if (!unsubscribed) {
110+
if (!s.isUnsubscribed()) {
115111
observer.onCompleted();
116112
}
117113

118-
return new Subscription() {
119-
120-
@Override
121-
public void unsubscribe() {
122-
unsubscribed = true;
123-
}
124-
125-
};
114+
return s;
126115
}
127116
}));
128117
}
@@ -201,6 +190,11 @@ public boolean stop() {
201190
return false;
202191
}
203192
}
193+
194+
@Override
195+
public boolean isUnsubscribed() {
196+
return stopped.get();
197+
}
204198
}
205199

206200
/**

0 commit comments

Comments
 (0)