Skip to content

Commit 5209ab1

Browse files
committed
Check child unsubscription status more eagerly.
1 parent dade7e1 commit 5209ab1

File tree

1 file changed

+26
-16
lines changed

1 file changed

+26
-16
lines changed

rxjava-core/src/main/java/rx/operators/BufferUntilSubscriber.java

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -84,22 +84,23 @@ public void enterPassthroughMode() {
8484
if (!passthroughMode) {
8585
while (!queue.isEmpty()) {
8686
Object o = queue.poll();
87-
88-
if (o == NULL_SENTINEL) {
89-
actual.onNext(null);
90-
} else
91-
if (o == COMPLETE_SENTINEL) {
92-
actual.onCompleted();
93-
} else
94-
if (o instanceof ErrorSentinel) {
95-
actual.onError(((ErrorSentinel)o).t);
96-
} else
97-
if (o != null) {
98-
@SuppressWarnings("unchecked")
99-
T v = (T)o;
100-
actual.onNext(v);
101-
} else {
102-
throw new NullPointerException();
87+
if (!actual.isUnsubscribed()) {
88+
if (o == NULL_SENTINEL) {
89+
actual.onNext(null);
90+
} else
91+
if (o == COMPLETE_SENTINEL) {
92+
actual.onCompleted();
93+
} else
94+
if (o instanceof ErrorSentinel) {
95+
actual.onError(((ErrorSentinel)o).t);
96+
} else
97+
if (o != null) {
98+
@SuppressWarnings("unchecked")
99+
T v = (T)o;
100+
actual.onNext(v);
101+
} else {
102+
throw new NullPointerException();
103+
}
103104
}
104105
}
105106
passthroughMode = true;
@@ -121,6 +122,9 @@ public void onNext(T t) {
121122
while (!passthroughMode) {
122123
gate.wait();
123124
}
125+
if (actual.isUnsubscribed()) {
126+
return;
127+
}
124128
} catch (InterruptedException ex) {
125129
Thread.currentThread().interrupt();
126130
actual.onError(ex);
@@ -145,6 +149,9 @@ public void onError(Throwable e) {
145149
while (!passthroughMode) {
146150
gate.wait();
147151
}
152+
if (actual.isUnsubscribed()) {
153+
return;
154+
}
148155
} catch (InterruptedException ex) {
149156
Thread.currentThread().interrupt();
150157
actual.onError(ex);
@@ -169,6 +176,9 @@ public void onCompleted() {
169176
while (!passthroughMode) {
170177
gate.wait();
171178
}
179+
if (actual.isUnsubscribed()) {
180+
return;
181+
}
172182
} catch (InterruptedException ex) {
173183
Thread.currentThread().interrupt();
174184
actual.onError(ex);

0 commit comments

Comments
 (0)