Skip to content

Commit 8525f6f

Browse files
UnsafeSubscribe
Migrate from reflection to unsafeSubscribe as per discussion at #676
1 parent 14dda8b commit 8525f6f

File tree

77 files changed

+510
-560
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

77 files changed

+510
-560
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 209 additions & 340 deletions
Large diffs are not rendered by default.

rxjava-core/src/main/java/rx/joins/JoinObserver1.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public void addActivePlan(ActivePlan0 activePlan) {
6161
public void subscribe(Object gate) {
6262
if (subscribed.compareAndSet(false, true)) {
6363
this.gate = gate;
64-
source.materialize().subscribe(this);
64+
source.materialize().unsafeSubscribe(this);
6565
} else {
6666
throw new IllegalStateException("Can only be subscribed to once.");
6767
}

rxjava-core/src/main/java/rx/observables/BlockingObservable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public static <T> BlockingObservable<T> from(final Observable<? extends T> o) {
7272
* "Guideline 6.4: Protect calls to user code from within an operator"
7373
*/
7474
private Subscription protectivelyWrapAndSubscribe(Subscriber<? super T> observer) {
75-
return o.subscribe(new SafeSubscriber<T>(observer));
75+
return o.unsafeSubscribe(new SafeSubscriber<T>(observer));
7676
}
7777

7878
/**

rxjava-core/src/main/java/rx/observables/GroupedObservable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public static <K, T> GroupedObservable<K, T> from(K key, final Observable<T> o)
3737

3838
@Override
3939
public void call(Subscriber<? super T> s) {
40-
o.subscribe(s);
40+
o.unsafeSubscribe(s);
4141
}
4242
});
4343
}

rxjava-core/src/main/java/rx/operators/BufferUntilSubscriber.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ public void onNext(T t) {
9191
* It will then immediately swap itself out for the actual (after a single notification), but since this is now
9292
* being done on the same producer thread no further buffering will occur.
9393
*/
94-
private static class PassThruObserver<T> implements Observer<T> {
94+
private static class PassThruObserver<T> extends Subscriber<T> {
9595

9696
private final Observer<? super T> actual;
9797
// this assumes single threaded synchronous notifications (the Rx contract for a single Observer)
@@ -133,7 +133,7 @@ private void drainIfNeededAndSwitchToActual() {
133133

134134
}
135135

136-
private static class BufferedObserver<T> implements Observer<T> {
136+
private static class BufferedObserver<T> extends Subscriber<T> {
137137
private final ConcurrentLinkedQueue<Object> buffer = new ConcurrentLinkedQueue<Object>();
138138

139139
@Override

rxjava-core/src/main/java/rx/operators/ChunkedOperation.java

Lines changed: 50 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,12 @@
2828
import rx.Observer;
2929
import rx.Scheduler;
3030
import rx.Scheduler.Inner;
31+
import rx.Subscriber;
3132
import rx.Subscription;
3233
import rx.functions.Action1;
3334
import rx.functions.Func0;
3435
import rx.functions.Func1;
36+
import rx.subscriptions.CompositeSubscription;
3537

3638
/**
3739
* The base class for operations that break observables into "chunks". Currently buffers and windows.
@@ -408,7 +410,7 @@ public void pushValue(T value) {
408410
* The type of object all internal {@link rx.operators.ChunkedOperation.Chunk} objects record.
409411
* <C> The type of object being tracked by the {@link Chunk}
410412
*/
411-
protected static class ChunkObserver<T, C> implements Observer<T> {
413+
protected static class ChunkObserver<T, C> extends Subscriber<T> {
412414

413415
private final Chunks<T, C> chunks;
414416
private final Observer<? super C> observer;
@@ -492,12 +494,24 @@ public ObservableBasedSingleChunkCreator(NonOverlappingChunks<T, C> chunks, Func
492494

493495
private void listenForChunkEnd() {
494496
Observable<? extends TClosing> closingObservable = chunkClosingSelector.call();
495-
closingObservable.subscribe(new Action1<TClosing>() {
497+
closingObservable.unsafeSubscribe(new Subscriber<TClosing>() {
498+
499+
@Override
500+
public void onCompleted() {
501+
502+
}
503+
504+
@Override
505+
public void onError(Throwable e) {
506+
507+
}
508+
496509
@Override
497-
public void call(TClosing closing) {
510+
public void onNext(TClosing t) {
498511
chunks.emitAndReplaceChunk();
499-
listenForChunkEnd();
512+
listenForChunkEnd();
500513
}
514+
501515
});
502516
}
503517

@@ -524,23 +538,47 @@ public void stop() {
524538
*/
525539
protected static class ObservableBasedMultiChunkCreator<T, C, TOpening, TClosing> implements ChunkCreator {
526540

527-
private final SafeObservableSubscription subscription = new SafeObservableSubscription();
541+
private final CompositeSubscription subscription = new CompositeSubscription();
528542

529543
public ObservableBasedMultiChunkCreator(final OverlappingChunks<T, C> chunks, Observable<? extends TOpening> openings, final Func1<? super TOpening, ? extends Observable<? extends TClosing>> chunkClosingSelector) {
530-
subscription.wrap(openings.subscribe(new Action1<TOpening>() {
544+
openings.unsafeSubscribe(new Subscriber<TOpening>(subscription) {
545+
546+
@Override
547+
public void onCompleted() {
548+
549+
}
550+
531551
@Override
532-
public void call(TOpening opening) {
552+
public void onError(Throwable e) {
553+
554+
}
555+
556+
@Override
557+
public void onNext(TOpening opening) {
533558
final Chunk<T, C> chunk = chunks.createChunk();
534559
Observable<? extends TClosing> closingObservable = chunkClosingSelector.call(opening);
535560

536-
closingObservable.subscribe(new Action1<TClosing>() {
561+
closingObservable.unsafeSubscribe(new Subscriber<TClosing>() {
562+
537563
@Override
538-
public void call(TClosing closing) {
539-
chunks.emitChunk(chunk);
564+
public void onCompleted() {
565+
540566
}
541-
});
567+
568+
@Override
569+
public void onError(Throwable e) {
570+
571+
}
572+
573+
@Override
574+
public void onNext(TClosing t) {
575+
chunks.emitChunk(chunk);
576+
}
577+
578+
});
542579
}
543-
}));
580+
581+
});
544582
}
545583

546584
@Override

rxjava-core/src/main/java/rx/operators/NotificationLite.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package rx.operators;
22

3-
import java.io.ObjectStreamException;
43
import java.io.Serializable;
54

65
import rx.Notification;

rxjava-core/src/main/java/rx/operators/OperationAll.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import rx.Observable;
2121
import rx.Observable.OnSubscribeFunc;
2222
import rx.Observer;
23+
import rx.Subscriber;
2324
import rx.Subscription;
2425
import rx.functions.Func1;
2526

@@ -39,20 +40,18 @@ private static class AllObservable<T> implements OnSubscribeFunc<Boolean> {
3940
private final Observable<? extends T> sequence;
4041
private final Func1<? super T, Boolean> predicate;
4142

42-
private final SafeObservableSubscription subscription = new SafeObservableSubscription();
43-
4443
private AllObservable(Observable<? extends T> sequence, Func1<? super T, Boolean> predicate) {
4544
this.sequence = sequence;
4645
this.predicate = predicate;
4746
}
4847

4948
@Override
5049
public Subscription onSubscribe(final Observer<? super Boolean> observer) {
51-
return subscription.wrap(sequence.subscribe(new AllObserver(observer)));
50+
return sequence.unsafeSubscribe(new AllObserver(observer));
5251

5352
}
5453

55-
private class AllObserver implements Observer<T> {
54+
private class AllObserver extends Subscriber<T> {
5655
private final Observer<? super Boolean> underlying;
5756

5857
private final AtomicBoolean status = new AtomicBoolean(true);
@@ -82,7 +81,7 @@ public void onNext(T args) {
8281
if (changed && !result) {
8382
underlying.onNext(false);
8483
underlying.onCompleted();
85-
subscription.unsubscribe();
84+
unsubscribe();
8685
}
8786
}
8887
}

rxjava-core/src/main/java/rx/operators/OperationAny.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,14 @@
1515
*/
1616
package rx.operators;
1717

18-
import static rx.functions.Functions.*;
18+
import static rx.functions.Functions.alwaysTrue;
1919

2020
import java.util.concurrent.atomic.AtomicBoolean;
2121

2222
import rx.Observable;
2323
import rx.Observable.OnSubscribeFunc;
2424
import rx.Observer;
25+
import rx.Subscriber;
2526
import rx.Subscription;
2627
import rx.functions.Func1;
2728

@@ -81,7 +82,7 @@ private Any(Observable<? extends T> source, Func1<? super T, Boolean> predicate,
8182
@Override
8283
public Subscription onSubscribe(final Observer<? super Boolean> observer) {
8384
final SafeObservableSubscription subscription = new SafeObservableSubscription();
84-
return subscription.wrap(source.subscribe(new Observer<T>() {
85+
return subscription.wrap(source.unsafeSubscribe(new Subscriber<T>() {
8586

8687
private final AtomicBoolean hasEmitted = new AtomicBoolean(false);
8788

rxjava-core/src/main/java/rx/operators/OperationAsObservable.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import rx.Observable.OnSubscribeFunc;
2020
import rx.Observer;
2121
import rx.Subscription;
22+
import rx.observers.Subscribers;
2223

2324
/**
2425
* Hides the identity of another observable.
@@ -34,7 +35,7 @@ public OperationAsObservable(Observable<? extends T> source) {
3435
}
3536

3637
@Override
37-
public Subscription onSubscribe(Observer<? super T> t1) {
38-
return source.subscribe(t1);
38+
public Subscription onSubscribe(final Observer<? super T> t1) {
39+
return source.unsafeSubscribe(Subscribers.from(t1));
3940
}
4041
}

0 commit comments

Comments
 (0)