Skip to content

Commit ec4eb4f

Browse files
authored
2.x: add withLatestFrom many, cleanups and other enhancements (#4368)
1 parent 4aab4eb commit ec4eb4f

File tree

201 files changed

+3095
-441
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

201 files changed

+3095
-441
lines changed

src/main/java/io/reactivex/Completable.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,7 @@ public static Completable create(CompletableSource source) {
188188
} catch (NullPointerException ex) { // NOPMD
189189
throw ex;
190190
} catch (Throwable ex) {
191+
Exceptions.throwIfFatal(ex);
191192
RxJavaPlugins.onError(ex);
192193
throw toNpe(ex);
193194
}
@@ -219,6 +220,7 @@ public static Completable unsafeCreate(CompletableSource source) {
219220
} catch (NullPointerException ex) { // NOPMD
220221
throw ex;
221222
} catch (Throwable ex) {
223+
Exceptions.throwIfFatal(ex);
222224
RxJavaPlugins.onError(ex);
223225
throw toNpe(ex);
224226
}
@@ -1384,6 +1386,7 @@ public final void subscribe(CompletableObserver s) {
13841386
} catch (NullPointerException ex) { // NOPMD
13851387
throw ex;
13861388
} catch (Throwable ex) {
1389+
Exceptions.throwIfFatal(ex);
13871390
RxJavaPlugins.onError(ex);
13881391
throw toNpe(ex);
13891392
}
@@ -1606,6 +1609,7 @@ public final <U> U to(Function<? super Completable, U> converter) {
16061609
try {
16071610
return converter.apply(this);
16081611
} catch (Throwable ex) {
1612+
Exceptions.throwIfFatal(ex);
16091613
throw Exceptions.propagate(ex);
16101614
}
16111615
}

src/main/java/io/reactivex/CompletableObserver.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,13 @@
1919
* Represents the subscription API callbacks when subscribing to a Completable instance.
2020
*/
2121
public interface CompletableObserver {
22+
/**
23+
* Called once by the Completable to set a Disposable on this instance which
24+
* then can be used to cancel the subscription at any time.
25+
* @param d the Disposable instance to call dispose on for cancellation, not null
26+
*/
27+
void onSubscribe(Disposable d);
28+
2229
/**
2330
* Called once the deferred computation completes normally.
2431
*/
@@ -29,11 +36,4 @@ public interface CompletableObserver {
2936
* @param e the exception, not null.
3037
*/
3138
void onError(Throwable e);
32-
33-
/**
34-
* Called once by the Completable to set a Disposable on this instance which
35-
* then can be used to cancel the subscription at any time.
36-
* @param d the Disposable instance to call dispose on for cancellation, not null
37-
*/
38-
void onSubscribe(Disposable d);
3939
}

src/main/java/io/reactivex/Flowable.java

Lines changed: 110 additions & 119 deletions
Large diffs are not rendered by default.

src/main/java/io/reactivex/Observable.java

Lines changed: 109 additions & 84 deletions
Large diffs are not rendered by default.

src/main/java/io/reactivex/Observer.java

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,62 @@
1515

1616
import io.reactivex.disposables.Disposable;
1717

18+
/**
19+
* Provides a mechanism for receiving push-based notifications.
20+
* <p>
21+
* After an Observer calls an {@link Observable}'s {@link Observable#subscribe subscribe} method,
22+
* first the Observable calls {@link #onSubscribe(Disposable)} with a {@link Disposable} that allows
23+
* cancelling the sequence at any time, then the
24+
* {@code Observable} may call the Observer's {@link #onNext} method any number of times
25+
* to provide notifications. A well-behaved
26+
* {@code Observable} will call an Observer's {@link #onComplete} method exactly once or the Observer's
27+
* {@link #onError} method exactly once.
28+
*
29+
* @see <a href="http://reactivex.io/documentation/observable.html">ReactiveX documentation: Observable</a>
30+
* @param <T>
31+
* the type of item the Observer expects to observe
32+
*/
1833
public interface Observer<T> {
19-
34+
35+
/**
36+
* Provides the Observer with the means of cancelling (disposing) the
37+
* connection (channel) with the Observable in both
38+
* synchronous (from within {@link #onNext(Object)}) and asynchronous manner.
39+
* @param d the Disposable instance whose {@link Disposable#dispose()} can
40+
* be called anytime to cancel the connection
41+
* @since 2.0
42+
*/
2043
void onSubscribe(Disposable d);
2144

45+
/**
46+
* Provides the Observer with a new item to observe.
47+
* <p>
48+
* The {@link Observable} may call this method 0 or more times.
49+
* <p>
50+
* The {@code Observable} will not call this method again after it calls either {@link #onComplete} or
51+
* {@link #onError}.
52+
*
53+
* @param value
54+
* the item emitted by the Observable
55+
*/
2256
void onNext(T value);
2357

58+
/**
59+
* Notifies the Observer that the {@link Observable} has experienced an error condition.
60+
* <p>
61+
* If the {@link Observable} calls this method, it will not thereafter call {@link #onNext} or
62+
* {@link #onComplete}.
63+
*
64+
* @param e
65+
* the exception encountered by the Observable
66+
*/
2467
void onError(Throwable e);
2568

69+
/**
70+
* Notifies the Observer that the {@link Observable} has finished sending push-based notifications.
71+
* <p>
72+
* The {@link Observable} will not call this method if it calls {@link #onError}.
73+
*/
2674
void onComplete();
2775

2876
}

src/main/java/io/reactivex/Scheduler.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,9 @@
1515

1616
import java.util.concurrent.TimeUnit;
1717

18-
import io.reactivex.disposables.*;
18+
import io.reactivex.disposables.Disposable;
1919
import io.reactivex.exceptions.Exceptions;
20+
import io.reactivex.internal.disposables.SequentialDisposable;
2021
import io.reactivex.plugins.RxJavaPlugins;
2122

2223
public abstract class Scheduler {
@@ -106,9 +107,9 @@ public Disposable schedule(Runnable run) {
106107
}
107108

108109
public Disposable schedulePeriodically(Runnable run, final long initialDelay, final long period, final TimeUnit unit) {
109-
final SerialDisposable first = new SerialDisposable();
110+
final SequentialDisposable first = new SequentialDisposable();
110111

111-
final SerialDisposable sd = new SerialDisposable(first);
112+
final SequentialDisposable sd = new SequentialDisposable(first);
112113

113114
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
114115

@@ -182,6 +183,7 @@ public void run() {
182183
try {
183184
run.run();
184185
} catch (Throwable ex) {
186+
Exceptions.throwIfFatal(ex);
185187
worker.dispose();
186188
throw Exceptions.propagate(ex);
187189
}

src/main/java/io/reactivex/Single.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2783,6 +2783,7 @@ public final <R> R to(Function<? super Single<T>, R> convert) {
27832783
try {
27842784
return convert.apply(this);
27852785
} catch (Throwable ex) {
2786+
Exceptions.throwIfFatal(ex);
27862787
throw Exceptions.propagate(ex);
27872788
}
27882789
}

src/main/java/io/reactivex/SingleObserver.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,50 @@
1515

1616
import io.reactivex.disposables.Disposable;
1717

18+
/**
19+
* Provides a mechanism for receiving push-based notifications.
20+
* <p>
21+
* After a SingleSubscriber calls a {@link Single}'s {@link Single#subscribe subscribe} method,
22+
* first the Single calls {@link #onSubscribe(Disposable)} with a {@link Disposable} that allows
23+
* cancelling the sequence at any time, then the
24+
* {@code Single} calls only one of the SingleSubscriber's {@link #onSuccess} and {@link #onError} methods to provide
25+
* notifications.
26+
*
27+
* @see <a href="http://reactivex.io/documentation/observable.html">ReactiveX documentation: Observable</a>
28+
* @param <T>
29+
* the type of item the SingleSubscriber expects to observe
30+
* @since 2.0
31+
*/
1832
public interface SingleObserver<T> {
1933

34+
/**
35+
* Provides the SingleObserver with the means of cancelling (disposing) the
36+
* connection (channel) with the Single in both
37+
* synchronous (from within {@link #onSubscribe(Disposable)} itself) and asynchronous manner.
38+
* @param d the Disposable instance whose {@link Disposable#dispose()} can
39+
* be called anytime to cancel the connection
40+
* @since 2.0
41+
*/
2042
void onSubscribe(Disposable d);
2143

44+
/**
45+
* Notifies the SingleSubscriber with a single item and that the {@link Single} has finished sending
46+
* push-based notifications.
47+
* <p>
48+
* The {@link Single} will not call this method if it calls {@link #onError}.
49+
*
50+
* @param value
51+
* the item emitted by the Single
52+
*/
2253
void onSuccess(T value);
2354

55+
/**
56+
* Notifies the SingleSubscriber that the {@link Single} has experienced an error condition.
57+
* <p>
58+
* If the {@link Single} calls this method, it will not thereafter call {@link #onSuccess}.
59+
*
60+
* @param e
61+
* the exception encountered by the Single
62+
*/
2463
void onError(Throwable e);
2564
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
/**
18+
* Annotations for indicating experimental and beta operators, classes, methods, types or fields.
19+
*/
20+
package io.reactivex.annotations;

src/main/java/io/reactivex/disposables/ActionDisposable.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ protected void onDisposed(Action value) {
2828
try {
2929
value.run();
3030
} catch (Throwable ex) {
31+
Exceptions.throwIfFatal(ex);
3132
throw Exceptions.propagate(ex);
3233
}
3334
}

0 commit comments

Comments
 (0)