Skip to content

Commit a078522

Browse files
Merge branch 'master' into idiomaticscala
2 parents b236f89 + 9ec8b0e commit a078522

22 files changed

+741
-135
lines changed

CHANGES.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,15 @@
11
# RxJava Releases #
22

3+
### Version 0.13.4 ([Maven Central](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22com.netflix.rxjava%22%20AND%20v%3A%220.13.4%22)) ###
4+
5+
* [Pull 393](https://github.com/Netflix/RxJava/pull/393) Parallel Operator & ObserveOn/ScheduledObserver Fixes
6+
* [Pull 394](https://github.com/Netflix/RxJava/pull/394) Change Interval and Sample default Scheduler
7+
* [Pull 391](https://github.com/Netflix/RxJava/pull/391) Fix OSGI support for rxjava-scala
8+
9+
### Version 0.13.3
10+
11+
* Upload to Sonatype failed so version skipped
12+
313
### Version 0.13.2 ([Maven Central](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22com.netflix.rxjava%22%20AND%20v%3A%220.13.2%22)) ###
414

515
* [Pull 389](https://github.com/Netflix/RxJava/pull/389) Scala Adaptor Improvements

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.13.3-SNAPSHOT
1+
version=0.13.5-SNAPSHOT
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package rx.lang.groovy
2+
3+
import org.junit.Test
4+
5+
import rx.Observable
6+
import rx.Scheduler
7+
import rx.concurrency.Schedulers
8+
import rx.util.functions.Func1
9+
10+
class TestParallel {
11+
12+
@Test
13+
public void testParallelOperator() {
14+
Observable.range(0, 100)
15+
.parallel({
16+
it.map({ return it; })
17+
})
18+
.toBlockingObservable()
19+
.forEach({ println("T: " + it + " Thread: " + Thread.currentThread()); });
20+
}
21+
}

language-adaptors/rxjava-scala/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ jar {
4949
name = 'rxjava-scala'
5050
instruction 'Bundle-Vendor', 'Netflix'
5151
instruction 'Bundle-DocURL', 'https://github.com/Netflix/RxJava'
52-
instruction 'Import-Package', '!org.junit,!junit.framework,!org.mockito.*,*'
52+
instruction 'Import-Package', '!org.junit,!junit.framework,!org.mockito.*,!org.scalatest.*,*'
5353
instruction 'Fragment-Host', 'com.netflix.rxjava.core'
5454
}
5555
}

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
176176
* Observable, and that synchronously notifies its {@link Observer}s
177177
*/
178178
def synchronize: Observable[T] = {
179-
Observable[T](JObservable.synchronize(asJava))
179+
Observable[T](asJava.synchronize)
180180
}
181181

182182
/**

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 205 additions & 40 deletions
Large diffs are not rendered by default.

rxjava-core/src/main/java/rx/Scheduler.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -212,12 +212,23 @@ public Subscription call(Scheduler scheduler, Void state) {
212212
}
213213

214214
/**
215-
* Returns the scheduler's notion of current absolute time in milliseconds.
215+
* @return the scheduler's notion of current absolute time in milliseconds.
216216
*/
217217
public long now() {
218218
return System.currentTimeMillis();
219219
}
220220

221+
/**
222+
* Parallelism available to a Scheduler.
223+
* <p>
224+
* This defaults to {@code Runtime.getRuntime().availableProcessors()} but can be overridden for use cases such as scheduling work on a computer cluster.
225+
*
226+
* @return the scheduler's available degree of parallelism.
227+
*/
228+
public int degreeOfParallelism() {
229+
return Runtime.getRuntime().availableProcessors();
230+
}
231+
221232
public static class UnitTest {
222233
@SuppressWarnings("unchecked") // mocking is unchecked, unfortunately
223234
@Test

rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java

Lines changed: 64 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,15 @@
1515
*/
1616
package rx.concurrency;
1717

18+
import java.util.concurrent.ExecutorService;
19+
import java.util.concurrent.Executors;
1820
import java.util.concurrent.ScheduledFuture;
21+
import java.util.concurrent.ThreadFactory;
1922
import java.util.concurrent.TimeUnit;
23+
import java.util.concurrent.atomic.AtomicLong;
2024

2125
import rx.Scheduler;
2226
import rx.Subscription;
23-
import rx.operators.SafeObservableSubscription;
2427
import rx.subscriptions.CompositeSubscription;
2528
import rx.subscriptions.Subscriptions;
2629
import rx.util.functions.Func2;
@@ -29,27 +32,74 @@
2932
* Schedules work on a new thread.
3033
*/
3134
public class NewThreadScheduler extends Scheduler {
32-
private static final NewThreadScheduler INSTANCE = new NewThreadScheduler();
35+
36+
private final static NewThreadScheduler INSTANCE = new NewThreadScheduler();
37+
private final static AtomicLong count = new AtomicLong();
3338

3439
public static NewThreadScheduler getInstance() {
3540
return INSTANCE;
3641
}
3742

38-
@Override
39-
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
40-
final SafeObservableSubscription subscription = new SafeObservableSubscription();
41-
final Scheduler _scheduler = this;
43+
private NewThreadScheduler() {
4244

43-
Thread t = new Thread(new Runnable() {
44-
@Override
45-
public void run() {
46-
subscription.wrap(action.call(_scheduler, state));
47-
}
48-
}, "RxNewThreadScheduler");
45+
}
4946

50-
t.start();
47+
private static class EventLoopScheduler extends Scheduler {
48+
private final ExecutorService executor;
5149

52-
return subscription;
50+
private EventLoopScheduler() {
51+
executor = Executors.newFixedThreadPool(1, new ThreadFactory() {
52+
53+
@Override
54+
public Thread newThread(Runnable r) {
55+
return new Thread(r, "RxNewThreadScheduler-" + count.incrementAndGet());
56+
}
57+
});
58+
}
59+
60+
@Override
61+
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
62+
final Scheduler _scheduler = this;
63+
return Subscriptions.from(executor.submit(new Runnable() {
64+
65+
@Override
66+
public void run() {
67+
action.call(_scheduler, state);
68+
}
69+
}));
70+
}
71+
72+
@Override
73+
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action, final long delayTime, final TimeUnit unit) {
74+
// we will use the system scheduler since it doesn't make sense to launch a new Thread and then sleep
75+
// we will instead schedule the event then launch the thread after the delay has passed
76+
final Scheduler _scheduler = this;
77+
final CompositeSubscription subscription = new CompositeSubscription();
78+
ScheduledFuture<?> f = GenericScheduledExecutorService.getInstance().schedule(new Runnable() {
79+
80+
@Override
81+
public void run() {
82+
if (!subscription.isUnsubscribed()) {
83+
// when the delay has passed we now do the work on the actual scheduler
84+
Subscription s = _scheduler.schedule(state, action);
85+
// add the subscription to the CompositeSubscription so it is unsubscribed
86+
subscription.add(s);
87+
}
88+
}
89+
}, delayTime, unit);
90+
91+
// add the ScheduledFuture as a subscription so we can cancel the scheduled action if an unsubscribe happens
92+
subscription.add(Subscriptions.create(f));
93+
94+
return subscription;
95+
}
96+
97+
}
98+
99+
@Override
100+
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
101+
EventLoopScheduler s = new EventLoopScheduler();
102+
return s.schedule(state, action);
53103
}
54104

55105
@Override

rxjava-core/src/main/java/rx/operators/OperationInterval.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public final class OperationInterval {
4646
* Creates an event each time interval.
4747
*/
4848
public static OnSubscribeFunc<Long> interval(long interval, TimeUnit unit) {
49-
return interval(interval, unit, Schedulers.executor(Executors.newSingleThreadScheduledExecutor()));
49+
return interval(interval, unit, Schedulers.threadPoolForComputation());
5050
}
5151

5252
/**

rxjava-core/src/main/java/rx/operators/OperationObserveOn.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import java.util.concurrent.CountDownLatch;
2222
import java.util.concurrent.TimeUnit;
23+
import java.util.concurrent.atomic.AtomicReference;
2324

2425
import org.junit.Test;
2526
import org.mockito.InOrder;
@@ -33,6 +34,8 @@
3334
import rx.Subscription;
3435
import rx.concurrency.ImmediateScheduler;
3536
import rx.concurrency.Schedulers;
37+
import rx.subscriptions.CompositeSubscription;
38+
import rx.util.functions.Func2;
3639

3740
/**
3841
* Asynchronously notify Observers on the specified Scheduler.
@@ -60,7 +63,9 @@ public Subscription onSubscribe(final Observer<? super T> observer) {
6063
// do nothing if we request ImmediateScheduler so we don't invoke overhead
6164
return source.subscribe(observer);
6265
} else {
63-
return source.subscribe(new ScheduledObserver<T>(observer, scheduler));
66+
CompositeSubscription s = new CompositeSubscription();
67+
s.add(source.subscribe(new ScheduledObserver<T>(s, observer, scheduler)));
68+
return s;
6469
}
6570
}
6671
}

0 commit comments

Comments
 (0)