File tree Expand file tree Collapse file tree 2 files changed +4
-34
lines changed
src/main/java/rx/internal/operators Expand file tree Collapse file tree 2 files changed +4
-34
lines changed Original file line number Diff line number Diff line change 2020import rx .Subscriber ;
2121import rx .functions .Func1 ;
2222import rx .observers .SerializedSubscriber ;
23+ import rx .observers .Subscribers ;
2324import rx .subjects .PublishSubject ;
2425
2526/**
@@ -44,24 +45,7 @@ public Subscriber<? super T> call(final Subscriber<? super T> _child) {
4445 final SerializedSubscriber <T > child = new SerializedSubscriber <T >(_child );
4546 final PublishSubject <Observable <T >> delayedEmissions = PublishSubject .create ();
4647
47- _child .add (Observable .merge (delayedEmissions ).unsafeSubscribe (new Subscriber <T >() {
48-
49- @ Override
50- public void onCompleted () {
51- child .onCompleted ();
52- }
53-
54- @ Override
55- public void onError (Throwable e ) {
56- child .onError (e );
57- }
58-
59- @ Override
60- public void onNext (T t ) {
61- child .onNext (t );
62- }
63-
64- }));
48+ _child .add (Observable .merge (delayedEmissions ).unsafeSubscribe (Subscribers .from (child )));
6549
6650 return new Subscriber <T >(_child ) {
6751
Original file line number Diff line number Diff line change 2626import rx .functions .Action1 ;
2727import rx .functions .Func0 ;
2828import rx .observables .ConnectableObservable ;
29+ import rx .observers .Subscribers ;
2930import rx .subjects .Subject ;
3031import rx .subscriptions .Subscriptions ;
3132
@@ -90,22 +91,7 @@ public void connect(Action1<? super Subscription> connection) {
9091 final Subject <? super T , ? extends R > subject = subjectFactory .call ();
9192 // create new Subscriber that will pass-thru to the subject we just created
9293 // we do this since it is also a Subscription whereas the Subject is not
93- subscription = new Subscriber <T >() {
94- @ Override
95- public void onCompleted () {
96- subject .onCompleted ();
97- }
98-
99- @ Override
100- public void onError (Throwable e ) {
101- subject .onError (e );
102- }
103-
104- @ Override
105- public void onNext (T args ) {
106- subject .onNext (args );
107- }
108- };
94+ subscription = Subscribers .from (subject );
10995 final AtomicReference <Subscription > gs = new AtomicReference <Subscription >();
11096 gs .set (Subscriptions .create (new Action0 () {
11197 @ Override
You can’t perform that action at this time.
0 commit comments