@@ -255,7 +255,7 @@ public void testSimpleAsyncConcat() {
255255 TestObservable <String > o1 = new TestObservable <String >("one" , "two" , "three" );
256256 TestObservable <String > o2 = new TestObservable <String >("four" , "five" , "six" );
257257
258- Observable .concat (o1 , o2 ).subscribe (observer );
258+ Observable .concat (Observable . create ( o1 ), Observable . create ( o2 ) ).subscribe (observer );
259259
260260 try {
261261 // wait for async observables to complete
@@ -301,12 +301,12 @@ public void run() {
301301 // emit first
302302 if (!s .isUnsubscribed ()) {
303303 System .out .println ("Emit o1" );
304- observer .onNext (o1 );
304+ observer .onNext (Observable . create ( o1 ) );
305305 }
306306 // emit second
307307 if (!s .isUnsubscribed ()) {
308308 System .out .println ("Emit o2" );
309- observer .onNext (o2 );
309+ observer .onNext (Observable . create ( o2 ) );
310310 }
311311
312312 // wait until sometime later and emit third
@@ -317,7 +317,7 @@ public void run() {
317317 }
318318 if (!s .isUnsubscribed ()) {
319319 System .out .println ("Emit o3" );
320- observer .onNext (o3 );
320+ observer .onNext (Observable . create ( o3 ) );
321321 }
322322
323323 } catch (Throwable e ) {
@@ -404,7 +404,7 @@ public void testBlockedObservableOfObservables() {
404404 final CountDownLatch callOnce = new CountDownLatch (1 );
405405 final CountDownLatch okToContinue = new CountDownLatch (1 );
406406 TestObservable <Observable <String >> observableOfObservables = new TestObservable <Observable <String >>(callOnce , okToContinue , odds , even );
407- OnSubscribeFunc <String > concatF = concat (observableOfObservables );
407+ OnSubscribeFunc <String > concatF = concat (Observable . create ( observableOfObservables ) );
408408 Observable <String > concat = Observable .create (concatF );
409409 concat .subscribe (observer );
410410 try {
@@ -443,8 +443,8 @@ public void testConcatConcurrentWithInfinity() {
443443 @ SuppressWarnings ("unchecked" )
444444 Observer <String > aObserver = mock (Observer .class );
445445 @ SuppressWarnings ("unchecked" )
446- TestObservable <Observable <String >> observableOfObservables = new TestObservable <Observable <String >>(w1 , w2 );
447- OnSubscribeFunc <String > concatF = concat (observableOfObservables );
446+ TestObservable <Observable <String >> observableOfObservables = new TestObservable <Observable <String >>(Observable . create ( w1 ), Observable . create ( w2 ) );
447+ OnSubscribeFunc <String > concatF = concat (Observable . create ( observableOfObservables ) );
448448
449449 Observable <String > concat = Observable .create (concatF );
450450
@@ -485,8 +485,8 @@ public void testConcatNonBlockingObservables() {
485485 @ Override
486486 public Subscription onSubscribe (Observer <? super Observable <String >> observer ) {
487487 // simulate what would happen in an observable
488- observer .onNext (w1 );
489- observer .onNext (w2 );
488+ observer .onNext (Observable . create ( w1 ) );
489+ observer .onNext (Observable . create ( w2 ) );
490490 observer .onCompleted ();
491491
492492 return new Subscription () {
@@ -540,7 +540,7 @@ public void testConcatUnsubscribe() {
540540 @ SuppressWarnings ("unchecked" )
541541 final Observer <String > aObserver = mock (Observer .class );
542542 @ SuppressWarnings ("unchecked" )
543- final Observable <String > concat = Observable .create (concat (w1 , w2 ));
543+ final Observable <String > concat = Observable .create (concat (Observable . create ( w1 ), Observable . create ( w2 ) ));
544544 final SafeObservableSubscription s1 = new SafeObservableSubscription ();
545545
546546 try {
@@ -583,8 +583,8 @@ public void testConcatUnsubscribeConcurrent() {
583583 @ SuppressWarnings ("unchecked" )
584584 Observer <String > aObserver = mock (Observer .class );
585585 @ SuppressWarnings ("unchecked" )
586- TestObservable <Observable <String >> observableOfObservables = new TestObservable <Observable <String >>(w1 , w2 );
587- OnSubscribeFunc <String > concatF = concat (observableOfObservables );
586+ TestObservable <Observable <String >> observableOfObservables = new TestObservable <Observable <String >>(Observable . create ( w1 ), Observable . create ( w2 ) );
587+ OnSubscribeFunc <String > concatF = concat (Observable . create ( observableOfObservables ) );
588588
589589 Observable <String > concat = Observable .create (concatF );
590590
@@ -616,7 +616,7 @@ public void testConcatUnsubscribeConcurrent() {
616616 verify (aObserver , never ()).onError (any (Throwable .class ));
617617 }
618618
619- private static class TestObservable <T > extends Observable <T > {
619+ private static class TestObservable <T > implements OnSubscribeFunc <T > {
620620
621621 private final Subscription s = new Subscription () {
622622
@@ -656,7 +656,7 @@ public TestObservable(T seed, int size) {
656656 }
657657
658658 @ Override
659- public Subscription subscribe (final Observer <? super T > observer ) {
659+ public Subscription onSubscribe (final Observer <? super T > observer ) {
660660 t = new Thread (new Runnable () {
661661
662662 @ Override
0 commit comments