Skip to content

Commit 7646371

Browse files
authored
2.x: Coverage improvements, logical fixes and cleanups 03/08 (#5905)
1 parent 4e50ea4 commit 7646371

File tree

11 files changed

+411
-42
lines changed

11 files changed

+411
-42
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableCache.java

Lines changed: 31 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -50,16 +50,24 @@ public FlowableCache(Flowable<T> source, int capacityHint) {
5050
protected void subscribeActual(Subscriber<? super T> t) {
5151
// we can connect first because we replay everything anyway
5252
ReplaySubscription<T> rp = new ReplaySubscription<T>(t, state);
53-
state.addChild(rp);
54-
5553
t.onSubscribe(rp);
5654

55+
boolean doReplay = true;
56+
if (state.addChild(rp)) {
57+
if (rp.requested.get() == ReplaySubscription.CANCELLED) {
58+
state.removeChild(rp);
59+
doReplay = false;
60+
}
61+
}
62+
5763
// we ensure a single connection here to save an instance field of AtomicBoolean in state.
5864
if (!once.get() && once.compareAndSet(false, true)) {
5965
state.connect();
6066
}
6167

62-
// no need to call rp.replay() here because the very first request will trigger it anyway
68+
if (doReplay) {
69+
rp.replay();
70+
}
6371
}
6472

6573
/**
@@ -122,22 +130,23 @@ static final class CacheState<T> extends LinkedArrayList implements FlowableSubs
122130
/**
123131
* Adds a ReplaySubscription to the subscribers array atomically.
124132
* @param p the target ReplaySubscription wrapping a downstream Subscriber with state
133+
* @return true if the ReplaySubscription was added or false if the cache is already terminated
125134
*/
126-
public void addChild(ReplaySubscription<T> p) {
135+
public boolean addChild(ReplaySubscription<T> p) {
127136
// guarding by connection to save on allocating another object
128137
// thus there are two distinct locks guarding the value-addition and child come-and-go
129138
for (;;) {
130139
ReplaySubscription<T>[] a = subscribers.get();
131140
if (a == TERMINATED) {
132-
return;
141+
return false;
133142
}
134143
int n = a.length;
135144
@SuppressWarnings("unchecked")
136145
ReplaySubscription<T>[] b = new ReplaySubscription[n + 1];
137146
System.arraycopy(a, 0, b, 0, n);
138147
b[n] = p;
139148
if (subscribers.compareAndSet(a, b)) {
140-
return;
149+
return true;
141150
}
142151
}
143152
}
@@ -240,12 +249,16 @@ static final class ReplaySubscription<T>
240249
extends AtomicInteger implements Subscription {
241250

242251
private static final long serialVersionUID = -2557562030197141021L;
243-
private static final long CANCELLED = -1;
252+
private static final long CANCELLED = Long.MIN_VALUE;
244253
/** The actual child subscriber. */
245254
final Subscriber<? super T> child;
246255
/** The cache state object. */
247256
final CacheState<T> state;
248257

258+
/**
259+
* Number of items requested and also the cancelled indicator if
260+
* it contains {@link #CANCELLED}.
261+
*/
249262
final AtomicLong requested;
250263

251264
/**
@@ -263,6 +276,9 @@ static final class ReplaySubscription<T>
263276
*/
264277
int index;
265278

279+
/** Number of items emitted so far. */
280+
long emitted;
281+
266282
ReplaySubscription(Subscriber<? super T> child, CacheState<T> state) {
267283
this.child = child;
268284
this.state = state;
@@ -271,17 +287,8 @@ static final class ReplaySubscription<T>
271287
@Override
272288
public void request(long n) {
273289
if (SubscriptionHelper.validate(n)) {
274-
for (;;) {
275-
long r = requested.get();
276-
if (r == CANCELLED) {
277-
return;
278-
}
279-
long u = BackpressureHelper.addCap(r, n);
280-
if (requested.compareAndSet(r, u)) {
281-
replay();
282-
return;
283-
}
284-
}
290+
BackpressureHelper.addCancel(requested, n);
291+
replay();
285292
}
286293
}
287294

@@ -303,12 +310,13 @@ public void replay() {
303310
int missed = 1;
304311
final Subscriber<? super T> child = this.child;
305312
AtomicLong rq = requested;
313+
long e = emitted;
306314

307315
for (;;) {
308316

309317
long r = rq.get();
310318

311-
if (r < 0L) {
319+
if (r == CANCELLED) {
312320
return;
313321
}
314322

@@ -326,9 +334,8 @@ public void replay() {
326334
final int n = b.length - 1;
327335
int j = index;
328336
int k = currentIndexInBuffer;
329-
int valuesProduced = 0;
330337

331-
while (j < s && r > 0) {
338+
while (j < s && e != r) {
332339
if (rq.get() == CANCELLED) {
333340
return;
334341
}
@@ -344,15 +351,14 @@ public void replay() {
344351

345352
k++;
346353
j++;
347-
r--;
348-
valuesProduced++;
354+
e++;
349355
}
350356

351357
if (rq.get() == CANCELLED) {
352358
return;
353359
}
354360

355-
if (r == 0) {
361+
if (r == e) {
356362
Object o = b[k];
357363
if (NotificationLite.isComplete(o)) {
358364
child.onComplete();
@@ -364,15 +370,12 @@ public void replay() {
364370
}
365371
}
366372

367-
if (valuesProduced != 0) {
368-
BackpressureHelper.producedCancel(rq, valuesProduced);
369-
}
370-
371373
index = j;
372374
currentIndexInBuffer = k;
373375
currentBuffer = b;
374376
}
375377

378+
emitted = e;
376379
missed = addAndGet(-missed);
377380
if (missed == 0) {
378381
break;

src/main/java/io/reactivex/internal/operators/flowable/FlowableFlattenIterable.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -411,11 +411,7 @@ public void clear() {
411411

412412
@Override
413413
public boolean isEmpty() {
414-
Iterator<? extends R> it = current;
415-
if (it == null) {
416-
return queue.isEmpty();
417-
}
418-
return !it.hasNext();
414+
return current == null && queue.isEmpty();
419415
}
420416

421417
@Nullable

src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1215,7 +1215,8 @@ public void subscribe(Subscriber<? super T> child) {
12151215
buf = bufferFactory.call();
12161216
} catch (Throwable ex) {
12171217
Exceptions.throwIfFatal(ex);
1218-
throw ExceptionHelper.wrapOrThrow(ex);
1218+
EmptySubscription.error(ex, child);
1219+
return;
12191220
}
12201221
// create a new subscriber to source
12211222
ReplaySubscriber<T> u = new ReplaySubscriber<T>(buf);

src/main/java/io/reactivex/internal/operators/observable/ObservablePublish.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -148,13 +148,11 @@ static final class PublishObserver<T>
148148
@SuppressWarnings("unchecked")
149149
@Override
150150
public void dispose() {
151-
if (observers.get() != TERMINATED) {
152-
InnerDisposable[] ps = observers.getAndSet(TERMINATED);
153-
if (ps != TERMINATED) {
154-
current.compareAndSet(PublishObserver.this, null);
151+
InnerDisposable[] ps = observers.getAndSet(TERMINATED);
152+
if (ps != TERMINATED) {
153+
current.compareAndSet(PublishObserver.this, null);
155154

156-
DisposableHelper.dispose(s);
157-
}
155+
DisposableHelper.dispose(s);
158156
}
159157
}
160158

src/test/java/io/reactivex/internal/operators/flowable/FlowableCacheTest.java

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -419,4 +419,124 @@ public void error() {
419419
.test(0L)
420420
.assertFailure(TestException.class);
421421
}
422+
423+
@Test
424+
public void cancelledUpFrontConnectAnyway() {
425+
final AtomicInteger call = new AtomicInteger();
426+
Flowable.fromCallable(new Callable<Object>() {
427+
@Override
428+
public Object call() throws Exception {
429+
return call.incrementAndGet();
430+
}
431+
})
432+
.cache()
433+
.test(1L, true)
434+
.assertNoValues();
435+
436+
assertEquals(1, call.get());
437+
}
438+
439+
@Test
440+
public void cancelledUpFront() {
441+
final AtomicInteger call = new AtomicInteger();
442+
Flowable<Object> f = Flowable.fromCallable(new Callable<Object>() {
443+
@Override
444+
public Object call() throws Exception {
445+
return call.incrementAndGet();
446+
}
447+
}).concatWith(Flowable.never())
448+
.cache();
449+
450+
f.test().assertValuesOnly(1);
451+
452+
f.test(1L, true)
453+
.assertEmpty();
454+
455+
assertEquals(1, call.get());
456+
}
457+
458+
@Test
459+
public void subscribeSubscribeRace() {
460+
for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
461+
final Flowable<Integer> cache = Flowable.range(1, 500).cache();
462+
463+
final TestSubscriber<Integer> to1 = new TestSubscriber<Integer>();
464+
final TestSubscriber<Integer> to2 = new TestSubscriber<Integer>();
465+
466+
Runnable r1 = new Runnable() {
467+
@Override
468+
public void run() {
469+
cache.subscribe(to1);
470+
}
471+
};
472+
473+
Runnable r2 = new Runnable() {
474+
@Override
475+
public void run() {
476+
cache.subscribe(to2);
477+
}
478+
};
479+
480+
TestHelper.race(r1, r2);
481+
482+
to1
483+
.awaitDone(5, TimeUnit.SECONDS)
484+
.assertSubscribed()
485+
.assertValueCount(500)
486+
.assertComplete()
487+
.assertNoErrors();
488+
489+
to2
490+
.awaitDone(5, TimeUnit.SECONDS)
491+
.assertSubscribed()
492+
.assertValueCount(500)
493+
.assertComplete()
494+
.assertNoErrors();
495+
}
496+
}
497+
498+
@Test
499+
public void subscribeCompleteRace() {
500+
for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
501+
final PublishProcessor<Integer> ps = PublishProcessor.<Integer>create();
502+
503+
final Flowable<Integer> cache = ps.cache();
504+
505+
cache.test();
506+
507+
final TestSubscriber<Integer> to = new TestSubscriber<Integer>();
508+
509+
Runnable r1 = new Runnable() {
510+
@Override
511+
public void run() {
512+
cache.subscribe(to);
513+
}
514+
};
515+
516+
Runnable r2 = new Runnable() {
517+
@Override
518+
public void run() {
519+
ps.onComplete();
520+
}
521+
};
522+
523+
TestHelper.race(r1, r2);
524+
525+
to
526+
.awaitDone(5, TimeUnit.SECONDS)
527+
.assertResult();
528+
}
529+
}
530+
531+
@Test
532+
public void backpressure() {
533+
Flowable.range(1, 5)
534+
.cache()
535+
.test(0)
536+
.assertEmpty()
537+
.requestMore(2)
538+
.assertValuesOnly(1, 2)
539+
.requestMore(3)
540+
.assertResult(1, 2, 3, 4, 5);
541+
}
422542
}

0 commit comments

Comments
 (0)