11/**
22 * Copyright 2013 Netflix, Inc.
3- *
3+ *
44 * Licensed under the Apache License, Version 2.0 (the "License");
55 * you may not use this file except in compliance with the License.
66 * You may obtain a copy of the License at
7- *
7+ *
88 * http://www.apache.org/licenses/LICENSE-2.0
9- *
9+ *
1010 * Unless required by applicable law or agreed to in writing, software
1111 * distributed under the License is distributed on an "AS IS" BASIS,
1212 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
5151 * <p>
5252 * <pre> {@code
5353
54- PublishSubject <Object> subject = PublishSubject.create();
54+ * ublishSubject <Object> subject = PublishSubject.create();
5555 // observer1 will receive all onNext and onCompleted events
5656 subject.subscribe(observer1);
5757 subject.onNext("one");
6262 subject.onCompleted();
6363
6464 } </pre>
65- *
65+ *
6666 * @param <T>
6767 */
6868public class PublishSubject <T > extends Subject <T , T > {
6969 public static <T > PublishSubject <T > create () {
7070 final ConcurrentHashMap <Subscription , Observer <? super T >> observers = new ConcurrentHashMap <Subscription , Observer <? super T >>();
71- final AtomicReference <Notification <? extends T >> terminalState = new AtomicReference <Notification <? extends T >>();
7271
7372 OnSubscribeFunc <T > onSubscribe = new OnSubscribeFunc <T >() {
7473 @ Override
7574 public Subscription onSubscribe (Observer <? super T > observer ) {
76- // shortcut check if terminal state exists already
77- Subscription s = checkTerminalState (observer );
78- if (s != null ) return s ;
79-
8075 final SafeObservableSubscription subscription = new SafeObservableSubscription ();
8176
8277 subscription .wrap (new Subscription () {
@@ -87,67 +82,26 @@ public void unsubscribe() {
8782 }
8883 });
8984
90- /**
91- * NOTE: We are synchronizing to avoid a race condition between terminalState being set and
92- * a new observer being added to observers.
93- *
94- * The synchronization only occurs on subscription and terminal states, it does not affect onNext calls
95- * so a high-volume hot-observable will not pay this cost for emitting data.
96- *
97- * Due to the restricted impact of blocking synchronization here I have not pursued more complicated
98- * approaches to try and stay completely non-blocking.
99- */
100- synchronized (terminalState ) {
101- // check terminal state again
102- s = checkTerminalState (observer );
103- if (s != null )
104- return s ;
105-
106- // on subscribe add it to the map of outbound observers to notify
107- observers .put (subscription , observer );
108-
109- return subscription ;
110- }
111- }
85+ // on subscribe add it to the map of outbound observers to notify
86+ observers .put (subscription , observer );
11287
113- private Subscription checkTerminalState (Observer <? super T > observer ) {
114- Notification <? extends T > n = terminalState .get ();
115- if (n != null ) {
116- // we are terminated to immediately emit and don't continue with subscription
117- if (n .isOnCompleted ()) {
118- observer .onCompleted ();
119- } else {
120- observer .onError (n .getThrowable ());
121- }
122- return Subscriptions .empty ();
123- } else {
124- return null ;
125- }
88+ return subscription ;
12689 }
90+
12791 };
12892
129- return new PublishSubject <T >(onSubscribe , observers , terminalState );
93+ return new PublishSubject <T >(onSubscribe , observers );
13094 }
13195
13296 private final ConcurrentHashMap <Subscription , Observer <? super T >> observers ;
133- private final AtomicReference <Notification <? extends T >> terminalState ;
13497
135- protected PublishSubject (OnSubscribeFunc <T > onSubscribe , ConcurrentHashMap <Subscription , Observer <? super T >> observers , AtomicReference < Notification <? extends T >> terminalState ) {
98+ protected PublishSubject (OnSubscribeFunc <T > onSubscribe , ConcurrentHashMap <Subscription , Observer <? super T >> observers ) {
13699 super (onSubscribe );
137100 this .observers = observers ;
138- this .terminalState = terminalState ;
139101 }
140102
141103 @ Override
142104 public void onCompleted () {
143- /**
144- * Synchronizing despite terminalState being an AtomicReference because of multi-step logic in subscription.
145- * Why use AtomicReference then? Convenient for passing around a mutable reference holder between the
146- * onSubscribe function and PublishSubject instance... and it's a "better volatile" for the shortcut codepath.
147- */
148- synchronized (terminalState ) {
149- terminalState .set (new Notification <T >());
150- }
151105 for (Observer <? super T > observer : snapshotOfValues ()) {
152106 observer .onCompleted ();
153107 }
@@ -156,14 +110,6 @@ public void onCompleted() {
156110
157111 @ Override
158112 public void onError (Throwable e ) {
159- /**
160- * Synchronizing despite terminalState being an AtomicReference because of multi-step logic in subscription.
161- * Why use AtomicReference then? Convenient for passing around a mutable reference holder between the
162- * onSubscribe function and PublishSubject instance... and it's a "better volatile" for the shortcut codepath.
163- */
164- synchronized (terminalState ) {
165- terminalState .set (new Notification <T >(e ));
166- }
167113 for (Observer <? super T > observer : snapshotOfValues ()) {
168114 observer .onError (e );
169115 }
@@ -179,12 +125,12 @@ public void onNext(T args) {
179125
180126 /**
181127 * Current snapshot of 'values()' so that concurrent modifications aren't included.
182- *
128+ *
183129 * This makes it behave deterministically in a single-threaded execution when nesting subscribes.
184- *
130+ *
185131 * In multi-threaded execution it will cause new subscriptions to wait until the following onNext instead
186132 * of possibly being included in the current onNext iteration.
187- *
133+ *
188134 * @return List<Observer<T>>
189135 */
190136 private Collection <Observer <? super T >> snapshotOfValues () {
@@ -378,75 +324,6 @@ private void assertObservedUntilTwo(Observer<String> aObserver)
378324 verify (aObserver , Mockito .never ()).onCompleted ();
379325 }
380326
381- /**
382- * Test that subscribing after onError/onCompleted immediately terminates instead of causing it to hang.
383- *
384- * Nothing is mentioned in Rx Guidelines for what to do in this case so I'm doing what seems to make sense
385- * which is:
386- *
387- * - cache terminal state (onError/onCompleted)
388- * - any subsequent subscriptions will immediately receive the terminal state rather than start a new subscription
389- *
390- */
391- @ Test
392- public void testUnsubscribeAfterOnCompleted () {
393- PublishSubject <String > subject = PublishSubject .create ();
394-
395- @ SuppressWarnings ("unchecked" )
396- Observer <String > anObserver = mock (Observer .class );
397- subject .subscribe (anObserver );
398-
399- subject .onNext ("one" );
400- subject .onNext ("two" );
401- subject .onCompleted ();
402-
403- InOrder inOrder = inOrder (anObserver );
404- inOrder .verify (anObserver , times (1 )).onNext ("one" );
405- inOrder .verify (anObserver , times (1 )).onNext ("two" );
406- inOrder .verify (anObserver , times (1 )).onCompleted ();
407- inOrder .verify (anObserver , Mockito .never ()).onError (any (Throwable .class ));
408-
409- @ SuppressWarnings ("unchecked" )
410- Observer <String > anotherObserver = mock (Observer .class );
411- subject .subscribe (anotherObserver );
412-
413- inOrder = inOrder (anotherObserver );
414- inOrder .verify (anotherObserver , Mockito .never ()).onNext ("one" );
415- inOrder .verify (anotherObserver , Mockito .never ()).onNext ("two" );
416- inOrder .verify (anotherObserver , times (1 )).onCompleted ();
417- inOrder .verify (anotherObserver , Mockito .never ()).onError (any (Throwable .class ));
418- }
419-
420- @ Test
421- public void testUnsubscribeAfterOnError () {
422- PublishSubject <String > subject = PublishSubject .create ();
423- RuntimeException exception = new RuntimeException ("failure" );
424-
425- @ SuppressWarnings ("unchecked" )
426- Observer <String > anObserver = mock (Observer .class );
427- subject .subscribe (anObserver );
428-
429- subject .onNext ("one" );
430- subject .onNext ("two" );
431- subject .onError (exception );
432-
433- InOrder inOrder = inOrder (anObserver );
434- inOrder .verify (anObserver , times (1 )).onNext ("one" );
435- inOrder .verify (anObserver , times (1 )).onNext ("two" );
436- inOrder .verify (anObserver , times (1 )).onError (exception );
437- inOrder .verify (anObserver , Mockito .never ()).onCompleted ();
438-
439- @ SuppressWarnings ("unchecked" )
440- Observer <String > anotherObserver = mock (Observer .class );
441- subject .subscribe (anotherObserver );
442-
443- inOrder = inOrder (anotherObserver );
444- inOrder .verify (anotherObserver , Mockito .never ()).onNext ("one" );
445- inOrder .verify (anotherObserver , Mockito .never ()).onNext ("two" );
446- inOrder .verify (anotherObserver , times (1 )).onError (exception );
447- inOrder .verify (anotherObserver , Mockito .never ()).onCompleted ();
448- }
449-
450327 @ Test
451328 public void testUnsubscribe ()
452329 {
@@ -519,8 +396,7 @@ public void call(String v) {
519396
520397 });
521398
522-
523- for (int i =0 ; i <10 ; i ++) {
399+ for (int i = 0 ; i < 10 ; i ++) {
524400 s .onNext (i );
525401 }
526402 s .onCompleted ();
@@ -533,5 +409,83 @@ public void call(String v) {
533409 assertEquals (45 , list .size ());
534410 }
535411
412+ /**
413+ * Should be able to unsubscribe all Observers, have it stop emitting, then subscribe new ones and it start emitting again.
414+ */
415+ @ Test
416+ public void testReSubscribe () {
417+ final PublishSubject <Integer > ps = PublishSubject .create ();
418+
419+ Observer <Integer > o1 = mock (Observer .class );
420+ Subscription s1 = ps .subscribe (o1 );
421+
422+ // emit
423+ ps .onNext (1 );
424+
425+ // validate we got it
426+ InOrder inOrder1 = inOrder (o1 );
427+ inOrder1 .verify (o1 , times (1 )).onNext (1 );
428+ inOrder1 .verifyNoMoreInteractions ();
429+
430+ // unsubscribe
431+ s1 .unsubscribe ();
432+
433+ // emit again but nothing will be there to receive it
434+ ps .onNext (2 );
435+
436+ Observer <Integer > o2 = mock (Observer .class );
437+ Subscription s2 = ps .subscribe (o2 );
438+
439+ // emit
440+ ps .onNext (3 );
441+
442+ // validate we got it
443+ InOrder inOrder2 = inOrder (o2 );
444+ inOrder2 .verify (o2 , times (1 )).onNext (3 );
445+ inOrder2 .verifyNoMoreInteractions ();
446+
447+ s2 .unsubscribe ();
448+ }
449+
450+ /**
451+ * Even if subject received an onError/onCompleted, new subscriptions should be able to restart it.
452+ */
453+ @ Test
454+ public void testReSubscribeAfterTerminalState () {
455+ final PublishSubject <Integer > ps = PublishSubject .create ();
456+
457+ Observer <Integer > o1 = mock (Observer .class );
458+ Subscription s1 = ps .subscribe (o1 );
459+
460+ // emit
461+ ps .onNext (1 );
462+
463+ // validate we got it
464+ InOrder inOrder1 = inOrder (o1 );
465+ inOrder1 .verify (o1 , times (1 )).onNext (1 );
466+ inOrder1 .verifyNoMoreInteractions ();
467+
468+ // unsubscribe
469+ s1 .unsubscribe ();
470+
471+ ps .onCompleted ();
472+
473+ // emit again but nothing will be there to receive it
474+ ps .onNext (2 );
475+
476+ Observer <Integer > o2 = mock (Observer .class );
477+ Subscription s2 = ps .subscribe (o2 );
478+
479+ // emit
480+ ps .onNext (3 );
481+
482+ // validate we got it
483+ InOrder inOrder2 = inOrder (o2 );
484+ inOrder2 .verify (o2 , times (1 )).onNext (3 );
485+ inOrder2 .verifyNoMoreInteractions ();
486+
487+ s2 .unsubscribe ();
488+ }
489+
536490 }
537491}
0 commit comments