Skip to content

Commit 4328619

Browse files
davidmotenakarnokd
authored andcommitted
rename AsyncEmitter to Emitter (#4580)
1 parent 646be2d commit 4328619

File tree

10 files changed

+1641
-89
lines changed

10 files changed

+1641
-89
lines changed

src/main/java/rx/AsyncEmitter.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,10 @@
2929
*
3030
* @param <T> the value type to emit
3131
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
32+
* @deprecated as of 1.2.1 because Async prefix of this class potentially misleading. Use {@link Emitter} instead.
3233
*/
3334
@Experimental
35+
@Deprecated
3436
public interface AsyncEmitter<T> extends Observer<T> {
3537

3638
/**

src/main/java/rx/CompletableEmitter.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,10 @@
2525
* All methods are thread-safe; calling onCompleted or onError twice or one after the other has
2626
* no effect.
2727
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
28+
* @deprecated as of 1.2.1 because `setCancellation` method will change signature (not name) for 1.2.2.
2829
*/
2930
@Experimental
31+
@Deprecated
3032
public interface CompletableEmitter {
3133

3234
/**

src/main/java/rx/Emitter.java

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rx;
18+
19+
import rx.annotations.Experimental;
20+
import rx.functions.Cancellable;
21+
22+
/**
23+
* Abstraction over a RxJava Subscriber that allows associating
24+
* a resource with it and exposes the current number of downstream
25+
* requested amount.
26+
* <p>
27+
* The onNext, onError and onCompleted methods should be called
28+
* in a sequential manner, just like the Observer's methods. The
29+
* other methods are thread-safe.
30+
*
31+
* @param <T> the value type to emit
32+
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
33+
*/
34+
@Experimental
35+
public interface Emitter<T> extends Observer<T> {
36+
37+
/**
38+
* Sets a Subscription on this emitter; any previous Subscription
39+
* or Cancellation will be unsubscribed/cancelled.
40+
* @param s the subscription, null is allowed
41+
*/
42+
void setSubscription(Subscription s);
43+
44+
/**
45+
* Sets a Cancellable on this emitter; any previous Subscription
46+
* or Cancellation will be unsubscribed/cancelled.
47+
* @param c the cancellable resource, null is allowed
48+
*/
49+
void setCancellation(Cancellable c);
50+
/**
51+
* The current outstanding request amount.
52+
* <p>This method it thread-safe.
53+
* @return the current outstanding request amount
54+
*/
55+
long requested();
56+
57+
/**
58+
* Options to handle backpressure in the emitter.
59+
*/
60+
enum BackpressureMode {
61+
/**
62+
* No backpressure is applied as the onNext calls pass through the Emitter;
63+
* note that this may cause {@link rx.exceptions.MissingBackpressureException} or {@link IllegalStateException}
64+
* somewhere downstream.
65+
*/
66+
NONE,
67+
/**
68+
* Signals a {@link rx.exceptions.MissingBackpressureException} if the downstream can't keep up.
69+
*/
70+
ERROR,
71+
/**
72+
* Buffers (unbounded) all onNext calls until the downstream can consume them.
73+
*/
74+
BUFFER,
75+
/**
76+
* Drops the incoming onNext value if the downstream can't keep up.
77+
*/
78+
DROP,
79+
/**
80+
* Keeps the latest onNext value and overwrites it with newer ones until the downstream
81+
* can consume it.
82+
*/
83+
LATEST
84+
}
85+
}

src/main/java/rx/Observable.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2016,9 +2016,58 @@ public static <T> Observable<T> from(T[] array) {
20162016
* @see AsyncEmitter.BackpressureMode
20172017
* @see AsyncEmitter.Cancellable
20182018
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
2019+
* @deprecated since 1.2.1 because Async prefix of AsyncEmitter class is potentially misleading. Use
2020+
* {@link #fromEmitter(Action1, Emitter.BackpressureMode)} instead.
20192021
*/
20202022
@Experimental
2023+
@Deprecated
20212024
public static <T> Observable<T> fromEmitter(Action1<AsyncEmitter<T>> emitter, AsyncEmitter.BackpressureMode backpressure) {
2025+
return create(new OnSubscribeFromAsyncEmitter<T>(emitter, backpressure));
2026+
}
2027+
2028+
/**
2029+
* Provides an API (via a cold Observable) that bridges the reactive world with the callback-style,
2030+
* generally non-backpressured world.
2031+
* <p>
2032+
* Example:
2033+
* <pre><code>
2034+
* Observable.&lt;Event&gt;fromEmitter(emitter -&gt; {
2035+
* Callback listener = new Callback() {
2036+
* &#64;Override
2037+
* public void onEvent(Event e) {
2038+
* emitter.onNext(e);
2039+
* if (e.isLast()) {
2040+
* emitter.onCompleted();
2041+
* }
2042+
* }
2043+
*
2044+
* &#64;Override
2045+
* public void onFailure(Exception e) {
2046+
* emitter.onError(e);
2047+
* }
2048+
* };
2049+
*
2050+
* AutoCloseable c = api.someMethod(listener);
2051+
*
2052+
* emitter.setCancellation(c::close);
2053+
*
2054+
* }, BackpressureMode.BUFFER);
2055+
* </code></pre>
2056+
* <p>
2057+
* You should call the Emitter's onNext, onError and onCompleted methods in a serialized fashion. The
2058+
* rest of its methods are thread-safe.
2059+
*
2060+
* @param <T> the element type
2061+
* @param emitter the emitter that is called when a Subscriber subscribes to the returned {@code Observable}
2062+
* @param backpressure the backpressure mode to apply if the downstream Subscriber doesn't request (fast) enough
2063+
* @return the new Observable instance
2064+
* @see Emitter
2065+
* @see Emitter.BackpressureMode
2066+
* @see Emitter.Cancellable
2067+
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
2068+
*/
2069+
@Experimental
2070+
public static <T> Observable<T> fromEmitter(Action1<Emitter<T>> emitter, Emitter.BackpressureMode backpressure) {
20222071
return create(new OnSubscribeFromEmitter<T>(emitter, backpressure));
20232072
}
20242073

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rx.functions;
18+
19+
import rx.annotations.Experimental;
20+
21+
/**
22+
* A functional interface that has a single close method that can throw.
23+
*/
24+
@Experimental
25+
public interface Cancellable {
26+
27+
/**
28+
* Cancel the action or free a resource.
29+
*
30+
* @throws Exception
31+
* on error
32+
*/
33+
void cancel() throws Exception;
34+
}

src/main/java/rx/internal/operators/CompletableFromEmitter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import rx.AsyncEmitter.Cancellable;
2222
import rx.exceptions.Exceptions;
2323
import rx.functions.Action1;
24-
import rx.internal.operators.OnSubscribeFromEmitter.CancellableSubscription;
24+
import rx.internal.operators.OnSubscribeFromAsyncEmitter.CancellableSubscription;
2525
import rx.internal.subscriptions.SequentialSubscription;
2626
import rx.plugins.RxJavaHooks;
2727

0 commit comments

Comments
 (0)