Skip to content

Commit f4b9b5a

Browse files
Merge pull request #1380 from benjchristensen/fixes
Variety of Fixes
2 parents be7cd75 + 875cfd7 commit f4b9b5a

File tree

10 files changed

+133
-112
lines changed

10 files changed

+133
-112
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7261,7 +7261,7 @@ public final Observable<List<T>> takeLastBuffer(long time, TimeUnit unit, Schedu
72617261
* @see <a href="https://github.com/Netflix/RxJava/wiki/Conditional-and-Boolean-Operators#takeuntil">RxJava Wiki: takeUntil()</a>
72627262
*/
72637263
public final <E> Observable<T> takeUntil(Observable<? extends E> other) {
7264-
return OperatorTakeUntil.takeUntil(this, other);
7264+
return lift(new OperatorTakeUntil<T, E>(other));
72657265
}
72667266

72677267
/**

rxjava-core/src/main/java/rx/internal/operators/BufferUntilSubscriber.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,9 +81,6 @@ boolean casFirst(int expected, int next) {
8181
void setObserverRef(Observer<? super T> o) {
8282
observerRef = o;
8383
}
84-
boolean casObserverRef(Observer<? super T> expected, Observer<? super T> next) {
85-
return OBSERVER_UPDATER.compareAndSet(this, expected, next);
86-
}
8784
}
8885

8986
static final class OnSubscribeAction<T> implements OnSubscribe<T> {
@@ -188,7 +185,7 @@ private void drainIfNeededAndSwitchToActual() {
188185
}
189186
// now we can safely change over to the actual and get rid of the pass-thru
190187
// but only if not unsubscribed
191-
state.casObserverRef(this, actual);
188+
state.setObserverRef(actual);
192189
}
193190

194191
}

rxjava-core/src/main/java/rx/internal/operators/NotificationLite.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,18 @@ public static <T> NotificationLite<T> instance() {
5353

5454
private static final Object ON_COMPLETED_SENTINEL = new Serializable() {
5555
private static final long serialVersionUID = 1;
56+
57+
public String toString() {
58+
return "Notification=>Completed";
59+
}
5660
};
5761

5862
private static final Object ON_NEXT_NULL_SENTINEL = new Serializable() {
5963
private static final long serialVersionUID = 2;
64+
65+
public String toString() {
66+
return "Notification=>NULL";
67+
}
6068
};
6169

6270
private static class OnErrorSentinel implements Serializable {
@@ -66,6 +74,10 @@ private static class OnErrorSentinel implements Serializable {
6674
public OnErrorSentinel(Throwable e) {
6775
this.e = e;
6876
}
77+
78+
public String toString() {
79+
return "Notification=>Error:" + e.getMessage();
80+
}
6981
}
7082

7183
/**

rxjava-core/src/main/java/rx/internal/operators/OnSubscribeDelay.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public Observable<T> call(T x) {
5555
worker.schedule(e, delay, unit);
5656
return Observable.create(e);
5757
}
58-
})).subscribe(child);
58+
})).unsafeSubscribe(child);
5959
}
6060

6161
/**

rxjava-core/src/main/java/rx/internal/operators/OperatorTakeUntil.java

Lines changed: 44 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -16,100 +16,64 @@
1616
package rx.internal.operators;
1717

1818
import rx.Observable;
19+
import rx.Observable.Operator;
1920
import rx.Subscriber;
20-
import rx.functions.Func1;
21-
22-
import static rx.Observable.Operator;
2321

2422
/**
2523
* Returns an Observable that emits the items from the source Observable until another Observable
2624
* emits an item.
2725
* <p>
2826
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/takeUntil.png">
2927
*/
30-
public final class OperatorTakeUntil {
31-
32-
/**
33-
* Returns the values from the source observable sequence until the other observable sequence produces a value.
34-
*
35-
* @param source
36-
* the source sequence to propagate elements for.
37-
* @param other
38-
* the observable sequence that terminates propagation of elements of the source sequence.
39-
* @param <T>
40-
* the type of source.
41-
* @param <E>
42-
* the other type.
43-
* @return An observable sequence containing the elements of the source sequence up to the point the other sequence interrupted further propagation.
44-
*/
45-
public static <T, E> Observable<T> takeUntil(final Observable<? extends T> source, final Observable<? extends E> other) {
46-
Observable<Object> s = source.lift(new SourceObservable<T>());
47-
Observable<Object> o = other.lift(new OtherObservable<E>());
48-
49-
Observable<Object> result = Observable.merge(s, o);
50-
51-
final NotificationLite<T> notification = NotificationLite.instance();
52-
53-
return result.takeWhile(new Func1<Object, Boolean>() {
28+
public final class OperatorTakeUntil<T, E> implements Operator<T, T> {
29+
30+
private final Observable<? extends E> other;
31+
32+
public OperatorTakeUntil(final Observable<? extends E> other) {
33+
this.other = other;
34+
}
35+
36+
@Override
37+
public Subscriber<? super T> call(final Subscriber<? super T> child) {
38+
final Subscriber<T> parent = new Subscriber<T>(child) {
39+
40+
@Override
41+
public void onCompleted() {
42+
child.onCompleted();
43+
}
44+
45+
@Override
46+
public void onError(Throwable e) {
47+
child.onError(e);
48+
}
49+
5450
@Override
55-
public Boolean call(Object args) {
56-
return !notification.isCompleted(args);
51+
public void onNext(T t) {
52+
child.onNext(t);
5753
}
58-
}).map(new Func1<Object, T>() {
54+
55+
};
56+
57+
other.unsafeSubscribe(new Subscriber<E>(child) {
58+
59+
@Override
60+
public void onCompleted() {
61+
parent.onCompleted();
62+
}
63+
5964
@Override
60-
public T call(Object args) {
61-
return notification.getValue(args);
65+
public void onError(Throwable e) {
66+
parent.onError(e);
6267
}
68+
69+
@Override
70+
public void onNext(E t) {
71+
parent.onCompleted();
72+
}
73+
6374
});
64-
}
6575

66-
private final static class SourceObservable<T> implements Operator<Object, T> {
67-
68-
private final NotificationLite<T> notification = NotificationLite.instance();
69-
70-
@Override
71-
public Subscriber<? super T> call(final Subscriber<? super Object> subscriber) {
72-
return new Subscriber<T>(subscriber) {
73-
@Override
74-
public void onCompleted() {
75-
subscriber.onNext(notification.completed());
76-
}
77-
78-
@Override
79-
public void onError(Throwable e) {
80-
subscriber.onError(e);
81-
}
82-
83-
@Override
84-
public void onNext(T args) {
85-
subscriber.onNext(notification.next(args));
86-
}
87-
};
88-
}
76+
return parent;
8977
}
9078

91-
private final static class OtherObservable<E> implements Operator<Object, E> {
92-
93-
private final NotificationLite<E> notification = NotificationLite.instance();
94-
95-
@Override
96-
public Subscriber<? super E> call(final Subscriber<? super Object> subscriber) {
97-
return new Subscriber<E>(subscriber) {
98-
@Override
99-
public void onCompleted() {
100-
subscriber.onNext(notification.completed());
101-
}
102-
103-
@Override
104-
public void onError(Throwable e) {
105-
subscriber.onError(e);
106-
}
107-
108-
@Override
109-
public void onNext(E args) {
110-
subscriber.onNext(notification.completed());
111-
}
112-
};
113-
}
114-
}
11579
}

rxjava-core/src/main/java/rx/observers/TestSubscriber.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,12 @@ public void assertUnsubscribed() {
150150
}
151151
}
152152

153+
public void assertNoErrors() {
154+
if (getOnErrorEvents().size() > 0) {
155+
throw new AssertionError("Unexpected onError events: " + getOnErrorEvents().size(), getOnErrorEvents().get(0));
156+
}
157+
}
158+
153159
/**
154160
* @warn javadoc missing
155161
*/

rxjava-core/src/main/java/rx/schedulers/TrampolineScheduler.java

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import java.util.PriorityQueue;
1919
import java.util.concurrent.TimeUnit;
20+
import java.util.concurrent.atomic.AtomicInteger;
2021
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
2122

2223
import rx.Scheduler;
@@ -44,15 +45,20 @@ public Worker createWorker() {
4445
/* package accessible for unit tests */TrampolineScheduler() {
4546
}
4647

47-
private static final ThreadLocal<PriorityQueue<TimedAction>> QUEUE = new ThreadLocal<PriorityQueue<TimedAction>>();
48+
private static final ThreadLocal<PriorityQueue<TimedAction>> QUEUE = new ThreadLocal<PriorityQueue<TimedAction>>() {
49+
@Override
50+
protected PriorityQueue<TimedAction> initialValue() {
51+
return new PriorityQueue<TimedAction>();
52+
}
53+
};
4854

4955
volatile int counter;
50-
static final AtomicIntegerFieldUpdater<TrampolineScheduler> COUNTER_UPDATER
51-
= AtomicIntegerFieldUpdater.newUpdater(TrampolineScheduler.class, "counter");
56+
static final AtomicIntegerFieldUpdater<TrampolineScheduler> COUNTER_UPDATER = AtomicIntegerFieldUpdater.newUpdater(TrampolineScheduler.class, "counter");
5257

5358
private class InnerCurrentThreadScheduler extends Scheduler.Worker implements Subscription {
5459

5560
private final BooleanSubscription innerSubscription = new BooleanSubscription();
61+
private final AtomicInteger wip = new AtomicInteger();
5662

5763
@Override
5864
public Subscription schedule(Action0 action) {
@@ -71,24 +77,16 @@ private Subscription enqueue(Action0 action, long execTime) {
7177
return Subscriptions.empty();
7278
}
7379
PriorityQueue<TimedAction> queue = QUEUE.get();
74-
boolean exec = queue == null;
75-
76-
if (exec) {
77-
queue = new PriorityQueue<TimedAction>();
78-
QUEUE.set(queue);
79-
}
80-
8180
final TimedAction timedAction = new TimedAction(action, execTime, COUNTER_UPDATER.incrementAndGet(TrampolineScheduler.this));
8281
queue.add(timedAction);
8382

84-
if (exec) {
85-
while (!queue.isEmpty()) {
83+
if (wip.getAndIncrement() == 0) {
84+
do {
8685
queue.poll().action.call();
87-
}
88-
89-
QUEUE.set(null);
86+
} while (wip.decrementAndGet() > 0);
9087
return Subscriptions.empty();
9188
} else {
89+
// queue wasn't empty, a parent is already processing so we just add to the end of the queue
9290
return Subscriptions.create(new Action0() {
9391

9492
@Override
@@ -118,9 +116,9 @@ public boolean isUnsubscribed() {
118116
private static class TimedAction implements Comparable<TimedAction> {
119117
final Action0 action;
120118
final Long execTime;
121-
final Integer count; // In case if time between enqueueing took less than 1ms
119+
final int count; // In case if time between enqueueing took less than 1ms
122120

123-
private TimedAction(Action0 action, Long execTime, Integer count) {
121+
private TimedAction(Action0 action, Long execTime, int count) {
124122
this.action = action;
125123
this.execTime = execTime;
126124
this.count = count;
@@ -130,10 +128,15 @@ private TimedAction(Action0 action, Long execTime, Integer count) {
130128
public int compareTo(TimedAction that) {
131129
int result = execTime.compareTo(that.execTime);
132130
if (result == 0) {
133-
return count.compareTo(that.count);
131+
return compare(count, that.count);
134132
}
135133
return result;
136134
}
137135
}
136+
137+
// because I can't use Integer.compare from Java 7
138+
private static int compare(int x, int y) {
139+
return (x < y) ? -1 : ((x == y) ? 0 : 1);
140+
}
138141

139142
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.internal.operators;
17+
18+
import static org.junit.Assert.*;
19+
20+
import org.junit.Test;
21+
22+
23+
public class NotificationLiteTest {
24+
25+
@Test
26+
public void testComplete() {
27+
NotificationLite<Object> on = NotificationLite.instance();
28+
Object n = on.next("Hello");
29+
Object c = on.completed();
30+
31+
assertTrue(on.isCompleted(c));
32+
assertFalse(on.isCompleted(n));
33+
34+
assertEquals("Hello", on.getValue(n));
35+
}
36+
37+
38+
}

rxjava-core/src/test/java/rx/internal/operators/OperatorMergeMapTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import rx.Observable;
3030
import rx.Observer;
3131
import rx.exceptions.TestException;
32+
import rx.functions.Action1;
3233
import rx.functions.Func0;
3334
import rx.functions.Func1;
3435
import rx.functions.Func2;
@@ -204,9 +205,10 @@ public void testFlatMapTransformsException() {
204205
Observable<Integer> onError = Observable.from(Arrays.asList(5));
205206

206207
Observable<Integer> source = Observable.concat(
207-
Observable.from(Arrays.asList(10, 20, 30))
208-
, Observable.<Integer> error(new RuntimeException("Forced failure!"))
208+
Observable.from(Arrays.asList(10, 20, 30)),
209+
Observable.<Integer> error(new RuntimeException("Forced failure!"))
209210
);
211+
210212

211213
@SuppressWarnings("unchecked")
212214
Observer<Object> o = mock(Observer.class);

0 commit comments

Comments
 (0)