Skip to content

Commit 35c8da6

Browse files
authored
2.x: add assembly tracking, minor fixes and cleanup (#4417)
* 2.x: add assembly tracking, minor fixes and cleanup * Add missing header, add more time to test
1 parent 9fb4040 commit 35c8da6

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+1206
-1175
lines changed

src/main/java/io/reactivex/Completable.java

Lines changed: 48 additions & 48 deletions
Large diffs are not rendered by default.

src/main/java/io/reactivex/Flowable.java

Lines changed: 185 additions & 183 deletions
Large diffs are not rendered by default.

src/main/java/io/reactivex/Observable.java

Lines changed: 222 additions & 258 deletions
Large diffs are not rendered by default.

src/main/java/io/reactivex/Single.java

Lines changed: 45 additions & 45 deletions
Large diffs are not rendered by default.

src/main/java/io/reactivex/flowables/ConnectableFlowable.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.reactivex.functions.Consumer;
2121
import io.reactivex.internal.functions.Functions;
2222
import io.reactivex.internal.operators.flowable.*;
23+
import io.reactivex.plugins.RxJavaPlugins;
2324

2425
/**
2526
* A {@code ConnectableObservable} resembles an ordinary {@link Flowable}, except that it does not begin
@@ -75,7 +76,7 @@ public void accept(Disposable d) {
7576
* @see <a href="http://reactivex.io/documentation/operators/refcount.html">ReactiveX documentation: RefCount</a>
7677
*/
7778
public Flowable<T> refCount() {
78-
return new FlowableRefCount<T>(this);
79+
return RxJavaPlugins.onAssembly(new FlowableRefCount<T>(this));
7980
}
8081

8182
/**
@@ -119,8 +120,8 @@ public Flowable<T> autoConnect(int numberOfSubscribers) {
119120
public Flowable<T> autoConnect(int numberOfSubscribers, Consumer<? super Disposable> connection) {
120121
if (numberOfSubscribers <= 0) {
121122
this.connect(connection);
122-
return this;
123+
return RxJavaPlugins.onAssembly(this);
123124
}
124-
return new FlowableAutoConnect<T>(this, numberOfSubscribers, connection);
125+
return RxJavaPlugins.onAssembly(new FlowableAutoConnect<T>(this, numberOfSubscribers, connection));
125126
}
126127
}

src/main/java/io/reactivex/internal/operators/flowable/FlowableCache.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.reactivex.exceptions.Exceptions;
2323
import io.reactivex.internal.subscriptions.SubscriptionHelper;
2424
import io.reactivex.internal.util.*;
25+
import io.reactivex.plugins.RxJavaPlugins;
2526

2627
/**
2728
* An observable which auto-connects to another observable, caches the elements
@@ -35,28 +36,28 @@ public final class FlowableCache<T> extends AbstractFlowableWithUpstream<T, T> {
3536

3637
final AtomicBoolean once;
3738
/**
38-
* Creates a cached Observable with a default capacity hint of 16.
39+
* Creates a cached Flowable with a default capacity hint of 16.
3940
* @param <T> the value type
4041
* @param source the source Observable to cache
4142
* @return the CachedObservable instance
4243
*/
43-
public static <T> FlowableCache<T> from(Flowable<T> source) {
44+
public static <T> Flowable<T> from(Flowable<T> source) {
4445
return from(source, 16);
4546
}
4647

4748
/**
48-
* Creates a cached Observable with the given capacity hint.
49+
* Creates a cached Flowable with the given capacity hint.
4950
* @param <T> the value type
5051
* @param source the source Observable to cache
5152
* @param capacityHint the hint for the internal buffer size
5253
* @return the CachedObservable instance
5354
*/
54-
public static <T> FlowableCache<T> from(Flowable<T> source, int capacityHint) {
55+
public static <T> Flowable<T> from(Flowable<T> source, int capacityHint) {
5556
if (capacityHint < 1) {
5657
throw new IllegalArgumentException("capacityHint > 0 required");
5758
}
5859
CacheState<T> state = new CacheState<T>(source, capacityHint);
59-
return new FlowableCache<T>(source, state);
60+
return RxJavaPlugins.onAssembly(new FlowableCache<T>(source, state));
6061
}
6162

6263
/**

src/main/java/io/reactivex/internal/operators/flowable/FlowableDistinct.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@
1818

1919
import org.reactivestreams.*;
2020

21+
import io.reactivex.Flowable;
2122
import io.reactivex.exceptions.*;
2223
import io.reactivex.functions.*;
2324
import io.reactivex.internal.functions.*;
2425
import io.reactivex.internal.subscriptions.*;
26+
import io.reactivex.plugins.RxJavaPlugins;
2527

2628
public final class FlowableDistinct<T, K> extends AbstractFlowableWithUpstream<T, T> {
2729
final Function<? super T, K> keySelector;
@@ -33,7 +35,7 @@ public FlowableDistinct(Publisher<T> source, Function<? super T, K> keySelector,
3335
this.keySelector = keySelector;
3436
}
3537

36-
public static <T, K> FlowableDistinct<T, K> withCollection(Publisher<T> source, Function<? super T, K> keySelector, final Callable<? extends Collection<? super K>> collectionSupplier) {
38+
public static <T, K> Flowable<T> withCollection(Publisher<T> source, Function<? super T, K> keySelector, final Callable<? extends Collection<? super K>> collectionSupplier) {
3739
Callable<? extends Predicate<? super K>> p = new Callable<Predicate<K>>() {
3840
@Override
3941
public Predicate<K> call() throws Exception {
@@ -52,10 +54,10 @@ public boolean test(K t) {
5254
}
5355
};
5456

55-
return new FlowableDistinct<T, K>(source, keySelector, p);
57+
return RxJavaPlugins.onAssembly(new FlowableDistinct<T, K>(source, keySelector, p));
5658
}
5759

58-
public static <T> FlowableDistinct<T, T> untilChanged(Publisher<T> source) {
60+
public static <T> Flowable<T> untilChanged(Publisher<T> source) {
5961
Callable<? extends Predicate<? super T>> p = new Callable<Predicate<T>>() {
6062
Object last;
6163
@Override
@@ -75,10 +77,10 @@ public boolean test(T t) {
7577
};
7678
}
7779
};
78-
return new FlowableDistinct<T, T>(source, Functions.<T>identity(), p);
80+
return RxJavaPlugins.onAssembly(new FlowableDistinct<T, T>(source, Functions.<T>identity(), p));
7981
}
8082

81-
public static <T, K> FlowableDistinct<T, K> untilChanged(Publisher<T> source, Function<? super T, K> keySelector) {
83+
public static <T, K> Flowable<T> untilChanged(Publisher<T> source, Function<? super T, K> keySelector) {
8284
Callable<? extends Predicate<? super K>> p = new Callable<Predicate<K>>() {
8385
Object last;
8486
@Override
@@ -98,7 +100,7 @@ public boolean test(K t) {
98100
};
99101
}
100102
};
101-
return new FlowableDistinct<T, K>(source, keySelector, p);
103+
return RxJavaPlugins.onAssembly(new FlowableDistinct<T, K>(source, keySelector, p));
102104
}
103105

104106
@Override

src/main/java/io/reactivex/internal/operators/flowable/FlowablePublish.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ public void subscribe(Subscriber<? super T> child) {
117117
}
118118
}
119119
};
120-
return new FlowablePublish<T>(onSubscribe, source, curr, bufferSize);
120+
return RxJavaPlugins.onAssembly(new FlowablePublish<T>(onSubscribe, source, curr, bufferSize));
121121
}
122122

123123
private FlowablePublish(Publisher<T> onSubscribe, Publisher<T> source,

src/main/java/io/reactivex/internal/operators/flowable/FlowableRepeatWhen.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public void subscribeActual(Subscriber<? super T> s) {
3939

4040
SerializedSubscriber<T> z = new SerializedSubscriber<T>(s);
4141

42-
FlowProcessor<Object> processor = new UnicastProcessor<Object>(8).toSerialized();
42+
FlowableProcessor<Object> processor = new UnicastProcessor<Object>(8).toSerialized();
4343

4444
Publisher<?> when;
4545

@@ -137,13 +137,13 @@ static abstract class WhenSourceSubscriber<T, U> extends SubscriptionArbiter imp
137137

138138
protected final Subscriber<? super T> actual;
139139

140-
protected final FlowProcessor<U> processor;
140+
protected final FlowableProcessor<U> processor;
141141

142142
protected final Subscription receiver;
143143

144144
private long produced;
145145

146-
public WhenSourceSubscriber(Subscriber<? super T> actual, FlowProcessor<U> processor,
146+
public WhenSourceSubscriber(Subscriber<? super T> actual, FlowableProcessor<U> processor,
147147
Subscription receiver) {
148148
this.actual = actual;
149149
this.processor = processor;
@@ -183,7 +183,7 @@ static final class RepeatWhenSubscriber<T> extends WhenSourceSubscriber<T, Objec
183183
/** */
184184
private static final long serialVersionUID = -2680129890138081029L;
185185

186-
public RepeatWhenSubscriber(Subscriber<? super T> actual, FlowProcessor<Object> processor,
186+
public RepeatWhenSubscriber(Subscriber<? super T> actual, FlowableProcessor<Object> processor,
187187
Subscription receiver) {
188188
super(actual, processor, receiver);
189189
}

src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public void accept(Disposable r) {
113113
*/
114114
public static <T> ConnectableFlowable<T> observeOn(final ConnectableFlowable<T> co, final Scheduler scheduler) {
115115
final Flowable<T> observable = co.observeOn(scheduler);
116-
return new ConnectableFlowable<T>() {
116+
return RxJavaPlugins.onAssembly(new ConnectableFlowable<T>() {
117117
@Override
118118
public void connect(Consumer<? super Disposable> connection) {
119119
co.connect(connection);
@@ -123,7 +123,7 @@ public void connect(Consumer<? super Disposable> connection) {
123123
protected void subscribeActual(Subscriber<? super T> s) {
124124
observable.subscribe(s);
125125
}
126-
};
126+
});
127127
}
128128

129129
/**
@@ -248,7 +248,7 @@ public void subscribe(Subscriber<? super T> child) {
248248
}
249249
}
250250
};
251-
return new FlowableReplay<T>(onSubscribe, source, curr, bufferFactory);
251+
return RxJavaPlugins.onAssembly(new FlowableReplay<T>(onSubscribe, source, curr, bufferFactory));
252252
}
253253

254254
private FlowableReplay(Publisher<T> onSubscribe, Flowable<T> source,

0 commit comments

Comments
 (0)