2424
2525import rx .Observer ;
2626import rx .Subscription ;
27+ import rx .functions .Func0 ;
2728import rx .observables .ConnectableObservable ;
2829import rx .subjects .PublishSubject ;
2930import rx .subjects .Subject ;
@@ -34,8 +35,7 @@ public class OnSubscribeMulticastTest {
3435 public void testMulticast () {
3536 Subject <String , String > source = PublishSubject .create ();
3637
37- ConnectableObservable <String > multicasted = source .multicast (
38- PublishSubject .<String > create ());
38+ ConnectableObservable <String > multicasted = source .multicast (new PublishSubjectFactory ());
3939
4040 @ SuppressWarnings ("unchecked" )
4141 Observer <String > observer = mock (Observer .class );
@@ -62,8 +62,7 @@ public void testMulticast() {
6262 public void testMulticastConnectTwice () {
6363 Subject <String , String > source = PublishSubject .create ();
6464
65- ConnectableObservable <String > multicasted = source .multicast (
66- PublishSubject .<String > create ());
65+ ConnectableObservable <String > multicasted = source .multicast (new PublishSubjectFactory ());
6766
6867 @ SuppressWarnings ("unchecked" )
6968 Observer <String > observer = mock (Observer .class );
@@ -87,8 +86,7 @@ public void testMulticastConnectTwice() {
8786 public void testMulticastDisconnect () {
8887 Subject <String , String > source = PublishSubject .create ();
8988
90- ConnectableObservable <String > multicasted = source .multicast (
91- PublishSubject .<String > create ());
89+ ConnectableObservable <String > multicasted = source .multicast (new PublishSubjectFactory ());
9290
9391 @ SuppressWarnings ("unchecked" )
9492 Observer <String > observer = mock (Observer .class );
@@ -102,6 +100,9 @@ public void testMulticastDisconnect() {
102100 connection .unsubscribe ();
103101 source .onNext ("three" );
104102
103+ // subscribe again
104+ multicasted .subscribe (observer );
105+ // reconnect
105106 multicasted .connect ();
106107 source .onNext ("four" );
107108 source .onCompleted ();
@@ -113,4 +114,13 @@ public void testMulticastDisconnect() {
113114 verify (observer , times (1 )).onCompleted ();
114115
115116 }
117+
118+ private static final class PublishSubjectFactory implements Func0 <Subject <String , String >> {
119+
120+ @ Override
121+ public Subject <String , String > call () {
122+ return PublishSubject .<String > create ();
123+ }
124+
125+ }
116126}
0 commit comments