Skip to content

Commit 5b2dab3

Browse files
ExecutorScheduler Memory Leak Fix
- new InnerExecutorScheduler and childSubscription - improvements to unit tests
1 parent 9f35594 commit 5b2dab3

File tree

4 files changed

+162
-68
lines changed

4 files changed

+162
-68
lines changed

rxjava-core/src/main/java/rx/schedulers/ExecutorScheduler.java

Lines changed: 123 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import rx.Scheduler;
2626
import rx.Subscription;
2727
import rx.subscriptions.CompositeSubscription;
28+
import rx.subscriptions.MultipleAssignmentSubscription;
2829
import rx.subscriptions.Subscriptions;
2930
import rx.util.functions.Func2;
3031

@@ -68,19 +69,18 @@ public void run() {
6869
@Override
6970
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action, long delayTime, TimeUnit unit) {
7071
final DiscardableAction<T> discardableAction = new DiscardableAction<T>(state, action);
71-
final Scheduler _scheduler = this;
72+
final InnerExecutorScheduler _scheduler = new InnerExecutorScheduler(executor);
73+
7274
// all subscriptions that may need to be unsubscribed
73-
final CompositeSubscription subscription = new CompositeSubscription(discardableAction);
75+
final CompositeSubscription subscription = new CompositeSubscription(discardableAction, _scheduler);
7476

7577
if (executor instanceof ScheduledExecutorService) {
7678
// we are a ScheduledExecutorService so can do proper scheduling
7779
ScheduledFuture<?> f = ((ScheduledExecutorService) executor).schedule(new Runnable() {
7880
@Override
7981
public void run() {
8082
// when the delay has passed we now do the work on the actual scheduler
81-
Subscription s = discardableAction.call(_scheduler);
82-
// add the subscription to the CompositeSubscription so it is unsubscribed
83-
subscription.add(s);
83+
discardableAction.call(_scheduler);
8484
}
8585
}, delayTime, unit);
8686
// add the ScheduledFuture as a subscription so we can cancel the scheduled action if an unsubscribe happens
@@ -89,9 +89,7 @@ public void run() {
8989
// we are not a ScheduledExecutorService so can't directly schedule
9090
if (delayTime == 0) {
9191
// no delay so put on the thread-pool right now
92-
Subscription s = schedule(state, action);
93-
// add the subscription to the CompositeSubscription so it is unsubscribed
94-
subscription.add(s);
92+
return schedule(state, action);
9593
} else {
9694
// there is a delay and this isn't a ScheduledExecutorService so we'll use a system-wide ScheduledExecutorService
9795
// to handle the scheduling and once it's ready then execute on this Executor
@@ -100,9 +98,7 @@ public void run() {
10098
@Override
10199
public void run() {
102100
// now execute on the real Executor (by using the other overload that schedules for immediate execution)
103-
Subscription s = _scheduler.schedule(state, action);
104-
// add the subscription to the CompositeSubscription so it is unsubscribed
105-
subscription.add(s);
101+
_scheduler.schedule(state, action);
106102
}
107103
}, delayTime, unit);
108104
// add the ScheduledFuture as a subscription so we can cancel the scheduled action if an unsubscribe happens
@@ -114,33 +110,138 @@ public void run() {
114110

115111
@Override
116112
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
113+
CompositeSubscription s = new CompositeSubscription();
117114
final DiscardableAction<T> discardableAction = new DiscardableAction<T>(state, action);
118-
final Scheduler _scheduler = this;
119-
// all subscriptions that may need to be unsubscribed
120-
final CompositeSubscription subscription = new CompositeSubscription(discardableAction);
115+
s.add(discardableAction);
116+
117+
final InnerExecutorScheduler _scheduler = new InnerExecutorScheduler(executor);
118+
s.add(_scheduler);
121119

122-
// work to be done on a thread
123-
Runnable r = new Runnable() {
120+
s.add(execute(executor, new Runnable() {
124121
@Override
125122
public void run() {
126-
Subscription s = discardableAction.call(_scheduler);
127-
// add the subscription to the CompositeSubscription so it is unsubscribed
128-
subscription.add(s);
123+
discardableAction.call(_scheduler);
129124
}
130-
};
125+
}));
126+
127+
return s;
128+
}
131129

130+
/**
131+
* Execute on the given Executor and retrieve a Subscription
132+
*
133+
* @param executor
134+
* @param r
135+
* @return
136+
*/
137+
private static Subscription execute(Executor executor, Runnable r) {
132138
// submit for immediate execution
133139
if (executor instanceof ExecutorService) {
134140
// we are an ExecutorService so get a Future back that supports unsubscribe
135141
Future<?> f = ((ExecutorService) executor).submit(r);
136142
// add the Future as a subscription so we can cancel the scheduled action if an unsubscribe happens
137-
subscription.add(Subscriptions.from(f));
143+
return Subscriptions.from(f);
138144
} else {
139145
// we are the lowest common denominator so can't unsubscribe once we execute
140146
executor.execute(r);
147+
return Subscriptions.empty();
141148
}
149+
}
142150

143-
return subscription;
151+
private static class InnerExecutorScheduler extends Scheduler implements Subscription {
152+
153+
private final MultipleAssignmentSubscription childSubscription = new MultipleAssignmentSubscription();
154+
private final Executor executor;
155+
156+
InnerExecutorScheduler(Executor executor) {
157+
this.executor = executor;
158+
}
159+
160+
@Override
161+
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
162+
if(childSubscription.isUnsubscribed()) {
163+
return childSubscription;
164+
}
165+
166+
CompositeSubscription s = new CompositeSubscription();
167+
final DiscardableAction<T> discardableAction = new DiscardableAction<T>(state, action);
168+
s.add(discardableAction);
169+
170+
final Scheduler _scheduler = this;
171+
172+
s.add(execute(executor, new Runnable() {
173+
174+
@Override
175+
public void run() {
176+
discardableAction.call(_scheduler);
177+
}
178+
}));
179+
180+
// replace the InnerExecutorScheduler child subscription with this one
181+
childSubscription.set(s);
182+
/*
183+
* TODO: Consider what will happen if `schedule` is run concurrently instead of recursively
184+
* and we lose subscriptions as the `childSubscription` only remembers the last one scheduled.
185+
*
186+
* Not obvious that this should ever happen. Can it?
187+
*
188+
* benjchristensen => Haven't been able to come up with a valid test case to prove this as an issue
189+
* so it may not be.
190+
*/
191+
192+
return childSubscription;
193+
}
194+
195+
@Override
196+
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action, long delayTime, TimeUnit unit) {
197+
if(childSubscription.isUnsubscribed()) {
198+
return childSubscription;
199+
}
200+
201+
CompositeSubscription s = new CompositeSubscription();
202+
final DiscardableAction<T> discardableAction = new DiscardableAction<T>(state, action);
203+
s.add(discardableAction);
204+
205+
final Scheduler _scheduler = this;
206+
207+
if (executor instanceof ScheduledExecutorService) {
208+
// we are a ScheduledExecutorService so can do proper scheduling
209+
ScheduledFuture<?> f = ((ScheduledExecutorService) executor).schedule(new Runnable() {
210+
@Override
211+
public void run() {
212+
// when the delay has passed we now do the work on the actual scheduler
213+
discardableAction.call(_scheduler);
214+
}
215+
}, delayTime, unit);
216+
// replace the InnerExecutorScheduler child subscription with this one
217+
childSubscription.set(Subscriptions.from(f));
218+
} else {
219+
// we are not a ScheduledExecutorService so can't directly schedule
220+
if (delayTime == 0) {
221+
// no delay so put on the thread-pool right now
222+
return schedule(state, action);
223+
} else {
224+
// there is a delay and this isn't a ScheduledExecutorService so we'll use a system-wide ScheduledExecutorService
225+
// to handle the scheduling and once it's ready then execute on this Executor
226+
ScheduledFuture<?> f = GenericScheduledExecutorService.getInstance().schedule(new Runnable() {
227+
228+
@Override
229+
public void run() {
230+
// now execute on the real Executor (by using the other overload that schedules for immediate execution)
231+
_scheduler.schedule(state, action);
232+
}
233+
}, delayTime, unit);
234+
// replace the InnerExecutorScheduler child subscription with this one
235+
childSubscription.set(Subscriptions.from(f));
236+
}
237+
}
238+
return childSubscription;
239+
}
240+
241+
@Override
242+
public void unsubscribe() {
243+
childSubscription.unsubscribe();
244+
}
144245

145246
}
146247

rxjava-core/src/main/java/rx/schedulers/NewThreadScheduler.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,10 @@ public Thread newThread(Runnable r) {
6464

6565
@Override
6666
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
67+
if (childSubscription.isUnsubscribed()) {
68+
return childSubscription;
69+
}
70+
6771
CompositeSubscription s = new CompositeSubscription();
6872
final DiscardableAction<T> discardableAction = new DiscardableAction<T>(state, action);
6973
s.add(discardableAction);
@@ -92,6 +96,10 @@ public void run() {
9296

9397
@Override
9498
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action, final long delayTime, final TimeUnit unit) {
99+
if (childSubscription.isUnsubscribed()) {
100+
return childSubscription;
101+
}
102+
95103
// we will use the system scheduler since it doesn't make sense to launch a new Thread and then sleep
96104
// we will instead schedule the event then launch the thread after the delay has passed
97105
final Scheduler _scheduler = this;

rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerConcurrencyTests.java

Lines changed: 28 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,14 @@
33
import static org.junit.Assert.*;
44

55
import java.util.concurrent.CountDownLatch;
6+
import java.util.concurrent.TimeUnit;
67
import java.util.concurrent.atomic.AtomicInteger;
78

89
import org.junit.Test;
910

10-
import rx.Observable;
11-
import rx.Observable.OnSubscribeFunc;
12-
import rx.Observer;
1311
import rx.Scheduler;
1412
import rx.Subscription;
1513
import rx.operators.SafeObservableSubscription;
16-
import rx.subscriptions.BooleanSubscription;
1714
import rx.subscriptions.CompositeSubscription;
1815
import rx.subscriptions.Subscriptions;
1916
import rx.util.functions.Action0;
@@ -34,12 +31,11 @@ public void testUnsubscribeRecursiveScheduleWithStateAndFunc2() throws Interrupt
3431
final CountDownLatch latch = new CountDownLatch(1);
3532
final CountDownLatch unsubscribeLatch = new CountDownLatch(1);
3633
final AtomicInteger counter = new AtomicInteger();
37-
Subscription s = getScheduler().schedule(0L, new Func2<Scheduler, Long, Subscription>() {
34+
Subscription s = getScheduler().schedule(1L, new Func2<Scheduler, Long, Subscription>() {
3835

3936
@Override
4037
public Subscription call(Scheduler innerScheduler, Long i) {
41-
i++;
42-
// System.out.println("i: " + i);
38+
System.out.println("Run: " + i);
4339
if (i == 10) {
4440
latch.countDown();
4541
try {
@@ -54,7 +50,7 @@ public Subscription call(Scheduler innerScheduler, Long i) {
5450
}
5551

5652
counter.incrementAndGet();
57-
return innerScheduler.schedule(i, this);
53+
return innerScheduler.schedule(i + 1, this);
5854
}
5955
});
6056

@@ -66,62 +62,51 @@ public Subscription call(Scheduler innerScheduler, Long i) {
6662
}
6763

6864
@Test
69-
public void scheduleMultipleTasksOnOuterForSequentialExecution() throws InterruptedException {
70-
final AtomicInteger countExecutions = new AtomicInteger();
65+
public void testUnsubscribeRecursiveScheduleWithStateAndFunc2AndDelay() throws InterruptedException {
7166
final CountDownLatch latch = new CountDownLatch(1);
72-
final SafeObservableSubscription outerSubscription = new SafeObservableSubscription();
73-
final Func2<Scheduler, Long, Subscription> fInner = new Func2<Scheduler, Long, Subscription>() {
67+
final CountDownLatch unsubscribeLatch = new CountDownLatch(1);
68+
final AtomicInteger counter = new AtomicInteger();
69+
Subscription s = getScheduler().schedule(1L, new Func2<Scheduler, Long, Subscription>() {
7470

7571
@Override
7672
public Subscription call(Scheduler innerScheduler, Long i) {
77-
countExecutions.incrementAndGet();
78-
i++;
79-
System.out.println("i: " + i);
80-
if (i == 1000) {
81-
outerSubscription.unsubscribe();
82-
latch.countDown();
83-
}
84-
if (i < 10000) {
85-
return innerScheduler.schedule(i, this);
86-
} else {
73+
if (i == 10) {
8774
latch.countDown();
88-
return Subscriptions.empty();
75+
try {
76+
// wait for unsubscribe to finish so we are not racing it
77+
unsubscribeLatch.await();
78+
} catch (InterruptedException e) {
79+
// we expect the countDown if unsubscribe is not working
80+
// or to be interrupted if unsubscribe is successful since
81+
// the unsubscribe will interrupt it as it is calling Future.cancel(true)
82+
// so we will ignore the stacktrace
83+
}
8984
}
90-
}
91-
};
92-
93-
Func2<Scheduler, Long, Subscription> fOuter = new Func2<Scheduler, Long, Subscription>() {
9485

95-
@Override
96-
public Subscription call(Scheduler innerScheduler, Long i) {
97-
CompositeSubscription s = new CompositeSubscription();
98-
s.add(innerScheduler.schedule(i, fInner));
99-
s.add(innerScheduler.schedule(i, fInner));
100-
return s;
86+
counter.incrementAndGet();
87+
return innerScheduler.schedule(i + 1, this, 10, TimeUnit.MILLISECONDS);
10188
}
102-
};
89+
}, 10, TimeUnit.MILLISECONDS);
10390

104-
outerSubscription.wrap(getScheduler().schedule(0L, fOuter));
10591
latch.await();
92+
s.unsubscribe();
93+
unsubscribeLatch.countDown();
10694
Thread.sleep(200); // let time pass to see if the scheduler is still doing work
107-
System.out.println("Count: " + countExecutions.get());
108-
// we unsubscribe on first to 1000 so we hit 1999 instead of 2000
109-
assertEquals(1999, countExecutions.get());
95+
assertEquals(10, counter.get());
11096
}
11197

11298
@Test(timeout = 8000)
11399
public void recursionUsingFunc2() throws InterruptedException {
114100
final CountDownLatch latch = new CountDownLatch(1);
115-
getScheduler().schedule(0L, new Func2<Scheduler, Long, Subscription>() {
101+
getScheduler().schedule(1L, new Func2<Scheduler, Long, Subscription>() {
116102

117103
@Override
118104
public Subscription call(Scheduler innerScheduler, Long i) {
119-
i++;
120105
if (i % 100000 == 0) {
121106
System.out.println(i + " Total Memory: " + Runtime.getRuntime().totalMemory() + " Free: " + Runtime.getRuntime().freeMemory());
122107
}
123-
if (i < 5000000L) {
124-
return innerScheduler.schedule(i, this);
108+
if (i < 1000000L) {
109+
return innerScheduler.schedule(i + 1, this);
125110
} else {
126111
latch.countDown();
127112
return Subscriptions.empty();
@@ -145,7 +130,7 @@ public void call(Action0 self) {
145130
if (i % 100000 == 0) {
146131
System.out.println(i + " Total Memory: " + Runtime.getRuntime().totalMemory() + " Free: " + Runtime.getRuntime().freeMemory());
147132
}
148-
if (i < 5000000L) {
133+
if (i < 1000000L) {
149134
self.call();
150135
} else {
151136
latch.countDown();

rxjava-core/src/test/java/rx/schedulers/TestRecursionMemoryUsage.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,12 @@ public class TestRecursionMemoryUsage {
1919
public static void main(String args[]) {
2020
usingFunc2(Schedulers.newThread());
2121
usingAction0(Schedulers.newThread());
22-
//
22+
2323
// usingFunc2(Schedulers.currentThread());
2424
// usingAction0(Schedulers.currentThread());
2525

26-
// usingFunc2(Schedulers.threadPoolForComputation());
27-
// usingAction0(Schedulers.threadPoolForComputation());
26+
usingFunc2(Schedulers.threadPoolForComputation());
27+
usingAction0(Schedulers.threadPoolForComputation());
2828
}
2929

3030
protected static void usingFunc2(final Scheduler scheduler) {

0 commit comments

Comments
 (0)