Skip to content

Commit c38a780

Browse files
akarnokdakarnokd
authored andcommitted
Fixed race & late termination condition.
1 parent b02e572 commit c38a780

File tree

2 files changed

+117
-14
lines changed

2 files changed

+117
-14
lines changed

src/main/java/rx/internal/operators/OperatorOnBackpressureBlock.java

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ void init() {
5858
child.setProducer(new Producer() {
5959
@Override
6060
public void request(long n) {
61+
if (n == 0) {
62+
return;
63+
}
6164
synchronized (BlockingSubscriber.this) {
6265
if (n == Long.MAX_VALUE || requestedCount == Long.MAX_VALUE) {
6366
requestedCount = Long.MAX_VALUE;
@@ -95,28 +98,41 @@ public void onCompleted() {
9598
}
9699
void drain() {
97100
long n;
101+
boolean term;
98102
synchronized (this) {
99103
if (emitting) {
100104
return;
101105
}
102106
emitting = true;
103107
n = requestedCount;
108+
term = terminated;
104109
}
105110
boolean skipFinal = false;
106111
try {
112+
Subscriber<? super T> child = this.child;
113+
BlockingQueue<Object> queue = this.queue;
107114
while (true) {
108115
int emitted = 0;
109-
while (n > 0) {
110-
Object o = queue.poll();
111-
if (o == null) {
112-
if (terminated) {
113-
if (exception != null) {
114-
child.onError(exception);
116+
while (n > 0 || term) {
117+
Object o;
118+
if (term) {
119+
o = queue.peek();
120+
if (o == null) {
121+
Throwable e = exception;
122+
if (e != null) {
123+
child.onError(e);
115124
} else {
116125
child.onCompleted();
117126
}
127+
skipFinal = true;
118128
return;
119129
}
130+
if (n == 0) {
131+
break;
132+
}
133+
}
134+
o = queue.poll();
135+
if (o == null) {
120136
break;
121137
} else {
122138
child.onNext(nl.getValue(o));
@@ -125,23 +141,25 @@ void drain() {
125141
}
126142
}
127143
synchronized (this) {
144+
term = terminated;
145+
boolean more = queue.peek() != null;
128146
// if no backpressure below
129147
if (requestedCount == Long.MAX_VALUE) {
130148
// no new data arrived since the last poll
131-
if (queue.peek() == null) {
149+
if (!more && !term) {
132150
skipFinal = true;
133151
emitting = false;
134152
return;
135153
}
136154
n = Long.MAX_VALUE;
137155
} else {
138-
if (emitted == 0) {
156+
requestedCount -= emitted;
157+
n = requestedCount;
158+
if ((n == 0 || !more) && (!term || more)) {
139159
skipFinal = true;
140160
emitting = false;
141161
return;
142162
}
143-
requestedCount -= emitted;
144-
n = requestedCount;
145163
}
146164
}
147165
}

src/test/java/rx/internal/operators/OnBackpressureBlockTest.java

Lines changed: 89 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,13 @@
1616

1717
package rx.internal.operators;
1818

19+
import static org.junit.Assert.assertEquals;
20+
import static org.junit.Assert.assertTrue;
21+
import static org.mockito.Matchers.any;
1922
import static org.mockito.Mockito.*;
2023

2124
import java.util.Arrays;
22-
23-
import static org.junit.Assert.*;
25+
import java.util.Collections;
2426

2527
import org.junit.Test;
2628

@@ -34,6 +36,7 @@
3436
import rx.observers.TestObserver;
3537
import rx.observers.TestSubscriber;
3638
import rx.schedulers.Schedulers;
39+
import rx.subjects.PublishSubject;
3740

3841
/**
3942
* Test the onBackpressureBlock() behavior.
@@ -161,13 +164,15 @@ public void onStart() {
161164
Thread.sleep(WAIT);
162165

163166
o.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
164-
o.assertNoErrors();
165-
assertTrue(o.getOnCompletedEvents().isEmpty());
167+
o.assertTerminalEvent();
168+
assertEquals(1, o.getOnErrorEvents().size());
169+
assertTrue(o.getOnErrorEvents().get(0) instanceof TestException);
166170

167171
o.requestMore(10);
168172

169173
Thread.sleep(WAIT);
170174

175+
o.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
171176
o.assertTerminalEvent();
172177
assertEquals(1, o.getOnErrorEvents().size());
173178
assertTrue(o.getOnErrorEvents().get(0) instanceof TestException);
@@ -259,4 +264,84 @@ public void testTakeWorksSubscriberRequestUnlimitedBufferedException() {
259264
o.assertNoErrors();
260265
o.assertTerminalEvent();
261266
}
267+
@Test(timeout = 10000)
268+
public void testOnCompletedDoesntWaitIfNoEvents() {
269+
270+
TestSubscriber<Integer> o = new TestSubscriber<Integer>() {
271+
@Override
272+
public void onStart() {
273+
request(0); // make sure it doesn't start in unlimited mode
274+
}
275+
};
276+
Observable.<Integer>empty().onBackpressureBlock(2).subscribe(o);
277+
278+
o.assertNoErrors();
279+
o.assertTerminalEvent();
280+
o.assertReceivedOnNext(Collections.<Integer>emptyList());
281+
}
282+
@Test(timeout = 10000)
283+
public void testOnCompletedDoesWaitIfEvents() {
284+
285+
TestSubscriber<Integer> o = new TestSubscriber<Integer>() {
286+
@Override
287+
public void onStart() {
288+
request(0); // make sure it doesn't start in unlimited mode
289+
}
290+
};
291+
Observable.just(1).onBackpressureBlock(2).subscribe(o);
292+
293+
o.assertReceivedOnNext(Collections.<Integer>emptyList());
294+
assertTrue(o.getOnErrorEvents().isEmpty());
295+
assertTrue(o.getOnCompletedEvents().isEmpty());
296+
}
297+
@Test(timeout = 10000)
298+
public void testOnCompletedDoesntWaitIfNoEvents2() {
299+
final PublishSubject<Integer> ps = PublishSubject.create();
300+
TestSubscriber<Integer> o = new TestSubscriber<Integer>() {
301+
@Override
302+
public void onStart() {
303+
request(0); // make sure it doesn't start in unlimited mode
304+
}
305+
@Override
306+
public void onNext(Integer t) {
307+
super.onNext(t);
308+
ps.onCompleted(); // as if an async completion arrived while in the loop
309+
}
310+
};
311+
ps.onBackpressureBlock(2).unsafeSubscribe(o);
312+
ps.onNext(1);
313+
o.requestMore(1);
314+
315+
o.assertNoErrors();
316+
o.assertTerminalEvent();
317+
o.assertReceivedOnNext(Arrays.asList(1));
318+
}
319+
@Test(timeout = 10000)
320+
public void testOnCompletedDoesntWaitIfNoEvents3() {
321+
final PublishSubject<Integer> ps = PublishSubject.create();
322+
TestSubscriber<Integer> o = new TestSubscriber<Integer>() {
323+
boolean once = true;
324+
@Override
325+
public void onStart() {
326+
request(0); // make sure it doesn't start in unlimited mode
327+
}
328+
@Override
329+
public void onNext(Integer t) {
330+
super.onNext(t);
331+
if (once) {
332+
once = false;
333+
ps.onNext(2);
334+
ps.onCompleted(); // as if an async completion arrived while in the loop
335+
requestMore(1);
336+
}
337+
}
338+
};
339+
ps.onBackpressureBlock(3).unsafeSubscribe(o);
340+
ps.onNext(1);
341+
o.requestMore(1);
342+
343+
o.assertNoErrors();
344+
o.assertTerminalEvent();
345+
o.assertReceivedOnNext(Arrays.asList(1, 2));
346+
}
262347
}

0 commit comments

Comments
 (0)