Skip to content

Commit b1d9c1c

Browse files
Fix Memory Leak in NewThreadScheduler Recursion
- the Action0 method did not have a leak - the Func2 method on inner scheduler recursion did have a leak
1 parent fdbf5ef commit b1d9c1c

File tree

1 file changed

+36
-10
lines changed

1 file changed

+36
-10
lines changed

rxjava-core/src/main/java/rx/schedulers/NewThreadScheduler.java

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@
2525
import rx.Scheduler;
2626
import rx.Subscription;
2727
import rx.subscriptions.CompositeSubscription;
28+
import rx.subscriptions.MultipleAssignmentSubscription;
2829
import rx.subscriptions.Subscriptions;
30+
import rx.util.functions.Action0;
2931
import rx.util.functions.Func2;
3032

3133
/**
@@ -46,6 +48,7 @@ private NewThreadScheduler() {
4648

4749
private static class EventLoopScheduler extends Scheduler {
4850
private final ExecutorService executor;
51+
private final MultipleAssignmentSubscription childSubscription = new MultipleAssignmentSubscription();
4952

5053
private EventLoopScheduler() {
5154
executor = Executors.newFixedThreadPool(1, new ThreadFactory() {
@@ -61,21 +64,30 @@ public Thread newThread(Runnable r) {
6164

6265
@Override
6366
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
67+
CompositeSubscription s = new CompositeSubscription();
6468
final DiscardableAction<T> discardableAction = new DiscardableAction<T>(state, action);
65-
// all subscriptions that may need to be unsubscribed
66-
final CompositeSubscription subscription = new CompositeSubscription(discardableAction);
67-
69+
s.add(discardableAction);
70+
6871
final Scheduler _scheduler = this;
69-
subscription.add(Subscriptions.from(executor.submit(new Runnable() {
72+
s.add(Subscriptions.from(executor.submit(new Runnable() {
7073

7174
@Override
7275
public void run() {
73-
Subscription s = discardableAction.call(_scheduler);
74-
subscription.add(s);
76+
discardableAction.call(_scheduler);
7577
}
7678
})));
77-
78-
return subscription;
79+
80+
// replace the EventLoopScheduler child subscription with this one
81+
childSubscription.set(s);
82+
/*
83+
* If `schedule` is run concurrently instead of recursively then we'd lose subscriptions as the `childSubscription`
84+
* only remembers the last one scheduled. However, the parent subscription will shutdown the entire EventLoopScheduler
85+
* and the ExecutorService which will terminate all outstanding tasks so this childSubscription is actually somewhat
86+
* superfluous for stopping and cleanup ... though childSubscription does ensure exactness as can be seen by
87+
* the `testUnSubscribeForScheduler()` unit test which fails if the `childSubscription` does not exist.
88+
*/
89+
90+
return childSubscription;
7991
}
8092

8193
@Override
@@ -103,12 +115,26 @@ public void run() {
103115
return subscription;
104116
}
105117

118+
private void shutdownNow() {
119+
executor.shutdownNow();
120+
}
121+
106122
}
107123

108124
@Override
109125
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
110-
EventLoopScheduler s = new EventLoopScheduler();
111-
return s.schedule(state, action);
126+
final EventLoopScheduler s = new EventLoopScheduler();
127+
CompositeSubscription cs = new CompositeSubscription();
128+
cs.add(s.schedule(state, action));
129+
cs.add(Subscriptions.create(new Action0() {
130+
131+
@Override
132+
public void call() {
133+
// shutdown the executor, all tasks queued to run and clean up resources
134+
s.shutdownNow();
135+
}
136+
}));
137+
return cs;
112138
}
113139

114140
@Override

0 commit comments

Comments
 (0)