Skip to content

Commit 344453f

Browse files
JakeWhartonakarnokd
authored andcommitted
Skip static factories when converting between stream types. (#4324)
Also rename Publisher->Completable factory method and operator implementation to match other stream types.
1 parent 7518541 commit 344453f

File tree

5 files changed

+46
-42
lines changed

5 files changed

+46
-42
lines changed

src/main/java/io/reactivex/Completable.java

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -250,18 +250,18 @@ public static Completable fromCallable(final Callable<?> callable) {
250250
}
251251

252252
/**
253-
* Returns a Completable instance that subscribes to the given flowable, ignores all values and
253+
* Returns a Completable instance that subscribes to the given publisher, ignores all values and
254254
* emits only the terminal event.
255255
*
256-
* @param <T> the type of the flowable
257-
* @param flowable the Flowable instance to subscribe to, not null
256+
* @param <T> the type of the publisher
257+
* @param publisher the Publisher instance to subscribe to, not null
258258
* @return the new Completable instance
259-
* @throws NullPointerException if flowable is null
259+
* @throws NullPointerException if publisher is null
260260
*/
261261
@SchedulerSupport(SchedulerSupport.NONE)
262-
public static <T> Completable fromFlowable(final Publisher<T> flowable) {
263-
Objects.requireNonNull(flowable, "flowable is null");
264-
return new CompletableFromFlowable<T>(flowable);
262+
public static <T> Completable fromPublisher(final Publisher<T> publisher) {
263+
Objects.requireNonNull(publisher, "publisher is null");
264+
return new CompletableFromPublisher<T>(publisher);
265265
}
266266

267267
@SchedulerSupport(SchedulerSupport.NONE)
@@ -974,7 +974,7 @@ public final Completable onErrorResumeNext(final Function<? super Throwable, ? e
974974
*/
975975
@SchedulerSupport(SchedulerSupport.NONE)
976976
public final Completable repeat() {
977-
return fromFlowable(toFlowable().repeat());
977+
return fromPublisher(toFlowable().repeat());
978978
}
979979

980980
/**
@@ -985,7 +985,7 @@ public final Completable repeat() {
985985
*/
986986
@SchedulerSupport(SchedulerSupport.NONE)
987987
public final Completable repeat(long times) {
988-
return fromFlowable(toFlowable().repeat(times));
988+
return fromPublisher(toFlowable().repeat(times));
989989
}
990990

991991
/**
@@ -997,7 +997,7 @@ public final Completable repeat(long times) {
997997
*/
998998
@SchedulerSupport(SchedulerSupport.NONE)
999999
public final Completable repeatUntil(BooleanSupplier stop) {
1000-
return fromFlowable(toFlowable().repeatUntil(stop));
1000+
return fromPublisher(toFlowable().repeatUntil(stop));
10011001
}
10021002

10031003
/**
@@ -1015,7 +1015,7 @@ public final Completable repeatUntil(BooleanSupplier stop) {
10151015
* FIXME add unit test once the type has been fixed
10161016
*/
10171017
public final Completable repeatWhen(Function<? super Flowable<Object>, ? extends Publisher<Object>> handler) {
1018-
return fromFlowable(toFlowable().repeatWhen(handler));
1018+
return fromPublisher(toFlowable().repeatWhen(handler));
10191019
}
10201020

10211021
/**
@@ -1024,7 +1024,7 @@ public final Completable repeatWhen(Function<? super Flowable<Object>, ? extends
10241024
*/
10251025
@SchedulerSupport(SchedulerSupport.NONE)
10261026
public final Completable retry() {
1027-
return fromFlowable(toFlowable().retry());
1027+
return fromPublisher(toFlowable().retry());
10281028
}
10291029

10301030
/**
@@ -1036,7 +1036,7 @@ public final Completable retry() {
10361036
*/
10371037
@SchedulerSupport(SchedulerSupport.NONE)
10381038
public final Completable retry(BiPredicate<? super Integer, ? super Throwable> predicate) {
1039-
return fromFlowable(toFlowable().retry(predicate));
1039+
return fromPublisher(toFlowable().retry(predicate));
10401040
}
10411041

10421042
/**
@@ -1048,7 +1048,7 @@ public final Completable retry(BiPredicate<? super Integer, ? super Throwable> p
10481048
*/
10491049
@SchedulerSupport(SchedulerSupport.NONE)
10501050
public final Completable retry(long times) {
1051-
return fromFlowable(toFlowable().retry(times));
1051+
return fromPublisher(toFlowable().retry(times));
10521052
}
10531053

10541054
/**
@@ -1061,7 +1061,7 @@ public final Completable retry(long times) {
10611061
*/
10621062
@SchedulerSupport(SchedulerSupport.NONE)
10631063
public final Completable retry(Predicate<? super Throwable> predicate) {
1064-
return fromFlowable(toFlowable().retry(predicate));
1064+
return fromPublisher(toFlowable().retry(predicate));
10651065
}
10661066

10671067
/**
@@ -1075,7 +1075,7 @@ public final Completable retry(Predicate<? super Throwable> predicate) {
10751075
*/
10761076
@SchedulerSupport(SchedulerSupport.NONE)
10771077
public final Completable retryWhen(Function<? super Flowable<? extends Throwable>, ? extends Publisher<Object>> handler) {
1078-
return fromFlowable(toFlowable().retryWhen(handler));
1078+
return fromPublisher(toFlowable().retryWhen(handler));
10791079
}
10801080

10811081
/**

src/main/java/io/reactivex/Flowable.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,11 @@
2727
import io.reactivex.internal.functions.Functions;
2828
import io.reactivex.internal.functions.Objects;
2929
import io.reactivex.internal.fuseable.*;
30+
import io.reactivex.internal.operators.completable.CompletableFromPublisher;
3031
import io.reactivex.internal.operators.flowable.*;
3132
import io.reactivex.internal.operators.flowable.FlowableConcatMap.ErrorMode;
33+
import io.reactivex.internal.operators.observable.ObservableFromPublisher;
34+
import io.reactivex.internal.operators.single.SingleFromPublisher;
3235
import io.reactivex.internal.schedulers.ImmediateThinScheduler;
3336
import io.reactivex.internal.subscribers.flowable.*;
3437
import io.reactivex.internal.util.ArrayListSupplier;
@@ -3496,7 +3499,7 @@ public final BlockingFlowable<T> toBlocking() {
34963499
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
34973500
@SchedulerSupport(SchedulerSupport.NONE)
34983501
public final Completable toCompletable() {
3499-
return Completable.fromFlowable(this);
3502+
return new CompletableFromPublisher<T>(this);
35003503
}
35013504

35023505
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@@ -3631,13 +3634,13 @@ public final <K, V> Flowable<Map<K, Collection<V>>> toMultimap(
36313634
@BackpressureSupport(BackpressureKind.NONE)
36323635
@SchedulerSupport(SchedulerSupport.NONE)
36333636
public final Observable<T> toObservable() {
3634-
return Observable.fromPublisher(this);
3637+
return new ObservableFromPublisher<T>(this);
36353638
}
36363639

36373640
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
36383641
@SchedulerSupport(SchedulerSupport.NONE)
36393642
public final Single<T> toSingle() {
3640-
return Single.fromPublisher(this);
3643+
return new SingleFromPublisher<T>(this);
36413644
}
36423645

36433646
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)

src/main/java/io/reactivex/Observable.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import io.reactivex.functions.*;
2626
import io.reactivex.internal.functions.Functions;
2727
import io.reactivex.internal.functions.Objects;
28+
import io.reactivex.internal.operators.completable.CompletableFromObservable;
2829
import io.reactivex.internal.operators.flowable.FlowableFromObservable;
2930
import io.reactivex.internal.operators.single.SingleFromObservable;
3031
import io.reactivex.internal.operators.observable.*;
@@ -2867,7 +2868,7 @@ public final BlockingObservable<T> toBlocking() {
28672868

28682869
@SchedulerSupport(SchedulerSupport.NONE)
28692870
public final Completable toCompletable() {
2870-
return Completable.fromObservable(this);
2871+
return new CompletableFromObservable<T>(this);
28712872
}
28722873

28732874
@SchedulerSupport(SchedulerSupport.NONE)

src/main/java/io/reactivex/internal/operators/completable/CompletableFromFlowable.java renamed to src/main/java/io/reactivex/internal/operators/completable/CompletableFromPublisher.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@
1818
import io.reactivex.*;
1919
import io.reactivex.disposables.Disposables;
2020

21-
public final class CompletableFromFlowable<T> extends Completable {
21+
public final class CompletableFromPublisher<T> extends Completable {
2222

2323
final Publisher<T> flowable;
2424

25-
public CompletableFromFlowable(Publisher<T> flowable) {
25+
public CompletableFromPublisher(Publisher<T> flowable) {
2626
this.flowable = flowable;
2727
}
2828

src/test/java/io/reactivex/completable/CompletableTest.java

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -547,28 +547,28 @@ public void fromCallableThrows() {
547547

548548
@Test(expected = NullPointerException.class)
549549
public void fromFlowableNull() {
550-
Completable.fromFlowable(null);
550+
Completable.fromPublisher(null);
551551
}
552552

553553
@Test(timeout = 1000)
554554
public void fromFlowableEmpty() {
555-
Completable c = Completable.fromFlowable(Flowable.empty());
555+
Completable c = Completable.fromPublisher(Flowable.empty());
556556

557557
c.await();
558558
}
559559

560560
@Test(timeout = 5000)
561561
public void fromFlowableSome() {
562562
for (int n = 1; n < 10000; n *= 10) {
563-
Completable c = Completable.fromFlowable(Flowable.range(1, n));
563+
Completable c = Completable.fromPublisher(Flowable.range(1, n));
564564

565565
c.await();
566566
}
567567
}
568568

569569
@Test(timeout = 1000, expected = TestException.class)
570570
public void fromFlowableError() {
571-
Completable c = Completable.fromFlowable(Flowable.error(new Callable<Throwable>() {
571+
Completable c = Completable.fromPublisher(Flowable.error(new Callable<Throwable>() {
572572
@Override
573573
public Throwable call() {
574574
return new TestException();
@@ -3007,9 +3007,9 @@ public void ambArrayOneFires() {
30073007
PublishProcessor<Object> ps1 = PublishProcessor.create();
30083008
PublishProcessor<Object> ps2 = PublishProcessor.create();
30093009

3010-
Completable c1 = Completable.fromFlowable(ps1);
3010+
Completable c1 = Completable.fromPublisher(ps1);
30113011

3012-
Completable c2 = Completable.fromFlowable(ps2);
3012+
Completable c2 = Completable.fromPublisher(ps2);
30133013

30143014
Completable c = Completable.amb(c1, c2);
30153015

@@ -3038,9 +3038,9 @@ public void ambArrayOneFiresError() {
30383038
PublishProcessor<Object> ps1 = PublishProcessor.create();
30393039
PublishProcessor<Object> ps2 = PublishProcessor.create();
30403040

3041-
Completable c1 = Completable.fromFlowable(ps1);
3041+
Completable c1 = Completable.fromPublisher(ps1);
30423042

3043-
Completable c2 = Completable.fromFlowable(ps2);
3043+
Completable c2 = Completable.fromPublisher(ps2);
30443044

30453045
Completable c = Completable.amb(c1, c2);
30463046

@@ -3069,9 +3069,9 @@ public void ambArraySecondFires() {
30693069
PublishProcessor<Object> ps1 = PublishProcessor.create();
30703070
PublishProcessor<Object> ps2 = PublishProcessor.create();
30713071

3072-
Completable c1 = Completable.fromFlowable(ps1);
3072+
Completable c1 = Completable.fromPublisher(ps1);
30733073

3074-
Completable c2 = Completable.fromFlowable(ps2);
3074+
Completable c2 = Completable.fromPublisher(ps2);
30753075

30763076
Completable c = Completable.amb(c1, c2);
30773077

@@ -3100,9 +3100,9 @@ public void ambArraySecondFiresError() {
31003100
PublishProcessor<Object> ps1 = PublishProcessor.create();
31013101
PublishProcessor<Object> ps2 = PublishProcessor.create();
31023102

3103-
Completable c1 = Completable.fromFlowable(ps1);
3103+
Completable c1 = Completable.fromPublisher(ps1);
31043104

3105-
Completable c2 = Completable.fromFlowable(ps2);
3105+
Completable c2 = Completable.fromPublisher(ps2);
31063106

31073107
Completable c = Completable.amb(c1, c2);
31083108

@@ -3232,9 +3232,9 @@ public void ambWithArrayOneFires() {
32323232
PublishProcessor<Object> ps1 = PublishProcessor.create();
32333233
PublishProcessor<Object> ps2 = PublishProcessor.create();
32343234

3235-
Completable c1 = Completable.fromFlowable(ps1);
3235+
Completable c1 = Completable.fromPublisher(ps1);
32363236

3237-
Completable c2 = Completable.fromFlowable(ps2);
3237+
Completable c2 = Completable.fromPublisher(ps2);
32383238

32393239
Completable c = c1.ambWith(c2);
32403240

@@ -3263,9 +3263,9 @@ public void ambWithArrayOneFiresError() {
32633263
PublishProcessor<Object> ps1 = PublishProcessor.create();
32643264
PublishProcessor<Object> ps2 = PublishProcessor.create();
32653265

3266-
Completable c1 = Completable.fromFlowable(ps1);
3266+
Completable c1 = Completable.fromPublisher(ps1);
32673267

3268-
Completable c2 = Completable.fromFlowable(ps2);
3268+
Completable c2 = Completable.fromPublisher(ps2);
32693269

32703270
Completable c = c1.ambWith(c2);
32713271

@@ -3294,9 +3294,9 @@ public void ambWithArraySecondFires() {
32943294
PublishProcessor<Object> ps1 = PublishProcessor.create();
32953295
PublishProcessor<Object> ps2 = PublishProcessor.create();
32963296

3297-
Completable c1 = Completable.fromFlowable(ps1);
3297+
Completable c1 = Completable.fromPublisher(ps1);
32983298

3299-
Completable c2 = Completable.fromFlowable(ps2);
3299+
Completable c2 = Completable.fromPublisher(ps2);
33003300

33013301
Completable c = c1.ambWith(c2);
33023302

@@ -3325,9 +3325,9 @@ public void ambWithArraySecondFiresError() {
33253325
PublishProcessor<Object> ps1 = PublishProcessor.create();
33263326
PublishProcessor<Object> ps2 = PublishProcessor.create();
33273327

3328-
Completable c1 = Completable.fromFlowable(ps1);
3328+
Completable c1 = Completable.fromPublisher(ps1);
33293329

3330-
Completable c2 = Completable.fromFlowable(ps2);
3330+
Completable c2 = Completable.fromPublisher(ps2);
33313331

33323332
Completable c = c1.ambWith(c2);
33333333

0 commit comments

Comments
 (0)