Skip to content

Commit 9a94fd2

Browse files
CurrentThreadScheduler Memory Leak Fixed
- Current/Immediate/NewThread/Executor Schedulers are passing unit tests - Current/NewThread/Executor Schedulers do not leak memory on the recursion test (Immediate can’t be used for recursion otherwise it stack overflows)
1 parent 5b2dab3 commit 9a94fd2

File tree

5 files changed

+234
-72
lines changed

5 files changed

+234
-72
lines changed

rxjava-core/src/main/java/rx/schedulers/CurrentThreadScheduler.java

Lines changed: 46 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,20 @@
1717

1818
import java.util.PriorityQueue;
1919
import java.util.concurrent.TimeUnit;
20-
import java.util.concurrent.atomic.AtomicInteger;
20+
import java.util.concurrent.atomic.AtomicLong;
2121

2222
import rx.Scheduler;
2323
import rx.Subscription;
24+
import rx.subscriptions.CompositeSubscription;
25+
import rx.subscriptions.MultipleAssignmentSubscription;
2426
import rx.util.functions.Func2;
2527

2628
/**
2729
* Schedules work on the current thread but does not execute immediately. Work is put in a queue and executed after the current unit of work is completed.
2830
*/
2931
public class CurrentThreadScheduler extends Scheduler {
3032
private static final CurrentThreadScheduler INSTANCE = new CurrentThreadScheduler();
33+
private static final AtomicLong counter = new AtomicLong(0);
3134

3235
public static CurrentThreadScheduler getInstance() {
3336
return INSTANCE;
@@ -38,25 +41,27 @@ public static CurrentThreadScheduler getInstance() {
3841
/* package accessible for unit tests */CurrentThreadScheduler() {
3942
}
4043

41-
private final AtomicInteger counter = new AtomicInteger(0);
42-
4344
@Override
4445
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
46+
// immediately move to the InnerCurrentThreadScheduler
47+
InnerCurrentThreadScheduler innerScheduler = new InnerCurrentThreadScheduler();
4548
DiscardableAction<T> discardableAction = new DiscardableAction<T>(state, action);
46-
enqueue(discardableAction, now());
47-
return discardableAction;
49+
enqueue(innerScheduler, discardableAction, now());
50+
return innerScheduler;
4851
}
4952

5053
@Override
5154
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action, long dueTime, TimeUnit unit) {
5255
long execTime = now() + unit.toMillis(dueTime);
5356

57+
// immediately move to the InnerCurrentThreadScheduler
58+
InnerCurrentThreadScheduler innerScheduler = new InnerCurrentThreadScheduler();
5459
DiscardableAction<T> discardableAction = new DiscardableAction<T>(state, new SleepingAction<T>(action, this, execTime));
55-
enqueue(discardableAction, execTime);
60+
enqueue(innerScheduler, discardableAction, execTime);
5661
return discardableAction;
5762
}
5863

59-
private void enqueue(DiscardableAction<?> action, long execTime) {
64+
private static void enqueue(Scheduler scheduler, DiscardableAction<?> action, long execTime) {
6065
PriorityQueue<TimedAction> queue = QUEUE.get();
6166
boolean exec = queue == null;
6267

@@ -69,19 +74,50 @@ private void enqueue(DiscardableAction<?> action, long execTime) {
6974

7075
if (exec) {
7176
while (!queue.isEmpty()) {
72-
queue.poll().action.call(this);
77+
queue.poll().action.call(scheduler);
7378
}
7479

7580
QUEUE.set(null);
7681
}
7782
}
7883

84+
private static class InnerCurrentThreadScheduler extends Scheduler implements Subscription {
85+
private final MultipleAssignmentSubscription childSubscription = new MultipleAssignmentSubscription();
86+
87+
@Override
88+
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
89+
DiscardableAction<T> discardableAction = new DiscardableAction<T>(state, action);
90+
childSubscription.set(discardableAction);
91+
enqueue(this, discardableAction, now());
92+
return childSubscription;
93+
}
94+
95+
@Override
96+
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action, long delayTime, TimeUnit unit) {
97+
long execTime = now() + unit.toMillis(delayTime);
98+
99+
DiscardableAction<T> discardableAction = new DiscardableAction<T>(state, action);
100+
childSubscription.set(discardableAction);
101+
enqueue(this, discardableAction, execTime);
102+
return childSubscription;
103+
}
104+
105+
@Override
106+
public void unsubscribe() {
107+
childSubscription.unsubscribe();
108+
}
109+
110+
}
111+
112+
/**
113+
* Use time to sort items so delayed actions are sorted to their appropriate position in the queue.
114+
*/
79115
private static class TimedAction implements Comparable<TimedAction> {
80116
final DiscardableAction<?> action;
81117
final Long execTime;
82-
final Integer count; // In case if time between enqueueing took less than 1ms
118+
final Long count; // In case if time between enqueueing took less than 1ms
83119

84-
private TimedAction(DiscardableAction<?> action, Long execTime, Integer count) {
120+
private TimedAction(DiscardableAction<?> action, Long execTime, Long count) {
85121
this.action = action;
86122
this.execTime = execTime;
87123
this.count = count;

rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerConcurrencyTests.java

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,15 @@
88

99
import org.junit.Test;
1010

11+
import rx.Observable;
12+
import rx.Observer;
1113
import rx.Scheduler;
1214
import rx.Subscription;
1315
import rx.operators.SafeObservableSubscription;
14-
import rx.subscriptions.CompositeSubscription;
1516
import rx.subscriptions.Subscriptions;
1617
import rx.util.functions.Action0;
1718
import rx.util.functions.Action1;
19+
import rx.util.functions.Func1;
1820
import rx.util.functions.Func2;
1921

2022
/**
@@ -26,6 +28,55 @@
2628
*/
2729
public abstract class AbstractSchedulerConcurrencyTests extends AbstractSchedulerTests {
2830

31+
/**
32+
* Bug report: https://github.com/Netflix/RxJava/issues/431
33+
*/
34+
@Test
35+
public final void testUnSubscribeForScheduler() throws InterruptedException {
36+
final AtomicInteger countReceived = new AtomicInteger();
37+
final AtomicInteger countGenerated = new AtomicInteger();
38+
final SafeObservableSubscription s = new SafeObservableSubscription();
39+
final CountDownLatch latch = new CountDownLatch(1);
40+
41+
s.wrap(Observable.interval(50, TimeUnit.MILLISECONDS)
42+
.map(new Func1<Long, Long>() {
43+
@Override
44+
public Long call(Long aLong) {
45+
countGenerated.incrementAndGet();
46+
return aLong;
47+
}
48+
})
49+
.subscribeOn(getScheduler())
50+
.observeOn(getScheduler())
51+
.subscribe(new Observer<Long>() {
52+
@Override
53+
public void onCompleted() {
54+
System.out.println("--- completed");
55+
}
56+
57+
@Override
58+
public void onError(Throwable e) {
59+
System.out.println("--- onError");
60+
}
61+
62+
@Override
63+
public void onNext(Long args) {
64+
if (countReceived.incrementAndGet() == 2) {
65+
s.unsubscribe();
66+
latch.countDown();
67+
}
68+
System.out.println("==> Received " + args);
69+
}
70+
}));
71+
72+
latch.await(1000, TimeUnit.MILLISECONDS);
73+
74+
System.out.println("----------- it thinks it is finished ------------------ ");
75+
Thread.sleep(100);
76+
77+
assertEquals(2, countGenerated.get());
78+
}
79+
2980
@Test
3081
public void testUnsubscribeRecursiveScheduleWithStateAndFunc2() throws InterruptedException {
3182
final CountDownLatch latch = new CountDownLatch(1);

0 commit comments

Comments
 (0)