@@ -49,6 +49,7 @@ public final class UnicastSubject<T> extends Subject<T, T> {
4949 public static <T > UnicastSubject <T > create () {
5050 return create (16 );
5151 }
52+
5253 /**
5354 * Constructs an empty UnicastSubject instance with a capacity hint.
5455 * <p>The capacity hint determines the internal queue's island size: the larger
@@ -59,7 +60,18 @@ public static <T> UnicastSubject<T> create() {
5960 * @return the created BufferUntilSubscriber instance
6061 */
6162 public static <T > UnicastSubject <T > create (int capacityHint ) {
62- State <T > state = new State <T >(capacityHint , null );
63+ State <T > state = new State <T >(capacityHint , false , null );
64+ return new UnicastSubject <T >(state );
65+ }
66+
67+ /**
68+ * Constructs an empty UnicastSubject instance with the default capacity hint of 16 elements.
69+ *
70+ * @param delayError deliver pending next events before error.
71+ * @return the created UnicastSubject instance
72+ */
73+ public static <T > UnicastSubject <T > create (boolean delayError ) {
74+ State <T > state = new State <T >(16 , delayError , null );
6375 return new UnicastSubject <T >(state );
6476 }
6577
@@ -78,7 +90,28 @@ public static <T> UnicastSubject<T> create(int capacityHint) {
7890 * @return the created BufferUntilSubscriber instance
7991 */
8092 public static <T > UnicastSubject <T > create (int capacityHint , Action0 onTerminated ) {
81- State <T > state = new State <T >(capacityHint , onTerminated );
93+ State <T > state = new State <T >(capacityHint , false , onTerminated );
94+ return new UnicastSubject <T >(state );
95+ }
96+
97+ /**
98+ * Constructs an empty UnicastSubject instance with a capacity hint, delay error
99+ * flag and Action0 instance to call if the subject reaches its terminal state
100+ * or the single Subscriber unsubscribes mid-sequence.
101+ * <p>The capacity hint determines the internal queue's island size: the larger
102+ * it is the less frequent allocation will happen if there is no subscriber
103+ * or the subscriber hasn't caught up.
104+ * @param <T> the input and output value type
105+ * @param capacityHint the capacity hint for the internal queue
106+ * @param onTerminated the optional callback to call when subject reaches its terminal state
107+ * or the single Subscriber unsubscribes mid-sequence. It will be called
108+ * at most once.
109+ * @param delayError flag indicating whether to deliver pending next events before error.
110+ * @return the created BufferUntilSubscriber instance
111+ */
112+ public static <T > UnicastSubject <T > create (int capacityHint ,
113+ Action0 onTerminated , boolean delayError ) {
114+ State <T > state = new State <T >(capacityHint , delayError , onTerminated );
82115 return new UnicastSubject <T >(state );
83116 }
84117
@@ -119,6 +152,8 @@ static final class State<T> extends AtomicLong implements Producer, Observer<T>,
119152 final AtomicReference <Subscriber <? super T >> subscriber ;
120153 /** The queue holding values until the subscriber arrives and catches up. */
121154 final Queue <Object > queue ;
155+ /** Deliver pending next events before error. */
156+ final boolean delayError ;
122157 /** Atomically set to true on terminal condition. */
123158 final AtomicReference <Action0 > terminateOnce ;
124159 /** In case the source emitted an error. */
@@ -137,10 +172,12 @@ static final class State<T> extends AtomicLong implements Producer, Observer<T>,
137172 * reduce allocation frequency
138173 * @param onTerminated the action to call when the subject reaches its terminal state or
139174 * the single subscriber unsubscribes.
175+ * @param delayError deliver pending next events before error.
140176 */
141- public State (int capacityHint , Action0 onTerminated ) {
177+ public State (int capacityHint , boolean delayError , Action0 onTerminated ) {
142178 this .subscriber = new AtomicReference <Subscriber <? super T >>();
143179 this .terminateOnce = onTerminated != null ? new AtomicReference <Action0 >(onTerminated ) : null ;
180+ this .delayError = delayError ;
144181
145182 Queue <Object > q ;
146183 if (capacityHint > 1 ) {
@@ -266,14 +303,14 @@ void replay() {
266303 emitting = true ;
267304 }
268305 Queue <Object > q = queue ;
306+ boolean delayError = this .delayError ;
269307 for (;;) {
270308 Subscriber <? super T > s = subscriber .get ();
271309 boolean unlimited = false ;
272310 if (s != null ) {
273311 boolean d = done ;
274312 boolean empty = q .isEmpty ();
275-
276- if (checkTerminated (d , empty , s )) {
313+ if (checkTerminated (d , empty , delayError , s )) {
277314 return ;
278315 }
279316 long r = get ();
@@ -284,7 +321,7 @@ void replay() {
284321 d = done ;
285322 Object v = q .poll ();
286323 empty = v == null ;
287- if (checkTerminated (d , empty , s )) {
324+ if (checkTerminated (d , empty , delayError , s )) {
288325 return ;
289326 }
290327 if (empty ) {
@@ -348,23 +385,28 @@ public boolean isUnsubscribed() {
348385 * an error happened or the source terminated and the queue is empty
349386 * @param done indicates the source has called onCompleted
350387 * @param empty indicates if there are no more source values in the queue
388+ * @param delayError indicates whether to deliver pending next events before error
351389 * @param s the target Subscriber to emit events to
352390 * @return true if this Subject reached a terminal state and the drain loop should quit
353391 */
354- boolean checkTerminated (boolean done , boolean empty , Subscriber <? super T > s ) {
392+ boolean checkTerminated (boolean done , boolean empty , boolean delayError , Subscriber <? super T > s ) {
355393 if (s .isUnsubscribed ()) {
356394 queue .clear ();
357395 return true ;
358396 }
359397 if (done ) {
360398 Throwable e = error ;
361- if (e != null ) {
399+ if (e != null && ! delayError ) {
362400 queue .clear ();
363401 s .onError (e );
364402 return true ;
365- } else
403+ }
366404 if (empty ) {
367- s .onCompleted ();
405+ if (e != null ) {
406+ s .onError (e );
407+ } else {
408+ s .onCompleted ();
409+ }
368410 return true ;
369411 }
370412 }
0 commit comments