Skip to content

Commit 81f0bbd

Browse files
Merge pull request #1417 from benjchristensen/onStart
Proposal: Subscriber.onStart
2 parents 032d3af + 6ca3b43 commit 81f0bbd

File tree

9 files changed

+284
-29
lines changed

9 files changed

+284
-29
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,10 @@ public final <R> Observable<R> lift(final Operator<? extends R, ? super T> lift)
157157
@Override
158158
public void call(Subscriber<? super R> o) {
159159
try {
160-
onSubscribe.call(hook.onLift(lift).call(o));
160+
Subscriber<? super T> st = hook.onLift(lift).call(o);
161+
// new Subscriber created and being subscribed with so 'onStart' it
162+
st.onStart();
163+
onSubscribe.call(st);
161164
} catch (Throwable e) {
162165
// localized capture of errors rather than it skipping all operators
163166
// and ending up in the try/catch of the subscribe method which then
@@ -6837,6 +6840,8 @@ public void onNext(T t) {
68376840
*/
68386841
public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
68396842
try {
6843+
// new Subscriber so onStart it
6844+
subscriber.onStart();
68406845
// allow the hook to intercept and/or decorate
68416846
hook.onSubscribeStart(this, onSubscribe).call(subscriber);
68426847
return hook.onSubscribeReturn(subscriber);
@@ -6908,6 +6913,10 @@ public final Subscription subscribe(Subscriber<? super T> subscriber) {
69086913
* so I won't mention that in the exception
69096914
*/
69106915
}
6916+
6917+
// new Subscriber so onStart it
6918+
subscriber.onStart();
6919+
69116920
/*
69126921
* See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
69136922
* to user code from within an Observer"

rxjava-core/src/main/java/rx/Subscriber.java

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -52,23 +52,11 @@ protected Subscriber() {
5252
this.cs = new SubscriptionList();
5353
}
5454

55-
protected Subscriber(int bufferRequest) {
56-
this.op = null;
57-
this.cs = new SubscriptionList();
58-
request(bufferRequest);
59-
}
60-
6155
protected Subscriber(Subscriber<?> op) {
6256
this.op = op;
6357
this.cs = op.cs;
6458
}
6559

66-
protected Subscriber(Subscriber<?> op, int bufferRequest) {
67-
this.op = op;
68-
this.cs = op.cs;
69-
request(bufferRequest);
70-
}
71-
7260
/**
7361
* Adds a {@link Subscription} to this Subscriber's list of subscriptions if this list is not marked as
7462
* unsubscribed. If the list <em>is</em> marked as unsubscribed, {@code add} will indicate this by
@@ -95,6 +83,10 @@ public final boolean isUnsubscribed() {
9583
return cs.isUnsubscribed();
9684
}
9785

86+
public void onStart() {
87+
// do nothing by default
88+
}
89+
9890
public final void request(int n) {
9991
Producer shouldRequest = null;
10092
synchronized (this) {

rxjava-core/src/main/java/rx/internal/operators/OperatorMerge.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,16 +77,21 @@ private static final class MergeSubscriber<T> extends Subscriber<Observable<? ex
7777
*/
7878

7979
public MergeSubscriber(Subscriber<? super T> actual) {
80-
// we request backpressure so we can handle long-running Observables that are enqueueing, such as flatMap use cases
81-
// passing the request value in decouples the Producer chain while keeping the Subscription chain together (perf benefit)
82-
super(actual, RxRingBuffer.SIZE);
80+
super(actual);
8381
this.actual = actual;
8482
this.mergeProducer = new MergeProducer<T>(this);
8583
// decoupled the subscription chain because we need to decouple and control backpressure
8684
actual.add(this);
8785
actual.setProducer(mergeProducer);
8886
}
8987

88+
@Override
89+
public void onStart() {
90+
// we request backpressure so we can handle long-running Observables that are enqueueing, such as flatMap use cases
91+
// we decouple the Producer chain while keeping the Subscription chain together (perf benefit) via super(actual)
92+
request(RxRingBuffer.SIZE);
93+
}
94+
9095
/*
9196
* This is expected to be executed sequentially as per the Rx contract or it will not work.
9297
*/

rxjava-core/src/main/java/rx/internal/operators/OperatorObserveOn.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,6 @@ private static final class ObserveOnSubscriber<T> extends Subscriber<T> {
8383
// do NOT pass the Subscriber through to couple the subscription chain ... unsubscribing on the parent should
8484
// not prevent anything downstream from consuming, which will happen if the Subscription is chained
8585
public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child) {
86-
// signal that this is an async operator capable of receiving this many
87-
super(RxRingBuffer.SIZE);
8886
this.child = child;
8987
this.recursiveScheduler = scheduler.createWorker();
9088
this.scheduledUnsubscribe = new ScheduledUnsubscribe(recursiveScheduler, queue);
@@ -103,6 +101,12 @@ public void request(int n) {
103101
child.add(this);
104102

105103
}
104+
105+
@Override
106+
public void onStart() {
107+
// signal that this is an async operator capable of receiving this many
108+
request(RxRingBuffer.SIZE);
109+
}
106110

107111
@Override
108112
public void onNext(final T t) {

rxjava-core/src/main/java/rx/internal/operators/OperatorTakeLast.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,14 @@ public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
4747
final QueueProducer<T> producer = new QueueProducer<T>(notification, deque, subscriber);
4848
subscriber.setProducer(producer);
4949

50-
// no backpressure up as it wants to receive and discard all but the last
51-
return new Subscriber<T>(subscriber, -1) {
50+
return new Subscriber<T>(subscriber) {
51+
52+
// no backpressure up as it wants to receive and discard all but the last
53+
@Override
54+
public void onStart() {
55+
// we do this to break the chain of the child subscriber being passed through
56+
request(-1);
57+
}
5258

5359
@Override
5460
public void onCompleted() {

rxjava-core/src/perf/java/rx/operators/OperatorRangePerf.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,13 @@ public void setup(final Blackhole bh) {
4040
}
4141

4242
public Subscriber<Integer> newSubscriber() {
43-
return new Subscriber<Integer>(size) {
43+
return new Subscriber<Integer>() {
4444

45+
@Override
46+
public void onStart() {
47+
request(size);
48+
}
49+
4550
@Override
4651
public void onCompleted() {
4752

rxjava-core/src/test/java/rx/BackpressureTests.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -173,10 +173,12 @@ public Observable<Integer> call(Integer i) {
173173
}).take(NUM).subscribe(ts);
174174
ts.awaitTerminalEvent();
175175
ts.assertNoErrors();
176-
System.out.println("testFlatMapAsync => Received: " + ts.getOnNextEvents().size() + " Emitted: " + c.get());
176+
System.out.println("testFlatMapAsync => Received: " + ts.getOnNextEvents().size() + " Emitted: " + c.get() + " Size: " + RxRingBuffer.SIZE);
177177
assertEquals(NUM, ts.getOnNextEvents().size());
178-
// expect less than 1 buffer since the flatMap is emitting 10 each time, so it is NUM/10 that will be taken.
179-
assertTrue(c.get() <= RxRingBuffer.SIZE);
178+
// even though we only need 10, it will request at least RxRingBuffer.SIZE, and then as it drains keep requesting more
179+
// and then it will be non-deterministic when the take() causes the unsubscribe as it is scheduled on 10 different schedulers (threads)
180+
// normally this number is ~250 but can get up to ~1200 when RxRingBuffer.SIZE == 1024
181+
assertTrue(c.get() <= RxRingBuffer.SIZE * 2);
180182
}
181183

182184
@Ignore
@@ -303,8 +305,13 @@ public void testUserSubscriberUsingRequestSync() {
303305
final AtomicInteger totalReceived = new AtomicInteger();
304306
final AtomicInteger batches = new AtomicInteger();
305307
final AtomicInteger received = new AtomicInteger();
306-
incrementingIntegers(c).subscribe(new Subscriber<Integer>(100) {
308+
incrementingIntegers(c).subscribe(new Subscriber<Integer>() {
307309

310+
@Override
311+
public void onStart() {
312+
request(100);
313+
}
314+
308315
@Override
309316
public void onCompleted() {
310317

@@ -344,8 +351,14 @@ public void testUserSubscriberUsingRequestAsync() throws InterruptedException {
344351
final AtomicInteger received = new AtomicInteger();
345352
final AtomicInteger batches = new AtomicInteger();
346353
final CountDownLatch latch = new CountDownLatch(1);
347-
incrementingIntegers(c).subscribeOn(Schedulers.newThread()).subscribe(new Subscriber<Integer>(100) {
354+
incrementingIntegers(c).subscribeOn(Schedulers.newThread()).subscribe(new Subscriber<Integer>() {
348355

356+
@Override
357+
public void onStart() {
358+
request(100);
359+
}
360+
361+
349362
@Override
350363
public void onCompleted() {
351364
latch.countDown();

0 commit comments

Comments
 (0)