Skip to content

Commit 9e26917

Browse files
akarnokdbenjchristensen
authored andcommitted
ObserveOn Merge from @akarnokd:OperatorRepeat2
1 parent d56b1b9 commit 9e26917

File tree

6 files changed

+195
-139
lines changed

6 files changed

+195
-139
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@
4949
import rx.operators.OperationDematerialize;
5050
import rx.operators.OperationDistinct;
5151
import rx.operators.OperationDistinctUntilChanged;
52-
import rx.operators.OperatorDoOnEach;
5352
import rx.operators.OperationElementAt;
5453
import rx.operators.OperationFilter;
5554
import rx.operators.OperationFinally;
@@ -63,13 +62,11 @@
6362
import rx.operators.OperationMergeDelayError;
6463
import rx.operators.OperationMinMax;
6564
import rx.operators.OperationMulticast;
66-
import rx.operators.OperationObserveOn;
6765
import rx.operators.OperationOnErrorResumeNextViaFunction;
6866
import rx.operators.OperationOnErrorResumeNextViaObservable;
6967
import rx.operators.OperationOnErrorReturn;
7068
import rx.operators.OperationOnExceptionResumeNextViaObservable;
7169
import rx.operators.OperationParallelMerge;
72-
import rx.operators.OperatorRepeat;
7370
import rx.operators.OperationReplay;
7471
import rx.operators.OperationRetry;
7572
import rx.operators.OperationSample;
@@ -96,18 +93,21 @@
9693
import rx.operators.OperationToObservableFuture;
9794
import rx.operators.OperationUsing;
9895
import rx.operators.OperationWindow;
99-
import rx.operators.OperatorSubscribeOn;
100-
import rx.operators.OperatorZip;
10196
import rx.operators.OperatorCast;
97+
import rx.operators.OperatorDoOnEach;
10298
import rx.operators.OperatorFromIterable;
10399
import rx.operators.OperatorGroupBy;
104100
import rx.operators.OperatorMap;
105101
import rx.operators.OperatorMerge;
102+
import rx.operators.OperatorObserveOn;
106103
import rx.operators.OperatorParallel;
104+
import rx.operators.OperatorRepeat;
105+
import rx.operators.OperatorSubscribeOn;
107106
import rx.operators.OperatorTake;
108107
import rx.operators.OperatorTimestamp;
109108
import rx.operators.OperatorToObservableList;
110109
import rx.operators.OperatorToObservableSortedList;
110+
import rx.operators.OperatorZip;
111111
import rx.operators.OperatorZipIterable;
112112
import rx.plugins.RxJavaObservableExecutionHook;
113113
import rx.plugins.RxJavaPlugins;
@@ -5151,7 +5151,7 @@ public final <R> ConnectableObservable<R> multicast(Subject<? super T, ? extends
51515151
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#wiki-observeon">RxJava Wiki: observeOn()</a>
51525152
*/
51535153
public final Observable<T> observeOn(Scheduler scheduler) {
5154-
return create(OperationObserveOn.observeOn(this, scheduler));
5154+
return lift(new OperatorObserveOn<T>(scheduler));
51555155
}
51565156

51575157
/**

rxjava-core/src/main/java/rx/operators/OperationObserveOn.java

Lines changed: 0 additions & 129 deletions
This file was deleted.
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import rx.Notification;
19+
import rx.Scheduler;
20+
import rx.Subscriber;
21+
import rx.subscriptions.CompositeSubscription;
22+
import rx.util.functions.Action0;
23+
24+
/**
25+
* Move the observation of events to another thread via Scheduler.
26+
* @param <T> the item type
27+
*/
28+
public class OperatorObserveOn<T> implements Operator<T, T> {
29+
final Scheduler scheduler;
30+
public OperatorObserveOn(Scheduler scheduler) {
31+
this.scheduler = scheduler;
32+
}
33+
34+
@Override
35+
public Subscriber<? super T> call(final Subscriber<? super T> t1) {
36+
final QueueDrain qd = new QueueDrain(t1);
37+
final CompositeSubscription csub = new CompositeSubscription();
38+
t1.add(csub);
39+
return new Subscriber<T>(t1) {
40+
/** Dispatch the notification value. */
41+
void run(final Notification<T> nt) {
42+
qd.enqueue(new Action0() {
43+
@Override
44+
public void call() {
45+
nt.accept(t1);
46+
}
47+
});
48+
qd.tryDrainAsync(scheduler, csub);
49+
}
50+
@Override
51+
public void onNext(final T args) {
52+
run(Notification.createOnNext(args));
53+
}
54+
55+
@Override
56+
public void onError(final Throwable e) {
57+
run(Notification.<T>createOnError(e));
58+
}
59+
60+
@Override
61+
public void onCompleted() {
62+
run(Notification.<T>createOnCompleted());
63+
}
64+
};
65+
}
66+
67+
}
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import java.util.concurrent.BlockingQueue;
19+
import java.util.concurrent.LinkedBlockingQueue;
20+
import java.util.concurrent.atomic.AtomicInteger;
21+
import rx.Scheduler;
22+
import rx.Subscription;
23+
import rx.subscriptions.CompositeSubscription;
24+
import rx.subscriptions.MultipleAssignmentSubscription;
25+
import rx.util.functions.Action0;
26+
import rx.util.functions.Action1;
27+
28+
/**
29+
* Action queue ensuring that only a single drain caller succeeds at a time.
30+
* This class can be used to execute work without the issues of reentrancy and
31+
* concurrency.
32+
*/
33+
public final class QueueDrain implements Runnable, Action0 {
34+
/** The number of work items. */
35+
private final AtomicInteger wip = new AtomicInteger();
36+
/** The action queue. */
37+
private final BlockingQueue<Action0> queue = new LinkedBlockingQueue<Action0>();
38+
/** The subscription to stop the queue processing. */
39+
private final Subscription k;
40+
/**
41+
* Constructor which takes a cancellation token.
42+
* @param k the cancellation token (aka subscription).
43+
*/
44+
public QueueDrain(Subscription k) {
45+
this.k = k;
46+
}
47+
/**
48+
* Enqueue an action.
49+
* To execute any queued action, call {@link #tryDrain()} or
50+
* submit this instance to a {@code Scheduler.schedule()} method.
51+
* @param action the action to enqueue, not null
52+
*/
53+
public void enqueue(Action0 action) {
54+
if (!k.isUnsubscribed()) {
55+
queue.add(action);
56+
}
57+
}
58+
/**
59+
* Try draining the queue and executing the actions in it.
60+
*/
61+
public void tryDrain() {
62+
if (wip.incrementAndGet() > 1 || k.isUnsubscribed()) {
63+
return;
64+
}
65+
do {
66+
queue.poll().call();
67+
} while (wip.decrementAndGet() > 0 && !k.isUnsubscribed());
68+
}
69+
/**
70+
* Try draining the queue on the given scheduler.
71+
* The method ensures that only one thread is actively draining the
72+
* queue on the given scheduler.
73+
* @param scheduler the scheduler where the draining should happen
74+
* @param cs the composite subscription to track the schedule
75+
*/
76+
public void tryDrainAsync(Scheduler scheduler, final CompositeSubscription cs) {
77+
if (cs.isUnsubscribed() || wip.incrementAndGet() > 1) {
78+
return;
79+
}
80+
// add tracking subscription only if schedule is run to avoid overfilling cs
81+
final MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription();
82+
cs.add(mas);
83+
mas.set(scheduler.schedule(new Action1<Scheduler.Inner>() {
84+
@Override
85+
public void call(Scheduler.Inner o) {
86+
if (!cs.isUnsubscribed()) {
87+
do {
88+
queue.poll().call();
89+
} while (wip.decrementAndGet() > 0 && !cs.isUnsubscribed());
90+
}
91+
cs.remove(mas);
92+
}
93+
}));
94+
}
95+
@Override
96+
public void run() {
97+
// to help the draining of the queue on a ThreadPool/Scheduler
98+
tryDrain();
99+
}
100+
101+
@Override
102+
public void call() {
103+
tryDrain();
104+
}
105+
106+
}

rxjava-core/src/test/java/rx/operators/OperationObserveOnTest.java renamed to rxjava-core/src/test/java/rx/operators/OperatorObserveOnTest.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import static org.junit.Assert.*;
1919
import static org.mockito.Matchers.*;
2020
import static org.mockito.Mockito.*;
21-
import static rx.operators.OperationObserveOn.*;
2221

2322
import java.util.concurrent.CountDownLatch;
2423
import java.util.concurrent.TimeUnit;
@@ -37,7 +36,7 @@
3736
import rx.util.functions.Action1;
3837
import rx.util.functions.Func1;
3938

40-
public class OperationObserveOnTest {
39+
public class OperatorObserveOnTest {
4140

4241
/**
4342
* This is testing a no-op path since it uses Schedulers.immediate() which will not do scheduling.
@@ -46,7 +45,7 @@ public class OperationObserveOnTest {
4645
@SuppressWarnings("unchecked")
4746
public void testObserveOn() {
4847
Observer<Integer> observer = mock(Observer.class);
49-
Observable.create(observeOn(Observable.from(1, 2, 3), Schedulers.immediate())).subscribe(observer);
48+
Observable.from(1, 2, 3).observeOn(Schedulers.immediate()).subscribe(observer);
5049

5150
verify(observer, times(1)).onNext(1);
5251
verify(observer, times(1)).onNext(2);

0 commit comments

Comments
 (0)