Skip to content

Commit 29bceef

Browse files
mostroverkhovakarnokd
authored andcommitted
[2.x] UnicastSubject fail fast support (#5217)
* [2.x] Unicast subject fail fast support * follow up: mark new factory methods experimental, remove excessive constructor, fix typos * follow up: test coverage * follow up: add @SInCE for new methods
1 parent 0e19566 commit 29bceef

File tree

2 files changed

+200
-30
lines changed

2 files changed

+200
-30
lines changed

src/main/java/io/reactivex/subjects/UnicastSubject.java

Lines changed: 115 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
package io.reactivex.subjects;
1515

16+
import io.reactivex.annotations.Experimental;
1617
import io.reactivex.annotations.Nullable;
1718
import io.reactivex.plugins.RxJavaPlugins;
1819
import java.util.concurrent.atomic.*;
@@ -53,6 +54,9 @@ public final class UnicastSubject<T> extends Subject<T> {
5354
/** The optional callback when the Subject gets cancelled or terminates. */
5455
final AtomicReference<Runnable> onTerminate;
5556

57+
/** deliver onNext events before error event */
58+
final boolean delayError;
59+
5660
/** Indicates the single observer has cancelled. */
5761
volatile boolean disposed;
5862

@@ -79,7 +83,7 @@ public final class UnicastSubject<T> extends Subject<T> {
7983
*/
8084
@CheckReturnValue
8185
public static <T> UnicastSubject<T> create() {
82-
return new UnicastSubject<T>(bufferSize());
86+
return new UnicastSubject<T>(bufferSize(), true);
8387
}
8488

8589
/**
@@ -90,7 +94,7 @@ public static <T> UnicastSubject<T> create() {
9094
*/
9195
@CheckReturnValue
9296
public static <T> UnicastSubject<T> create(int capacityHint) {
93-
return new UnicastSubject<T>(capacityHint);
97+
return new UnicastSubject<T>(capacityHint, true);
9498
}
9599

96100
/**
@@ -102,37 +106,91 @@ public static <T> UnicastSubject<T> create(int capacityHint) {
102106
*
103107
* @param <T> the value type
104108
* @param capacityHint the hint to size the internal unbounded buffer
105-
* @param onCancelled the non null callback
109+
* @param onTerminate the callback to run when the Subject is terminated or cancelled, null not allowed
110+
* @return an UnicastSubject instance
111+
*/
112+
@CheckReturnValue
113+
public static <T> UnicastSubject<T> create(int capacityHint, Runnable onTerminate) {
114+
return new UnicastSubject<T>(capacityHint, onTerminate, true);
115+
}
116+
117+
/**
118+
* Creates an UnicastSubject with the given internal buffer capacity hint, delay error flag and
119+
* a callback for the case when the single Subscriber cancels its subscription.
120+
*
121+
* <p>The callback, if not null, is called exactly once and
122+
* non-overlapped with any active replay.
123+
*
124+
* @param <T> the value type
125+
* @param capacityHint the hint to size the internal unbounded buffer
126+
* @param onTerminate the callback to run when the Subject is terminated or cancelled, null not allowed
127+
* @param delayError deliver pending onNext events before onError
106128
* @return an UnicastSubject instance
129+
* @since 2.0.8 - experimental
107130
*/
108131
@CheckReturnValue
109-
public static <T> UnicastSubject<T> create(int capacityHint, Runnable onCancelled) {
110-
return new UnicastSubject<T>(capacityHint, onCancelled);
132+
@Experimental
133+
public static <T> UnicastSubject<T> create(int capacityHint, Runnable onTerminate, boolean delayError) {
134+
return new UnicastSubject<T>(capacityHint, onTerminate, delayError);
111135
}
112136

113137
/**
114-
* Creates an UnicastSubject with the given capacity hint.
138+
* Creates an UnicastSubject with an internal buffer capacity hint 16 and given delay error flag.
139+
*
140+
* <p>The callback, if not null, is called exactly once and
141+
* non-overlapped with any active replay.
142+
*
143+
* @param <T> the value type
144+
* @param delayError deliver pending onNext events before onError
145+
* @return an UnicastSubject instance
146+
* @since 2.0.8 - experimental
147+
*/
148+
@CheckReturnValue
149+
@Experimental
150+
public static <T> UnicastSubject<T> create(boolean delayError) {
151+
return new UnicastSubject<T>(bufferSize(), delayError);
152+
}
153+
154+
155+
/**
156+
* Creates an UnicastSubject with the given capacity hint and delay error flag.
115157
* @param capacityHint the capacity hint for the internal, unbounded queue
116-
* @since 2.0
158+
* @param delayError deliver pending onNext events before onError
159+
* @since 2.0.8 - experimental
117160
*/
118-
UnicastSubject(int capacityHint) {
161+
UnicastSubject(int capacityHint, boolean delayError) {
119162
this.queue = new SpscLinkedArrayQueue<T>(ObjectHelper.verifyPositive(capacityHint, "capacityHint"));
120163
this.onTerminate = new AtomicReference<Runnable>();
164+
this.delayError = delayError;
121165
this.actual = new AtomicReference<Observer<? super T>>();
122166
this.once = new AtomicBoolean();
123167
this.wip = new UnicastQueueDisposable();
124168
}
125169

126170
/**
127-
* Creates an UnicastProcessor with the given capacity hint and callback
128-
* for when the Processor is terminated normally or its single Subscriber cancels.
171+
* Creates an UnicastSubject with the given capacity hint and callback
172+
* for when the Subject is terminated normally or its single Subscriber cancels.
129173
* @param capacityHint the capacity hint for the internal, unbounded queue
130-
* @param onTerminate the callback to run when the Processor is terminated or cancelled, null not allowed
174+
* @param onTerminate the callback to run when the Subject is terminated or cancelled, null not allowed
131175
* @since 2.0
132-
*/
176+
*
177+
* */
133178
UnicastSubject(int capacityHint, Runnable onTerminate) {
179+
this(capacityHint, onTerminate, true);
180+
}
181+
182+
/**
183+
* Creates an UnicastSubject with the given capacity hint, delay error flag and callback
184+
* for when the Subject is terminated normally or its single Subscriber cancels.
185+
* @param capacityHint the capacity hint for the internal, unbounded queue
186+
* @param onTerminate the callback to run when the Subject is terminated or cancelled, null not allowed
187+
* @param delayError deliver pending onNext events before onError
188+
* @since 2.0.8 - experimental
189+
*/
190+
UnicastSubject(int capacityHint, Runnable onTerminate, boolean delayError) {
134191
this.queue = new SpscLinkedArrayQueue<T>(ObjectHelper.verifyPositive(capacityHint, "capacityHint"));
135192
this.onTerminate = new AtomicReference<Runnable>(ObjectHelper.requireNonNull(onTerminate, "onTerminate"));
193+
this.delayError = delayError;
136194
this.actual = new AtomicReference<Observer<? super T>>();
137195
this.once = new AtomicBoolean();
138196
this.wip = new UnicastQueueDisposable();
@@ -212,6 +270,8 @@ public void onComplete() {
212270
void drainNormal(Observer<? super T> a) {
213271
int missed = 1;
214272
SimpleQueue<T> q = queue;
273+
boolean failFast = !this.delayError;
274+
boolean canBeError = true;
215275
for (;;) {
216276
for (;;) {
217277

@@ -221,19 +281,23 @@ void drainNormal(Observer<? super T> a) {
221281
return;
222282
}
223283

224-
boolean d = done;
284+
boolean d = this.done;
225285
T v = queue.poll();
226286
boolean empty = v == null;
227287

228-
if (d && empty) {
229-
actual.lazySet(null);
230-
Throwable ex = error;
231-
if (ex != null) {
232-
a.onError(ex);
233-
} else {
234-
a.onComplete();
288+
if (d) {
289+
if (failFast && canBeError) {
290+
if (failedFast(q, a)) {
291+
return;
292+
} else {
293+
canBeError = false;
294+
}
295+
}
296+
297+
if (empty) {
298+
errorOrComplete(a);
299+
return;
235300
}
236-
return;
237301
}
238302

239303
if (empty) {
@@ -254,6 +318,7 @@ void drainFused(Observer<? super T> a) {
254318
int missed = 1;
255319

256320
final SpscLinkedArrayQueue<T> q = queue;
321+
final boolean failFast = !delayError;
257322

258323
for (;;) {
259324

@@ -262,20 +327,18 @@ void drainFused(Observer<? super T> a) {
262327
q.clear();
263328
return;
264329
}
265-
266330
boolean d = done;
267331

332+
if (failFast && d) {
333+
if (failedFast(q, a)) {
334+
return;
335+
}
336+
}
337+
268338
a.onNext(null);
269339

270340
if (d) {
271-
actual.lazySet(null);
272-
273-
Throwable ex = error;
274-
if (ex != null) {
275-
a.onError(ex);
276-
} else {
277-
a.onComplete();
278-
}
341+
errorOrComplete(a);
279342
return;
280343
}
281344

@@ -286,6 +349,28 @@ void drainFused(Observer<? super T> a) {
286349
}
287350
}
288351

352+
void errorOrComplete(Observer<? super T> a) {
353+
actual.lazySet(null);
354+
Throwable ex = error;
355+
if (ex != null) {
356+
a.onError(ex);
357+
} else {
358+
a.onComplete();
359+
}
360+
}
361+
362+
boolean failedFast(final SimpleQueue<T> q, Observer<? super T> a) {
363+
Throwable ex = error;
364+
if (ex != null) {
365+
actual.lazySet(null);
366+
q.clear();
367+
a.onError(ex);
368+
return true;
369+
} else {
370+
return false;
371+
}
372+
}
373+
289374
void drain() {
290375
if (wip.getAndIncrement() != 0) {
291376
return;

src/test/java/io/reactivex/subjects/UnicastSubjectTest.java

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import io.reactivex.observers.*;
2828
import io.reactivex.plugins.RxJavaPlugins;
2929
import io.reactivex.schedulers.Schedulers;
30+
import static org.mockito.Mockito.mock;
3031

3132
public class UnicastSubjectTest {
3233

@@ -69,6 +70,90 @@ public void fusionOfflie() {
6970
.assertResult(1);
7071
}
7172

73+
@Test
74+
public void failFast() {
75+
UnicastSubject<Integer> ap = UnicastSubject.create(false);
76+
ap.onNext(1);
77+
ap.onError(new RuntimeException());
78+
TestObserver<Integer> ts = TestObserver.create();
79+
ap.subscribe(ts);
80+
81+
ts
82+
.assertValueCount(0)
83+
.assertError(RuntimeException.class);
84+
}
85+
86+
@Test
87+
public void threeArgsFactoryFailFast() {
88+
Runnable noop = mock(Runnable.class);
89+
UnicastSubject<Integer> ap = UnicastSubject.create(16, noop, false);
90+
ap.onNext(1);
91+
ap.onError(new RuntimeException());
92+
TestObserver<Integer> ts = TestObserver.create();
93+
ap.subscribe(ts);
94+
95+
ts
96+
.assertValueCount(0)
97+
.assertError(RuntimeException.class);
98+
}
99+
100+
@Test
101+
public void threeArgsFactoryDelayError() {
102+
Runnable noop = mock(Runnable.class);
103+
UnicastSubject<Integer> ap = UnicastSubject.create(16, noop, true);
104+
ap.onNext(1);
105+
ap.onError(new RuntimeException());
106+
TestObserver<Integer> ts = TestObserver.create();
107+
ap.subscribe(ts);
108+
109+
ts
110+
.assertValueCount(1)
111+
.assertError(RuntimeException.class);
112+
}
113+
114+
@Test
115+
public void fusionOfflineFailFast() {
116+
UnicastSubject<Integer> ap = UnicastSubject.create(false);
117+
ap.onNext(1);
118+
ap.onError(new RuntimeException());
119+
TestObserver<Integer> ts = ObserverFusion.newTest(QueueDisposable.ANY);
120+
ap.subscribe(ts);
121+
122+
ts
123+
.assertValueCount(0)
124+
.assertError(RuntimeException.class);
125+
}
126+
127+
@Test
128+
public void fusionOfflineFailFastMultipleEvents() {
129+
UnicastSubject<Integer> ap = UnicastSubject.create(false);
130+
ap.onNext(1);
131+
ap.onNext(2);
132+
ap.onNext(3);
133+
ap.onComplete();
134+
TestObserver<Integer> ts = ObserverFusion.newTest(QueueDisposable.ANY);
135+
ap.subscribe(ts);
136+
137+
ts
138+
.assertValueCount(3)
139+
.assertComplete();
140+
}
141+
142+
@Test
143+
public void failFastMultipleEvents() {
144+
UnicastSubject<Integer> ap = UnicastSubject.create(false);
145+
ap.onNext(1);
146+
ap.onNext(2);
147+
ap.onNext(3);
148+
ap.onComplete();
149+
TestObserver<Integer> ts = TestObserver.create();
150+
ap.subscribe(ts);
151+
152+
ts
153+
.assertValueCount(3)
154+
.assertComplete();
155+
}
156+
72157
@Test
73158
public void onTerminateCalledWhenOnError() {
74159
final AtomicBoolean didRunOnTerminate = new AtomicBoolean();

0 commit comments

Comments
 (0)