Skip to content

Commit a252dca

Browse files
Merge pull request #631 from benjchristensen/NewThreadScheduler-Daemon
Make NewThreadScheduler create Daemon threads
2 parents 5f4ca71 + be9841a commit a252dca

File tree

2 files changed

+10
-24
lines changed

2 files changed

+10
-24
lines changed

rxjava-core/src/main/java/rx/schedulers/NewThreadScheduler.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,9 @@ private EventLoopScheduler() {
5252

5353
@Override
5454
public Thread newThread(Runnable r) {
55-
return new Thread(r, "RxNewThreadScheduler-" + count.incrementAndGet());
55+
Thread t = new Thread(r, "RxNewThreadScheduler-" + count.incrementAndGet());
56+
t.setDaemon(true);
57+
return t;
5658
}
5759
});
5860
}

rxjava-core/src/test/java/rx/operators/OperationParallelMergeTest.java

Lines changed: 7 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -58,24 +58,8 @@ public void testParallelMerge() {
5858

5959
@Test
6060
public void testNumberOfThreads() {
61-
final ConcurrentHashMap<String, String> threads = new ConcurrentHashMap<String, String>();
62-
Observable.merge(getStreams())
63-
.toBlockingObservable().forEach(new Action1<String>() {
64-
65-
@Override
66-
public void call(String o) {
67-
System.out.println("o: " + o + " Thread: " + Thread.currentThread());
68-
threads.put(Thread.currentThread().getName(), Thread.currentThread().getName());
69-
}
70-
});
71-
72-
// without injecting anything, the getStream() method uses Interval which runs on a default scheduler
73-
assertEquals(Runtime.getRuntime().availableProcessors(), threads.keySet().size());
74-
75-
// clear
76-
threads.clear();
77-
78-
// now we parallelMerge into 3 streams and observeOn for each
61+
final ConcurrentHashMap<Long, Long> threads = new ConcurrentHashMap<Long, Long>();
62+
// parallelMerge into 3 streams and observeOn for each
7963
// we expect 3 threads in the output
8064
OperationParallelMerge.parallelMerge(getStreams(), 3)
8165
.flatMap(new Func1<Observable<String>, Observable<String>>() {
@@ -90,8 +74,8 @@ public Observable<String> call(Observable<String> o) {
9074

9175
@Override
9276
public void call(String o) {
93-
System.out.println("o: " + o + " Thread: " + Thread.currentThread());
94-
threads.put(Thread.currentThread().getName(), Thread.currentThread().getName());
77+
System.out.println("o: " + o + " Thread: " + Thread.currentThread().getId());
78+
threads.put(Thread.currentThread().getId(), Thread.currentThread().getId());
9579
}
9680
});
9781

@@ -100,7 +84,7 @@ public void call(String o) {
10084

10185
@Test
10286
public void testNumberOfThreadsOnScheduledMerge() {
103-
final ConcurrentHashMap<String, String> threads = new ConcurrentHashMap<String, String>();
87+
final ConcurrentHashMap<Long, Long> threads = new ConcurrentHashMap<Long, Long>();
10488

10589
// now we parallelMerge into 3 streams and observeOn for each
10690
// we expect 3 threads in the output
@@ -109,8 +93,8 @@ public void testNumberOfThreadsOnScheduledMerge() {
10993

11094
@Override
11195
public void call(String o) {
112-
System.out.println("o: " + o + " Thread: " + Thread.currentThread());
113-
threads.put(Thread.currentThread().getName(), Thread.currentThread().getName());
96+
System.out.println("o: " + o + " Thread: " + Thread.currentThread().getId());
97+
threads.put(Thread.currentThread().getId(), Thread.currentThread().getId());
11498
}
11599
});
116100

0 commit comments

Comments
 (0)