Skip to content

Commit df94c0d

Browse files
authored
2.x: cleanup and fixes 10/03-2 (#4663)
1 parent 50598b3 commit df94c0d

35 files changed

+2690
-217
lines changed

src/main/java/io/reactivex/Scheduler.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
import io.reactivex.disposables.Disposable;
1919
import io.reactivex.exceptions.Exceptions;
20-
import io.reactivex.internal.disposables.SequentialDisposable;
20+
import io.reactivex.internal.disposables.*;
2121
import io.reactivex.internal.util.ExceptionHelper;
2222
import io.reactivex.plugins.RxJavaPlugins;
2323

@@ -163,7 +163,10 @@ public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, lo
163163

164164
PeriodicDirectTask periodicTask = new PeriodicDirectTask(decoratedRun, w);
165165

166-
w.schedulePeriodically(periodicTask, initialDelay, period, unit);
166+
Disposable d = w.schedulePeriodically(periodicTask, initialDelay, period, unit);
167+
if (d == EmptyDisposable.INSTANCE) {
168+
return d;
169+
}
167170

168171
return periodicTask;
169172
}
@@ -235,8 +238,13 @@ public Disposable schedulePeriodically(Runnable run, final long initialDelay, fi
235238
final long firstNowNanoseconds = now(TimeUnit.NANOSECONDS);
236239
final long firstStartInNanoseconds = firstNowNanoseconds + unit.toNanos(initialDelay);
237240

238-
first.replace(schedule(new PeriodicTask(firstStartInNanoseconds, decoratedRun, firstNowNanoseconds, sd,
239-
periodInNanoseconds), initialDelay, unit));
241+
Disposable d = schedule(new PeriodicTask(firstStartInNanoseconds, decoratedRun, firstNowNanoseconds, sd,
242+
periodInNanoseconds), initialDelay, unit);
243+
244+
if (d == EmptyDisposable.INSTANCE) {
245+
return d;
246+
}
247+
first.replace(d);
240248

241249
return sd;
242250
}

