|
1 | 1 | # RxJava Releases #
|
2 | 2 |
|
| 3 | +### Version 0.20.0-RC1 ([Maven Central](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22com.netflix.rxjava%22%20AND%20v%3A%220.20.0-RC1%22)) ### |
| 4 | + |
| 5 | + |
| 6 | +Version 0.20.0-RC1 is a preview release that adds backpressure support to RxJava as per issue [#1000](https://github.com/Netflix/RxJava/issues/1000). It has been done in a way that is mostly additive and most existing code will not be affected by these additions. A section below on "Breaking Changes" will discuss use cases that do break and how to deal with them. |
| 7 | + |
| 8 | +This release has been tested successfully in Netflix production canaries, but that does not exercise all use cases or operators, nor does it leverage the newly added backpressure functionality (though the backpressure code paths are used). |
| 9 | + |
| 10 | + |
| 11 | +#### Outstanding Work |
| 12 | + |
| 13 | +- The `zip` operator has not yet been upgraded to support backpressure. The work is almost done and it will be included in the next release. |
| 14 | +- Not all operators have yet been reviewed for whether they need to be changed in any way. |
| 15 | +- Temporal operators (like `buffer`, `window`, `sample`, etc) need to be modified to disable backpressure upstream (using `request(Long.MAX_VALUE)`) and a decision made about how downstream backpressure requests will be supported. |
| 16 | +- Ensure all code works on Android. New data structures rely on `sun.misc.Unsafe` but are conditionally used only when it is available. We need to ensure those conditions are working and the alternative implementations are adequate. The default buffer size of 1024 also needs to be reviewed for whether it is a correct default for all systems, or needs to be modified by environment (such as smaller for Android). |
| 17 | +- Ensure use cases needing backpressure all work. |
| 18 | + |
| 19 | +#### Signature Changes |
| 20 | + |
| 21 | +A new type `Producer` has been added: |
| 22 | + |
| 23 | +```java |
| 24 | +public interface Producer { |
| 25 | + public void request(long n); |
| 26 | +} |
| 27 | +``` |
| 28 | + |
| 29 | +The `Subscriber` type now has these methods added: |
| 30 | + |
| 31 | +```java |
| 32 | +public abstract class Subscriber<T> implements Observer<T>, Subscription { |
| 33 | + public void onStart(); |
| 34 | + public final void request(long n); |
| 35 | + public final void setProducer(Producer producer); |
| 36 | + protected Producer onSetProducer(Producer producer); |
| 37 | +} |
| 38 | +``` |
| 39 | + |
| 40 | + |
| 41 | +#### Examples |
| 42 | + |
| 43 | + |
| 44 | +This trivial example shows requesting values one at a time: |
| 45 | + |
| 46 | +```java |
| 47 | +Observable.from(1, 2, 3, 4).subscribe(new Subscriber<Integer>() { |
| 48 | + |
| 49 | + @Override |
| 50 | + public void onStart() { |
| 51 | + request(1); |
| 52 | + } |
| 53 | + |
| 54 | + @Override |
| 55 | + public void onCompleted() { |
| 56 | + } |
| 57 | + |
| 58 | + @Override |
| 59 | + public void onError(Throwable e) { |
| 60 | + } |
| 61 | + |
| 62 | + @Override |
| 63 | + public void onNext(Integer t) { |
| 64 | + request(1); |
| 65 | + } |
| 66 | + |
| 67 | +}); |
| 68 | +``` |
| 69 | + |
| 70 | +The [OnSubscribeFromIterable](https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/internal/operators/OnSubscribeFromIterable.java) operator shows how an `Iterable` is consumed with backpressure. |
| 71 | + |
| 72 | +Some hi-lights (modified for simplicity rather than performance and completeness): |
| 73 | + |
| 74 | +```java |
| 75 | +public final class OnSubscribeFromIterable<T> implements OnSubscribe<T> { |
| 76 | + |
| 77 | + @Override |
| 78 | + public void call(final Subscriber<? super T> o) { |
| 79 | + final Iterator<? extends T> it = is.iterator(); |
| 80 | + // instead of emitting directly to the Subscriber, it emits a Producer |
| 81 | + o.setProducer(new IterableProducer<T>(o, it)); |
| 82 | + } |
| 83 | + |
| 84 | + private static final class IterableProducer<T> implements Producer { |
| 85 | + |
| 86 | + public void request(long n) { |
| 87 | + int _c = requested.getAndAdd(n); |
| 88 | + if (_c == 0) { |
| 89 | + while (it.hasNext()) { |
| 90 | + if (o.isUnsubscribed()) { |
| 91 | + return; |
| 92 | + } |
| 93 | + T t = it.next(); |
| 94 | + o.onNext(t); |
| 95 | + if (requested.decrementAndGet() == 0) { |
| 96 | + // we're done emitting the number requested so return |
| 97 | + return; |
| 98 | + } |
| 99 | + } |
| 100 | + |
| 101 | + o.onCompleted(); |
| 102 | + } |
| 103 | + |
| 104 | + } |
| 105 | + } |
| 106 | +} |
| 107 | +``` |
| 108 | + |
| 109 | +The [`observeOn` operator](https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/internal/operators/OperatorObserveOn.java#L85) is a sterotypical example of queuing on one side of a thread and draining on the other, now with backpressure. |
| 110 | + |
| 111 | +```java |
| 112 | +private static final class ObserveOnSubscriber<T> extends Subscriber<T> { |
| 113 | + @Override |
| 114 | + public void onStart() { |
| 115 | + // signal that this is an async operator capable of receiving this many |
| 116 | + request(RxRingBuffer.SIZE); |
| 117 | + } |
| 118 | + |
| 119 | + @Override |
| 120 | + public void onNext(final T t) { |
| 121 | + try { |
| 122 | + // enqueue |
| 123 | + queue.onNext(t); |
| 124 | + } catch (MissingBackpressureException e) { |
| 125 | + // fail if the upstream has not obeyed our backpressure requests |
| 126 | + onError(e); |
| 127 | + return; |
| 128 | + } |
| 129 | + // attempt to schedule draining if needed |
| 130 | + schedule(); |
| 131 | + } |
| 132 | + |
| 133 | + // the scheduling polling will then drain the queue and invoke `request(n)` to request more after draining |
| 134 | +} |
| 135 | +``` |
| 136 | + |
| 137 | + |
| 138 | +#### Breaking Changes |
| 139 | + |
| 140 | +The use of `Producer` has been added in such a way that it is optional and additive, but some operators that used to have unbounded queues are now bounded. This means that if a source `Observable` emits faster than the `Observer` can consume them, a `MissingBackpressureException` can be emitted via `onError`. |
| 141 | + |
| 142 | +This semantic change can break existing code. |
| 143 | + |
| 144 | +There are two ways of resolving this: |
| 145 | + |
| 146 | +1) Modify the source `Observable` to use `Producer` and support backpressure. |
| 147 | +2) Use newly added operators such as `onBackpressureBuffer` or `onBackpressureDrop` to choose a strategy for the source `Observable` of how to behave when it emits more data than the consuming `Observer` is capable of handling. Use of `onBackpressureBuffer` effectively returns it to having an unbounded buffer and behaving like version 0.19 or earlier. |
| 148 | + |
| 149 | +Example: |
| 150 | + |
| 151 | +```java |
| 152 | +sourceObservable.onBackpressureBuffer().subscribe(slowConsumer); |
| 153 | +``` |
| 154 | + |
| 155 | +#### Relation to Reactive Streams |
| 156 | + |
| 157 | +Contributors to RxJava are involved in defining the [Reactive Streams](https://github.com/reactive-streams/reactive-streams/) spec. RxJava 1.0 is trying to comply with the semantic rules but is not attempting to comply with the type signatures. It will however have a separate module that acts as a bridge between the RxJava `Observable` and the Reactive Stream types. |
| 158 | + |
| 159 | +The reasons for this are: |
| 160 | + |
| 161 | +- Rx has `Observer.onCompleted` whereas Reactive Streams has `onComplete`. This is a massive breaking change to remove a "d". |
| 162 | +- The RxJava `Subscription` is used also a "Closeable"/"Disposable" and it does not work well to make it now also be used for `request(n)`, hence the separate type `Producer` in RxJava. It was attempted to reuse `rx.Subscription` but it couldn't be done without massive breaking changes. |
| 163 | +- Reactive Streams uses `onSubscribe(Subscription s)` whereas RxJava injects the `Subscription` as the `Subscriber`. Again, this change could not be done without major breaking changes. |
| 164 | +- RxJava 1.0 needs to be backwards compatible with the major Rx contracts established during the 0.x roadmap. |
| 165 | +- Reactive Streams is not yet 1.0 and despite significant progress, it is a moving target. |
| 166 | + |
| 167 | +Considering these things, the major semantics of `request(long n)` for backpressure are compatible and this will allow interop with a bridge between the interfaces. As the Reactive Streams spec matures, RxJava 2.0 may choose to fully adopt the types in the future while RxJava 1.x retains the current signatures. |
| 168 | + |
| 169 | + |
| 170 | +#### How to Help |
| 171 | + |
| 172 | +First, please test this release against your existing code to help us determine if we have broken anything. |
| 173 | + |
| 174 | +Second, try to solve backpressure use cases and provide feedback on what works and what doesn't work. |
| 175 | + |
| 176 | +Thank you! |
| 177 | + |
| 178 | + |
3 | 179 | ### Version 0.19.6 ([Maven Central](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22com.netflix.rxjava%22%20AND%20v%3A%220.19.6%22)) ###
|
4 | 180 |
|
5 | 181 | Inclusion of 'rxjava-contrib:rxjava-scalaz' in release.
|
|
0 commit comments