|
1 | 1 | # RxJava Releases #
|
2 | 2 |
|
| 3 | +### Version 0.20.0 ([Maven Central](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22com.netflix.rxjava%22%20AND%20v%3A%220.20.0%22)) ### |
| 4 | + |
| 5 | +RxJava 0.20.0 is a major release that adds "reactive pull" support for backpressure along with several other enhancements leading into the 1.0 release. |
| 6 | + |
| 7 | +#### Reactive Pull for Backpressure |
| 8 | + |
| 9 | +Solutions for backpressure was the major focus of this release. A "reactive pull" implementation was implemented. Documentation on this and other options for backpressure are found in the wiki: https://github.com/ReactiveX/RxJava/wiki/Backpressure |
| 10 | + |
| 11 | +The reactive pull solution evolved out of [several prototypes and interaction with many people over the months](https://github.com/ReactiveX/RxJava/issues/1000). |
| 12 | + |
| 13 | +##### Signature Changes |
| 14 | + |
| 15 | +A new type `Producer` has been added: |
| 16 | + |
| 17 | +```java |
| 18 | +public interface Producer { |
| 19 | + public void request(long n); |
| 20 | +} |
| 21 | +``` |
| 22 | + |
| 23 | +The `Subscriber` type now has these methods added: |
| 24 | + |
| 25 | +```java |
| 26 | +public abstract class Subscriber<T> implements Observer<T>, Subscription { |
| 27 | + public void onStart(); |
| 28 | + protected final void request(long n); |
| 29 | + public final void setProducer(Producer producer); |
| 30 | +} |
| 31 | +``` |
| 32 | + |
| 33 | + |
| 34 | +##### Examples |
| 35 | + |
| 36 | + |
| 37 | +This trivial example shows requesting values one at a time: |
| 38 | + |
| 39 | +```java |
| 40 | +Observable.from(1, 2, 3, 4).subscribe(new Subscriber<Integer>() { |
| 41 | + |
| 42 | + @Override |
| 43 | + public void onStart() { |
| 44 | + // on start this tells it to request 1 |
| 45 | + // otherwise it defaults to request(Long.MAX_VALUE) |
| 46 | + request(1); |
| 47 | + } |
| 48 | + |
| 49 | + @Override |
| 50 | + public void onCompleted() { |
| 51 | + } |
| 52 | + |
| 53 | + @Override |
| 54 | + public void onError(Throwable e) { |
| 55 | + } |
| 56 | + |
| 57 | + @Override |
| 58 | + public void onNext(Integer t) { |
| 59 | + System.out.println(t); |
| 60 | + // as each onNext is consumed, request another |
| 61 | + // otherwise the Producer will not send more |
| 62 | + request(1); |
| 63 | + } |
| 64 | + |
| 65 | +}); |
| 66 | +``` |
| 67 | + |
| 68 | +The [OnSubscribeFromIterable](https://github.com/ReactiveX/RxJava/blob/1.x/rxjava/src/main/java/rx/internal/operators/OnSubscribeFromIterable.java) operator shows how an `Iterable` is consumed with backpressure. |
| 69 | + |
| 70 | +Some hi-lights (modified for simplicity rather than performance and completeness): |
| 71 | + |
| 72 | +```java |
| 73 | +public final class OnSubscribeFromIterable<T> implements OnSubscribe<T> { |
| 74 | + |
| 75 | + @Override |
| 76 | + public void call(final Subscriber<? super T> o) { |
| 77 | + final Iterator<? extends T> it = is.iterator(); |
| 78 | + // instead of emitting directly to the Subscriber, it emits a Producer |
| 79 | + o.setProducer(new IterableProducer<T>(o, it)); |
| 80 | + } |
| 81 | + |
| 82 | + private static final class IterableProducer<T> implements Producer { |
| 83 | + |
| 84 | + public void request(long n) { |
| 85 | + int _c = requested.getAndAdd(n); |
| 86 | + if (_c == 0) { |
| 87 | + while (it.hasNext()) { |
| 88 | + if (o.isUnsubscribed()) { |
| 89 | + return; |
| 90 | + } |
| 91 | + T t = it.next(); |
| 92 | + o.onNext(t); |
| 93 | + if (requested.decrementAndGet() == 0) { |
| 94 | + // we're done emitting the number requested so return |
| 95 | + return; |
| 96 | + } |
| 97 | + } |
| 98 | + |
| 99 | + o.onCompleted(); |
| 100 | + } |
| 101 | + |
| 102 | + } |
| 103 | + } |
| 104 | +} |
| 105 | +``` |
| 106 | + |
| 107 | +The [`observeOn` operator](https://github.com/ReactiveX/RxJava/blob/1.x/rxjava/src/main/java/rx/internal/operators/OperatorObserveOn.java#L85) is a sterotypical example of queuing on one side of a thread and draining on the other, now with backpressure. |
| 108 | + |
| 109 | +```java |
| 110 | +private static final class ObserveOnSubscriber<T> extends Subscriber<T> { |
| 111 | + @Override |
| 112 | + public void onStart() { |
| 113 | + // signal that this is an async operator capable of receiving this many |
| 114 | + request(RxRingBuffer.SIZE); |
| 115 | + } |
| 116 | + |
| 117 | + @Override |
| 118 | + public void onNext(final T t) { |
| 119 | + try { |
| 120 | + // enqueue |
| 121 | + queue.onNext(t); |
| 122 | + } catch (MissingBackpressureException e) { |
| 123 | + // fail if the upstream has not obeyed our backpressure requests |
| 124 | + onError(e); |
| 125 | + return; |
| 126 | + } |
| 127 | + // attempt to schedule draining if needed |
| 128 | + schedule(); |
| 129 | + } |
| 130 | + |
| 131 | + // the scheduling polling will then drain the queue and invoke `request(n)` to request more after draining |
| 132 | +} |
| 133 | +``` |
| 134 | + |
| 135 | +Many use cases will be able to use `Observable.from`, `Observable.onBackpressureDrop` and `Observable.onBackpressureBuffer` to achieve "reactive pull backpressure" without manually implementing `Producer` logic. Also, it is optional to make an `Observable` support backpressure. It can remain completely reactive and just push events as it always has. Most uses of RxJava this works just fine. If backpressure is needed then it can be migrated to use a `Producer` or several other approaches to flow control exist such as `throttle`, `sample`, `debounce`, `window`, `buffer`, `onBackpressureBuffer`, and `onBackpressureDrop`. |
| 136 | + |
| 137 | + |
| 138 | +The [wiki](https://github.com/ReactiveX/RxJava/wiki/Backpressure) provides further documentation. |
| 139 | + |
| 140 | +##### Relation to Reactive Streams |
| 141 | + |
| 142 | +Contributors to RxJava are involved in defining the [Reactive Streams](https://github.com/reactive-streams/reactive-streams/) spec. RxJava 1.0 is trying to comply with the semantic rules but is not attempting to comply with the type signatures. It will however have a separate module that acts as a bridge between the RxJava `Observable` and the Reactive Stream types. |
| 143 | + |
| 144 | +The reasons for this are: |
| 145 | + |
| 146 | +- Rx has `Observer.onCompleted` whereas Reactive Streams has `onComplete`. This is a massive breaking change to remove a "d". |
| 147 | +- The RxJava `Subscription` is used also a "Closeable"/"Disposable" and it does not work well to make it now also be used for `request(n)`, hence the separate type `Producer` in RxJava. It was attempted to reuse `rx.Subscription` but it couldn't be done without massive breaking changes. |
| 148 | +- Reactive Streams uses `onSubscribe(Subscription s)` whereas RxJava injects the `Subscription` as the `Subscriber`. Again, this change could not be done without major breaking changes. |
| 149 | +- RxJava 1.0 needs to be backwards compatible with the major Rx contracts established during the 0.x roadmap. |
| 150 | + |
| 151 | +Considering these things, the major semantics of `request(long n)` for backpressure are compatible and this will allow interop with a bridge between the interfaces. |
| 152 | + |
| 153 | + |
| 154 | + |
| 155 | +#### New Features |
| 156 | + |
| 157 | +##### Compose/Transformer |
| 158 | + |
| 159 | +The `compose` operator is similar to `lift` but allows custom operator implementations that are chaining `Observable` operators whereas `lift` is directly implementing the raw `Subscriber` logic. |
| 160 | + |
| 161 | +Here is a trival example demonstrating how using `compose` is a better option than `lift` when existing `Observable` operators can be used to achieve the custom behavior. |
| 162 | + |
| 163 | +```java |
| 164 | +import rx.Observable; |
| 165 | +import rx.Observable.Operator; |
| 166 | +import rx.Observable.Transformer; |
| 167 | +import rx.Subscriber; |
| 168 | + |
| 169 | +public class ComposeExample { |
| 170 | + |
| 171 | + public static void main(String[] args) { |
| 172 | + Observable.just("hello").compose(appendWorldTransformer()).forEach(System.out::println); |
| 173 | + Observable.just("hello").lift(appendWorldOperator()).forEach(System.out::println); |
| 174 | + } |
| 175 | + |
| 176 | + // if existing operators can be used, compose with Transformer is ideal |
| 177 | + private static Transformer<? super String, String> appendWorldTransformer() { |
| 178 | + return o -> o.map(s -> s + " world!").finallyDo(() -> { |
| 179 | + System.out.println(" some side-effect"); |
| 180 | + }); |
| 181 | + } |
| 182 | + |
| 183 | + // whereas lift is more low level |
| 184 | + private static Operator<? super String, String> appendWorldOperator() { |
| 185 | + return new Operator<String, String>() { |
| 186 | + |
| 187 | + @Override |
| 188 | + public Subscriber<? super String> call(Subscriber<? super String> child) { |
| 189 | + return new Subscriber<String>(child) { |
| 190 | + |
| 191 | + @Override |
| 192 | + public void onCompleted() { |
| 193 | + child.onCompleted(); |
| 194 | + } |
| 195 | + |
| 196 | + @Override |
| 197 | + public void onError(Throwable e) { |
| 198 | + child.onError(e); |
| 199 | + } |
| 200 | + |
| 201 | + @Override |
| 202 | + public void onNext(String t) { |
| 203 | + child.onNext(t + " world!"); |
| 204 | + System.out.println(" some side-effect"); |
| 205 | + } |
| 206 | + |
| 207 | + }; |
| 208 | + } |
| 209 | + |
| 210 | + }; |
| 211 | + } |
| 212 | +} |
| 213 | +``` |
| 214 | + |
| 215 | +##### retryWhen/repeatWhen |
| 216 | + |
| 217 | +New operators `retryWhen` and `repeatWhen` were added which offer support for more advanced recursion such as retry with exponential backoff. |
| 218 | + |
| 219 | +Here is an example that increases delay between each retry: |
| 220 | + |
| 221 | +```java |
| 222 | +Observable.create((Subscriber<? super String> s) -> { |
| 223 | + System.out.println("subscribing"); |
| 224 | + s.onError(new RuntimeException("always fails")); |
| 225 | +}).retryWhen(attempts -> { |
| 226 | + return attempts.zipWith(Observable.range(1, 3), (n, i) -> i).flatMap(i -> { |
| 227 | + System.out.println("delay retry by " + i + " second(s)"); |
| 228 | + return Observable.timer(i, TimeUnit.SECONDS); |
| 229 | + }); |
| 230 | +}).toBlocking().forEach(System.out::println); |
| 231 | +``` |
| 232 | + |
| 233 | + |
| 234 | +#### Breaking Changes |
| 235 | + |
| 236 | + |
| 237 | +The use of `Producer` has been added in such a way that it is optional and additive, but some operators that used to have unbounded queues are now bounded. This means that if a source `Observable` emits faster than the `Observer` can consume them, a `MissingBackpressureException` can be emitted via `onError`. |
| 238 | + |
| 239 | +This semantic change can break existing code. |
| 240 | + |
| 241 | +There are two ways of resolving this: |
| 242 | + |
| 243 | +1) Modify the source `Observable` to use `Producer` and support backpressure. |
| 244 | +2) Use newly added operators such as `onBackpressureBuffer` or `onBackpressureDrop` to choose a strategy for the source `Observable` of how to behave when it emits more data than the consuming `Observer` is capable of handling. Use of `onBackpressureBuffer` effectively returns it to having an unbounded buffer and behaving like version 0.19 or earlier. |
| 245 | + |
| 246 | +Example: |
| 247 | + |
| 248 | +```java |
| 249 | +sourceObservable.onBackpressureBuffer().subscribe(slowConsumer); |
| 250 | +``` |
| 251 | + |
| 252 | + |
| 253 | +#### Deprecations |
| 254 | + |
| 255 | +Various methods, operators or classes have been deprecated and will be removed in 1.0. Primarily they have been done to remove ambiguity, remove nuanced functionality that is easy to use wrong, clear out superfluous methods and eliminate cruft that was added during the 0.x development process but has been replaced. |
| 256 | + |
| 257 | +For example, `Observable.from(T)` was deprecated in favor of `Observable.just(T)` despite being a [painful breaking change](https://github.com/ReactiveX/RxJava/issues/1563) so as to solve ambiguity with `Observable.from(Iterable)`. |
| 258 | + |
| 259 | +This means that the upgrade from 0.20 to 1.0 will be breaking. This is being done so that the 1.x version can be a long-lived stable API built upon as clean a foundation as possible. |
| 260 | + |
| 261 | +A stable API for RxJava is important because it is intended to be a foundational library that many projects will depend upon. The deprecations are intended to help this be achieved. |
| 262 | + |
| 263 | +#### Future |
| 264 | + |
| 265 | +The next release will be 1.0 (after a few release candidates). The RxJava project has been split up into many new top-level projects at https://github.com/ReactiveX so each of their release cycles and version strategies can be decoupled. |
| 266 | + |
| 267 | +The 1.x version is intended to be stable for many years and target Java 6, 7 and 8. The expected outcome is for a 2.x version to target Java 8+ but for RxJava 1.x and 2.x to co-exist and both be living, supported versions. |
| 268 | + |
3 | 269 |
|
4 | 270 | ### Version 0.20.0-RC6 ([Maven Central](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22com.netflix.rxjava%22%20AND%20v%3A%220.20.0-RC6%22)) ###
|
5 | 271 |
|
|
0 commit comments