Skip to content

Commit b86e1da

Browse files
Use Subscriber.onStart as idiomatic backpressure start point
1 parent 032d3af commit b86e1da

File tree

8 files changed

+277
-26
lines changed

8 files changed

+277
-26
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,10 @@ public final <R> Observable<R> lift(final Operator<? extends R, ? super T> lift)
157157
@Override
158158
public void call(Subscriber<? super R> o) {
159159
try {
160-
onSubscribe.call(hook.onLift(lift).call(o));
160+
Subscriber<? super T> st = hook.onLift(lift).call(o);
161+
// new Subscriber created and being subscribed with so 'onStart' it
162+
st.onStart();
163+
onSubscribe.call(st);
161164
} catch (Throwable e) {
162165
// localized capture of errors rather than it skipping all operators
163166
// and ending up in the try/catch of the subscribe method which then
@@ -6837,6 +6840,8 @@ public void onNext(T t) {
68376840
*/
68386841
public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
68396842
try {
6843+
// new Subscriber so onStart it
6844+
subscriber.onStart();
68406845
// allow the hook to intercept and/or decorate
68416846
hook.onSubscribeStart(this, onSubscribe).call(subscriber);
68426847
return hook.onSubscribeReturn(subscriber);
@@ -6908,6 +6913,10 @@ public final Subscription subscribe(Subscriber<? super T> subscriber) {
69086913
* so I won't mention that in the exception
69096914
*/
69106915
}
6916+
6917+
// new Subscriber so onStart it
6918+
subscriber.onStart();
6919+
69116920
/*
69126921
* See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
69136922
* to user code from within an Observer"

rxjava-core/src/main/java/rx/Subscriber.java

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -52,23 +52,11 @@ protected Subscriber() {
5252
this.cs = new SubscriptionList();
5353
}
5454

55-
protected Subscriber(int bufferRequest) {
56-
this.op = null;
57-
this.cs = new SubscriptionList();
58-
request(bufferRequest);
59-
}
60-
6155
protected Subscriber(Subscriber<?> op) {
6256
this.op = op;
6357
this.cs = op.cs;
6458
}
6559

66-
protected Subscriber(Subscriber<?> op, int bufferRequest) {
67-
this.op = op;
68-
this.cs = op.cs;
69-
request(bufferRequest);
70-
}
71-
7260
/**
7361
* Adds a {@link Subscription} to this Subscriber's list of subscriptions if this list is not marked as
7462
* unsubscribed. If the list <em>is</em> marked as unsubscribed, {@code add} will indicate this by
@@ -95,6 +83,10 @@ public final boolean isUnsubscribed() {
9583
return cs.isUnsubscribed();
9684
}
9785

86+
public void onStart() {
87+
// do nothing by default
88+
}
89+
9890
public final void request(int n) {
9991
Producer shouldRequest = null;
10092
synchronized (this) {

rxjava-core/src/main/java/rx/internal/operators/OperatorMerge.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,16 +77,21 @@ private static final class MergeSubscriber<T> extends Subscriber<Observable<? ex
7777
*/
7878

7979
public MergeSubscriber(Subscriber<? super T> actual) {
80-
// we request backpressure so we can handle long-running Observables that are enqueueing, such as flatMap use cases
81-
// passing the request value in decouples the Producer chain while keeping the Subscription chain together (perf benefit)
82-
super(actual, RxRingBuffer.SIZE);
80+
super(actual);
8381
this.actual = actual;
8482
this.mergeProducer = new MergeProducer<T>(this);
8583
// decoupled the subscription chain because we need to decouple and control backpressure
8684
actual.add(this);
8785
actual.setProducer(mergeProducer);
8886
}
8987

88+
@Override
89+
public void onStart() {
90+
// we request backpressure so we can handle long-running Observables that are enqueueing, such as flatMap use cases
91+
// we decouple the Producer chain while keeping the Subscription chain together (perf benefit) via super(actual)
92+
request(RxRingBuffer.SIZE);
93+
}
94+
9095
/*
9196
* This is expected to be executed sequentially as per the Rx contract or it will not work.
9297
*/

rxjava-core/src/main/java/rx/internal/operators/OperatorObserveOn.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,6 @@ private static final class ObserveOnSubscriber<T> extends Subscriber<T> {
8383
// do NOT pass the Subscriber through to couple the subscription chain ... unsubscribing on the parent should
8484
// not prevent anything downstream from consuming, which will happen if the Subscription is chained
8585
public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child) {
86-
// signal that this is an async operator capable of receiving this many
87-
super(RxRingBuffer.SIZE);
8886
this.child = child;
8987
this.recursiveScheduler = scheduler.createWorker();
9088
this.scheduledUnsubscribe = new ScheduledUnsubscribe(recursiveScheduler, queue);
@@ -103,6 +101,12 @@ public void request(int n) {
103101
child.add(this);
104102

105103
}
104+
105+
@Override
106+
public void onStart() {
107+
// signal that this is an async operator capable of receiving this many
108+
request(RxRingBuffer.SIZE);
109+
}
106110

107111
@Override
108112
public void onNext(final T t) {

rxjava-core/src/main/java/rx/internal/operators/OperatorTakeLast.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,14 @@ public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
4747
final QueueProducer<T> producer = new QueueProducer<T>(notification, deque, subscriber);
4848
subscriber.setProducer(producer);
4949

50-
// no backpressure up as it wants to receive and discard all but the last
51-
return new Subscriber<T>(subscriber, -1) {
50+
return new Subscriber<T>(subscriber) {
51+
52+
// no backpressure up as it wants to receive and discard all but the last
53+
@Override
54+
public void onStart() {
55+
// we do this to break the chain of the child subscriber being passed through
56+
request(-1);
57+
}
5258

5359
@Override
5460
public void onCompleted() {

rxjava-core/src/perf/java/rx/operators/OperatorRangePerf.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,13 @@ public void setup(final Blackhole bh) {
4040
}
4141

4242
public Subscriber<Integer> newSubscriber() {
43-
return new Subscriber<Integer>(size) {
43+
return new Subscriber<Integer>() {
4444

45+
@Override
46+
public void onStart() {
47+
request(size);
48+
}
49+
4550
@Override
4651
public void onCompleted() {
4752

rxjava-core/src/test/java/rx/BackpressureTests.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -303,8 +303,13 @@ public void testUserSubscriberUsingRequestSync() {
303303
final AtomicInteger totalReceived = new AtomicInteger();
304304
final AtomicInteger batches = new AtomicInteger();
305305
final AtomicInteger received = new AtomicInteger();
306-
incrementingIntegers(c).subscribe(new Subscriber<Integer>(100) {
306+
incrementingIntegers(c).subscribe(new Subscriber<Integer>() {
307307

308+
@Override
309+
public void onStart() {
310+
request(100);
311+
}
312+
308313
@Override
309314
public void onCompleted() {
310315

@@ -344,8 +349,14 @@ public void testUserSubscriberUsingRequestAsync() throws InterruptedException {
344349
final AtomicInteger received = new AtomicInteger();
345350
final AtomicInteger batches = new AtomicInteger();
346351
final CountDownLatch latch = new CountDownLatch(1);
347-
incrementingIntegers(c).subscribeOn(Schedulers.newThread()).subscribe(new Subscriber<Integer>(100) {
352+
incrementingIntegers(c).subscribeOn(Schedulers.newThread()).subscribe(new Subscriber<Integer>() {
348353

354+
@Override
355+
public void onStart() {
356+
request(100);
357+
}
358+
359+
349360
@Override
350361
public void onCompleted() {
351362
latch.countDown();

0 commit comments

Comments
 (0)