Skip to content

Commit 47e6c67

Browse files
authored
1.x: deprecate create(), add alternatives (#5086)
1 parent a37e292 commit 47e6c67

File tree

99 files changed

+595
-529
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

99 files changed

+595
-529
lines changed

src/main/java/rx/Completable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2249,7 +2249,7 @@ public final <R> R to(Func1<? super Completable, R> converter) {
22492249
* @return the new Observable created
22502250
*/
22512251
public final <T> Observable<T> toObservable() {
2252-
return Observable.create(new Observable.OnSubscribe<T>() {
2252+
return Observable.unsafeCreate(new Observable.OnSubscribe<T>() {
22532253
@Override
22542254
public void call(Subscriber<? super T> s) {
22552255
unsafeSubscribe(s);

src/main/java/rx/Observable.java

Lines changed: 139 additions & 73 deletions
Large diffs are not rendered by default.

src/main/java/rx/Single.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ public interface Transformer<T, R> extends Func1<Single<T>, Single<R>> {
202202
*/
203203
private static <T> Observable<T> asObservable(Single<T> t) {
204204
// is this sufficient, or do I need to keep the outer Single and subscribe to it?
205-
return Observable.create(new SingleToObservable<T>(t.onSubscribe));
205+
return Observable.unsafeCreate(new SingleToObservable<T>(t.onSubscribe));
206206
}
207207

208208
/* *********************************************************************************************************

src/main/java/rx/internal/operators/EmptyObservableHolder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public enum EmptyObservableHolder implements OnSubscribe<Object> {
2828
;
2929

3030
/** The singleton instance. */
31-
static final Observable<Object> EMPTY = Observable.create(INSTANCE);
31+
static final Observable<Object> EMPTY = Observable.unsafeCreate(INSTANCE);
3232

3333

3434
/**

src/main/java/rx/internal/operators/NeverObservableHolder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public enum NeverObservableHolder implements OnSubscribe<Object> {
2828
;
2929

3030
/** The singleton instance. */
31-
static final Observable<Object> NEVER = Observable.create(INSTANCE);
31+
static final Observable<Object> NEVER = Observable.unsafeCreate(INSTANCE);
3232

3333
/**
3434
* Returns a type-corrected singleton instance of the never Observable.

src/main/java/rx/internal/operators/OnSubscribeFromEmitter.java renamed to src/main/java/rx/internal/operators/OnSubscribeCreate.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,13 @@
2929
import rx.plugins.RxJavaHooks;
3030
import rx.subscriptions.SerialSubscription;
3131

32-
public final class OnSubscribeFromEmitter<T> implements OnSubscribe<T> {
32+
public final class OnSubscribeCreate<T> implements OnSubscribe<T> {
3333

3434
final Action1<Emitter<T>> Emitter;
3535

3636
final Emitter.BackpressureMode backpressure;
3737

38-
public OnSubscribeFromEmitter(Action1<Emitter<T>> Emitter, Emitter.BackpressureMode backpressure) {
38+
public OnSubscribeCreate(Action1<Emitter<T>> Emitter, Emitter.BackpressureMode backpressure) {
3939
this.Emitter = Emitter;
4040
this.backpressure = backpressure;
4141
}
@@ -268,7 +268,7 @@ public void onError(Throwable e) {
268268

269269
@Override
270270
void onOverflow() {
271-
onError(new MissingBackpressureException("fromEmitter: could not emit value due to lack of requests"));
271+
onError(new MissingBackpressureException("create: could not emit value due to lack of requests"));
272272
}
273273

274274
}

src/main/java/rx/internal/operators/OnSubscribeFlattenIterable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,9 @@ public static <T, R> Observable<R> createFrom(Observable<? extends T> source,
7070
Func1<? super T, ? extends Iterable<? extends R>> mapper, int prefetch) {
7171
if (source instanceof ScalarSynchronousObservable) {
7272
T scalar = ((ScalarSynchronousObservable<? extends T>) source).get();
73-
return Observable.create(new OnSubscribeScalarFlattenIterable<T, R>(scalar, mapper));
73+
return Observable.unsafeCreate(new OnSubscribeScalarFlattenIterable<T, R>(scalar, mapper));
7474
}
75-
return Observable.create(new OnSubscribeFlattenIterable<T, R>(source, mapper, prefetch));
75+
return Observable.unsafeCreate(new OnSubscribeFlattenIterable<T, R>(source, mapper, prefetch));
7676
}
7777

7878
static final class FlattenIterableSubscriber<T, R> extends Subscriber<T> {

src/main/java/rx/internal/operators/OnSubscribeGroupJoin.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ public void onNext(T1 args) {
170170
leftMap().put(id, subjSerial);
171171
}
172172

173-
Observable<T2> window = Observable.create(new WindowObservableFunc<T2>(subj, cancel));
173+
Observable<T2> window = Observable.unsafeCreate(new WindowObservableFunc<T2>(subj, cancel));
174174

175175
Observable<D1> duration = leftDuration.call(args);
176176

src/main/java/rx/internal/operators/OnSubscribeRedo.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
* limitations under the License.
3232
*/
3333

34-
import static rx.Observable.create; // NOPMD
34+
import static rx.Observable.unsafeCreate; // NOPMD
3535

3636
import java.util.concurrent.atomic.*;
3737

@@ -133,11 +133,11 @@ public static <T> Observable<T> retry(Observable<T> source, final long count) {
133133
}
134134

135135
public static <T> Observable<T> retry(Observable<T> source, Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> notificationHandler) {
136-
return create(new OnSubscribeRedo<T>(source, notificationHandler, true, false, Schedulers.trampoline()));
136+
return unsafeCreate(new OnSubscribeRedo<T>(source, notificationHandler, true, false, Schedulers.trampoline()));
137137
}
138138

139139
public static <T> Observable<T> retry(Observable<T> source, Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> notificationHandler, Scheduler scheduler) {
140-
return create(new OnSubscribeRedo<T>(source, notificationHandler, true, false, scheduler));
140+
return unsafeCreate(new OnSubscribeRedo<T>(source, notificationHandler, true, false, scheduler));
141141
}
142142

143143
public static <T> Observable<T> repeat(Observable<T> source) {
@@ -163,15 +163,15 @@ public static <T> Observable<T> repeat(Observable<T> source, final long count, S
163163
}
164164

165165
public static <T> Observable<T> repeat(Observable<T> source, Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> notificationHandler) {
166-
return create(new OnSubscribeRedo<T>(source, notificationHandler, false, true, Schedulers.trampoline()));
166+
return unsafeCreate(new OnSubscribeRedo<T>(source, notificationHandler, false, true, Schedulers.trampoline()));
167167
}
168168

169169
public static <T> Observable<T> repeat(Observable<T> source, Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> notificationHandler, Scheduler scheduler) {
170-
return create(new OnSubscribeRedo<T>(source, notificationHandler, false, true, scheduler));
170+
return unsafeCreate(new OnSubscribeRedo<T>(source, notificationHandler, false, true, scheduler));
171171
}
172172

173173
public static <T> Observable<T> redo(Observable<T> source, Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> notificationHandler, Scheduler scheduler) {
174-
return create(new OnSubscribeRedo<T>(source, notificationHandler, false, false, scheduler));
174+
return unsafeCreate(new OnSubscribeRedo<T>(source, notificationHandler, false, false, scheduler));
175175
}
176176

177177
private OnSubscribeRedo(Observable<T> source, Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> f, boolean stopOnComplete, boolean stopOnError,

src/main/java/rx/internal/operators/OperatorPublish.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ public static <T, R> Observable<R> create(final Observable<? extends T> source,
123123

124124
public static <T, R> Observable<R> create(final Observable<? extends T> source,
125125
final Func1<? super Observable<T>, ? extends Observable<R>> selector, final boolean delayError) {
126-
return create(new OnSubscribe<R>() {
126+
return unsafeCreate(new OnSubscribe<R>() {
127127
@Override
128128
public void call(final Subscriber<? super R> child) {
129129
final OnSubscribePublishMulticast<T> op = new OnSubscribePublishMulticast<T>(RxRingBuffer.SIZE, delayError);
@@ -155,7 +155,7 @@ public void setProducer(Producer p) {
155155
child.add(op);
156156
child.add(subscriber);
157157

158-
selector.call(Observable.create(op)).unsafeSubscribe(subscriber);
158+
selector.call(Observable.unsafeCreate(op)).unsafeSubscribe(subscriber);
159159

160160
source.unsafeSubscribe(op.subscriber());
161161
}

0 commit comments

Comments
 (0)