src/main/java/io/reactivex/internal/operators/flowable/FlowableDoOnEach.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ public void onError(Throwable t) {
101101
onError.accept(t);
102102
} catch (Throwable e) {
103103
Exceptions.throwIfFatal(e);
104-
actual.onError(new CompositeException(e, t));
104+
actual.onError(new CompositeException(t, e));
105105
relay = false;
106106
}
107107
if (relay) {
@@ -121,14 +121,14 @@ public void onComplete() {
121121
if (done) {
122122
return;
123123
}
124-
done = true;
125124
try {
126125
onComplete.run();
127126
} catch (Throwable e) {
128127
fail(e);
129128
return;
130129
}
131130

131+
done = true;
132132
actual.onComplete();
133133

134134
try {
@@ -211,10 +211,6 @@ public boolean tryOnNext(T t) {
211211
return false;
212212
}
213213

214-
if (sourceMode != NONE) {
215-
return actual.tryOnNext(null);
216-
}
217-
218214
try {
219215
onNext.accept(t);
220216
} catch (Throwable e) {
@@ -237,7 +233,7 @@ public void onError(Throwable t) {
237233
onError.accept(t);
238234
} catch (Throwable e) {
239235
Exceptions.throwIfFatal(e);
240-
actual.onError(new CompositeException(e, t));
236+
actual.onError(new CompositeException(t, e));
241237
relay = false;
242238
}
243239
if (relay) {
@@ -257,14 +253,14 @@ public void onComplete() {
257253
if (done) {
258254
return;
259255
}
260-
done = true;
261256
try {
262257
onComplete.run();
263258
} catch (Throwable e) {
264259
fail(e);
265260
return;
266261
}
267262

263+
done = true;
268264
actual.onComplete();
269265

270266
try {

src/main/java/io/reactivex/internal/operators/flowable/FlowableObserveOn.java

Lines changed: 18 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -45,33 +45,14 @@ public FlowableObserveOn(
4545

4646
@Override
4747
public void subscribeActual(Subscriber<? super T> s) {
48-
49-
// FIXME add macro-optimization
50-
// if (PublisherSubscribeOnValue.scalarScheduleOn(source, s, scheduler)) {
51-
// return;
52-
// }
53-
54-
Worker worker;
55-
56-
try {
57-
worker = scheduler.createWorker();
58-
} catch (Throwable e) {
59-
Exceptions.throwIfFatal(e);
60-
EmptySubscription.error(e, s);
61-
return;
62-
}
63-
64-
if (worker == null) {
65-
EmptySubscription.error(new NullPointerException("The scheduler returned a null Function"), s);
66-
return;
67-
}
48+
Worker worker = scheduler.createWorker();
6849

6950
if (s instanceof ConditionalSubscriber) {
70-
ConditionalSubscriber<? super T> cs = (ConditionalSubscriber<? super T>) s;
71-
source.subscribe(new PublisherObserveOnConditionalSubscriber<T>(cs, worker, delayError, prefetch));
72-
return;
51+
source.subscribe(new PublisherObserveOnConditionalSubscriber<T>(
52+
(ConditionalSubscriber<? super T>) s, worker, delayError, prefetch));
53+
} else {
54+
source.subscribe(new PublisherObserveOnSubscriber<T>(s, worker, delayError, prefetch));
7355
}
74-
source.subscribe(new PublisherObserveOnSubscriber<T>(s, worker, delayError, prefetch));
7556
}
7657

7758
abstract static class BaseObserveOnSubscriber<T>
@@ -113,20 +94,7 @@ abstract static class BaseObserveOnSubscriber<T>
11394
this.delayError = delayError;
11495
this.prefetch = prefetch;
11596
this.requested = new AtomicLong();
116-
117-
if (prefetch != Integer.MAX_VALUE) {
118-
this.limit = prefetch - (prefetch >> 2);
119-
} else {
120-
this.limit = Integer.MAX_VALUE;
121-
}
122-
}
123-
124-
final void initialRequest() {
125-
if (prefetch == Integer.MAX_VALUE) {
126-
s.request(Long.MAX_VALUE);
127-
} else {
128-
s.request(prefetch);
129-
}
97+
this.limit = prefetch - (prefetch >> 2);
13098
}
13199

132100
@Override
@@ -206,7 +174,7 @@ public final void run() {
206174

207175
final boolean checkTerminated(boolean d, boolean empty, Subscriber<?> a) {
208176
if (cancelled) {
209-
queue.clear();
177+
clear();
210178
return true;
211179
}
212180
if (d) {
@@ -223,7 +191,7 @@ final boolean checkTerminated(boolean d, boolean empty, Subscriber<?> a) {
223191
} else {
224192
Throwable e = error;
225193
if (e != null) {
226-
queue.clear();
194+
clear();
227195
doError(a, e);
228196
return true;
229197
} else
@@ -314,7 +282,7 @@ public void onSubscribe(Subscription s) {
314282

315283
actual.onSubscribe(this);
316284

317-
initialRequest();
285+
s.request(prefetch);
318286

319287
return;
320288
}
@@ -324,7 +292,7 @@ public void onSubscribe(Subscription s) {
324292

325293
actual.onSubscribe(this);
326294

327-
initialRequest();
295+
s.request(prefetch);
328296
}
329297
}
330298

@@ -370,17 +338,7 @@ void runSync() {
370338
return;
371339
}
372340

373-
boolean empty;
374-
375-
try {
376-
empty = q.isEmpty();
377-
} catch (Throwable ex) {
378-
Exceptions.throwIfFatal(ex);
379-
doError(a, ex);
380-
return;
381-
}
382-
383-
if (empty) {
341+
if (q.isEmpty()) {
384342
doComplete(a);
385343
return;
386344
}
@@ -450,24 +408,8 @@ void runAsync() {
450408
}
451409
}
452410

453-
if (e == r) {
454-
boolean d = done;
455-
boolean empty;
456-
try {
457-
empty = q.isEmpty();
458-
} catch (Throwable ex) {
459-
Exceptions.throwIfFatal(ex);
460-
461-
s.cancel();
462-
q.clear();
463-
464-
doError(a, ex);
465-
return;
466-
}
467-
468-
if (checkTerminated(d, empty, a)) {
469-
return;
470-
}
411+
if (e == r && checkTerminated(done, q.isEmpty(), a)) {
412+
return;
471413
}
472414

473415
int w = get();
@@ -574,7 +516,7 @@ public void onSubscribe(Subscription s) {
574516

575517
actual.onSubscribe(this);
576518

577-
initialRequest();
519+
s.request(prefetch);
578520

579521
return;
580522
}
@@ -584,7 +526,7 @@ public void onSubscribe(Subscription s) {
584526

585527
actual.onSubscribe(this);
586528

587-
initialRequest();
529+
s.request(prefetch);
588530
}
589531
}
590532

@@ -629,17 +571,7 @@ void runSync() {
629571
return;
630572
}
631573

632-
boolean empty;
633-
634-
try {
635-
empty = q.isEmpty();
636-
} catch (Throwable ex) {
637-
Exceptions.throwIfFatal(ex);
638-
doError(a, ex);
639-
return;
640-
}
641-
642-
if (empty) {
574+
if (q.isEmpty()) {
643575
doComplete(a);
644576
return;
645577
}
@@ -708,24 +640,8 @@ void runAsync() {
708640
}
709641
}
710642

711-
if (emitted == r) {
712-
boolean d = done;
713-
boolean empty;
714-
try {
715-
empty = q.isEmpty();
716-
} catch (Throwable ex) {
717-
Exceptions.throwIfFatal(ex);
718-
719-
s.cancel();
720-
q.clear();
721-
722-
doError(a, ex);
723-
return;
724-
}
725-
726-
if (checkTerminated(d, empty, a)) {
727-
return;
728-
}
643+
if (emitted == r && checkTerminated(done, q.isEmpty(), a)) {
644+
return;
729645
}
730646

731647
int w = get();

src/main/java/io/reactivex/internal/operators/flowable/FlowableScalarXMap.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,12 @@
2727
/**
2828
* Utility classes to work with scalar-sourced XMap operators (where X == { flat, concat, switch }).
2929
*/
30-
public enum FlowableScalarXMap {
31-
;
30+
public final class FlowableScalarXMap {
31+
32+
/** Utility class. */
33+
private FlowableScalarXMap() {
34+
throw new IllegalStateException("No instances!");
35+
}
3236

3337
/**
3438
* Tries to subscribe to a possibly Callable source's mapped Publisher.

src/main/java/io/reactivex/internal/operators/maybe/MaybeFlatMapIterableFlowable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ public R poll() throws Exception {
277277

278278
if (iter != null) {
279279
R v = ObjectHelper.requireNonNull(iter.next(), "The iterator returned a null value");
280-
if (iter.hasNext()) {
280+
if (!iter.hasNext()) {
281281
it = null;
282282
}
283283
return v;

src/main/java/io/reactivex/internal/operators/maybe/MaybeFlatMapIterableObservable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ public R poll() throws Exception {
219219

220220
if (iter != null) {
221221
R v = ObjectHelper.requireNonNull(iter.next(), "The iterator returned a null value");
222-
if (iter.hasNext()) {
222+
if (!iter.hasNext()) {
223223
it = null;
224224
}
225225
return v;

src/main/java/io/reactivex/internal/operators/observable/ObservableDoOnEach.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ public void onError(Throwable t) {
114114
onError.accept(t);
115115
} catch (Throwable e) {
116116
Exceptions.throwIfFatal(e);
117-
t = new CompositeException(e, t);
117+
t = new CompositeException(t, e);
118118
}
119119
actual.onError(t);
120120

@@ -131,7 +131,6 @@ public void onComplete() {
131131
if (done) {
132132
return;
133133
}
134-
done = true;
135134
try {
136135
onComplete.run();
137136
} catch (Throwable e) {
@@ -140,6 +139,7 @@ public void onComplete() {
140139
return;
141140
}
142141

142+
done = true;
143143
actual.onComplete();
144144

145145
try {

src/main/java/io/reactivex/internal/operators/observable/ObservableLift.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,6 @@ public ObservableLift(ObservableSource<T> source, ObservableOperator<? extends R
3636
this.operator = operator;
3737
}
3838

39-
/**
40-
* Returns the operator of this lift publisher.
41-
* @return the operator of this lift publisher
42-
*/
43-
public ObservableOperator<? extends R, ? super T> operator() {
44-
return operator;
45-
}
46-
4739
@Override
4840
public void subscribeActual(Observer<? super R> s) {
4941
Observer<? super T> observer;

0 commit comments

Comments
 (0)