Skip to content

Commit 18def88

Browse files
committed
Merge pull request #2465 from akarnokd/SchedulerPurgeForJDK6
ScheduledExecutorService: call purge periodically on JDK 6 to avoid
2 parents 82f5d06 + b5aaf82 commit 18def88

File tree

3 files changed

+157
-66
lines changed

3 files changed

+157
-66
lines changed

src/main/java/rx/internal/schedulers/NewThreadWorker.java

Lines changed: 105 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,14 @@
1616
package rx.internal.schedulers;
1717

1818
import java.lang.reflect.Method;
19+
import java.util.Iterator;
1920
import java.util.concurrent.*;
21+
import java.util.concurrent.atomic.AtomicReference;
2022

2123
import rx.*;
24+
import rx.exceptions.Exceptions;
2225
import rx.functions.Action0;
26+
import rx.internal.util.RxThreadFactory;
2327
import rx.plugins.*;
2428
import rx.subscriptions.Subscriptions;
2529

@@ -30,24 +34,111 @@ public class NewThreadWorker extends Scheduler.Worker implements Subscription {
3034
private final ScheduledExecutorService executor;
3135
private final RxJavaSchedulersHook schedulersHook;
3236
volatile boolean isUnsubscribed;
33-
37+
/** The purge frequency in milliseconds. */
38+
private static final String FREQUENCY_KEY = "io.reactivex.rxjava.scheduler.jdk6.purge-frequency-millis";
39+
/** Force the use of purge (true/false). */
40+
private static final String PURGE_FORCE_KEY = "io.reactivex.rxjava.scheduler.jdk6.purge-force";
41+
private static final String PURGE_THREAD_PREFIX = "RxSchedulerPurge-";
42+
/** Forces the use of purge even if setRemoveOnCancelPolicy is available. */
43+
private static final boolean PURGE_FORCE;
44+
/** The purge frequency in milliseconds. */
45+
public static final int PURGE_FREQUENCY;
46+
private static final ConcurrentHashMap<ScheduledThreadPoolExecutor, ScheduledThreadPoolExecutor> EXECUTORS;
47+
private static final AtomicReference<ScheduledExecutorService> PURGE;
48+
static {
49+
EXECUTORS = new ConcurrentHashMap<ScheduledThreadPoolExecutor, ScheduledThreadPoolExecutor>();
50+
PURGE = new AtomicReference<ScheduledExecutorService>();
51+
PURGE_FORCE = Boolean.getBoolean(PURGE_FORCE_KEY);
52+
PURGE_FREQUENCY = Integer.getInteger(FREQUENCY_KEY, 1000);
53+
}
54+
/**
55+
* Registers the given executor service and starts the purge thread if not already started.
56+
* <p>{@code public} visibility reason: called from other package(s) within RxJava
57+
* @param service a scheduled thread pool executor instance
58+
*/
59+
public static void registerExecutor(ScheduledThreadPoolExecutor service) {
60+
do {
61+
ScheduledExecutorService exec = PURGE.get();
62+
if (exec != null) {
63+
break;
64+
}
65+
exec = Executors.newScheduledThreadPool(1, new RxThreadFactory(PURGE_THREAD_PREFIX));
66+
if (PURGE.compareAndSet(null, exec)) {
67+
exec.scheduleAtFixedRate(new Runnable() {
68+
@Override
69+
public void run() {
70+
purgeExecutors();
71+
}
72+
}, PURGE_FREQUENCY, PURGE_FREQUENCY, TimeUnit.MILLISECONDS);
73+
74+
break;
75+
}
76+
} while (true);
77+
78+
EXECUTORS.putIfAbsent(service, service);
79+
}
80+
/**
81+
* Deregisters the executor service.
82+
* <p>{@code public} visibility reason: called from other package(s) within RxJava
83+
* @param service a scheduled thread pool executor instance
84+
*/
85+
public static void deregisterExecutor(ScheduledExecutorService service) {
86+
EXECUTORS.remove(service);
87+
}
88+
/** Purges each registered executor and eagerly evicts shutdown executors. */
89+
static void purgeExecutors() {
90+
try {
91+
Iterator<ScheduledThreadPoolExecutor> it = EXECUTORS.keySet().iterator();
92+
while (it.hasNext()) {
93+
ScheduledThreadPoolExecutor exec = it.next();
94+
if (!exec.isShutdown()) {
95+
exec.purge();
96+
} else {
97+
it.remove();
98+
}
99+
}
100+
} catch (Throwable t) {
101+
Exceptions.throwIfFatal(t);
102+
RxJavaPlugins.getInstance().getErrorHandler().handleError(t);
103+
}
104+
}
105+
106+
/**
107+
* Tries to enable the Java 7+ setRemoveOnCancelPolicy.
108+
* <p>{@code public} visibility reason: called from other package(s) within RxJava.
109+
* If the method returns false, the {@link #registerExecutor(ScheduledThreadPoolExecutor)} may
110+
* be called to enable the backup option of purging the executors.
111+
* @param exec the executor to call setRemoveOnCaneclPolicy if available.
112+
* @return true if the policy was successfully enabled
113+
*/
114+
public static boolean tryEnableCancelPolicy(ScheduledExecutorService exec) {
115+
if (!PURGE_FORCE) {
116+
for (Method m : exec.getClass().getMethods()) {
117+
if (m.getName().equals("setRemoveOnCancelPolicy")
118+
&& m.getParameterTypes().length == 1
119+
&& m.getParameterTypes()[0] == Boolean.TYPE) {
120+
try {
121+
m.invoke(exec, true);
122+
return true;
123+
} catch (Exception ex) {
124+
RxJavaPlugins.getInstance().getErrorHandler().handleError(ex);
125+
}
126+
}
127+
}
128+
}
129+
return false;
130+
}
131+
34132
/* package */
35133
public NewThreadWorker(ThreadFactory threadFactory) {
36-
executor = Executors.newScheduledThreadPool(1, threadFactory);
134+
ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory);
37135
// Java 7+: cancelled future tasks can be removed from the executor thus avoiding memory leak
38-
for (Method m : executor.getClass().getMethods()) {
39-
if (m.getName().equals("setRemoveOnCancelPolicy")
40-
&& m.getParameterTypes().length == 1
41-
&& m.getParameterTypes()[0] == Boolean.TYPE) {
42-
try {
43-
m.invoke(executor, true);
44-
} catch (Exception ex) {
45-
RxJavaPlugins.getInstance().getErrorHandler().handleError(ex);
46-
}
47-
break;
48-
}
136+
boolean cancelSupported = tryEnableCancelPolicy(exec);
137+
if (!cancelSupported && exec instanceof ScheduledThreadPoolExecutor) {
138+
registerExecutor((ScheduledThreadPoolExecutor)exec);
49139
}
50140
schedulersHook = RxJavaPlugins.getInstance().getSchedulersHook();
141+
executor = exec;
51142
}
52143

53144
@Override
@@ -88,6 +179,7 @@ public ScheduledAction scheduleActual(final Action0 action, long delayTime, Time
88179
public void unsubscribe() {
89180
isUnsubscribed = true;
90181
executor.shutdownNow();
182+
deregisterExecutor(executor);
91183
}
92184

93185
@Override

src/main/java/rx/schedulers/GenericScheduledExecutorService.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,10 @@
1616
package rx.schedulers;
1717

1818
import rx.Scheduler;
19+
import rx.internal.schedulers.NewThreadWorker;
1920
import rx.internal.util.RxThreadFactory;
2021

21-
import java.util.concurrent.Executor;
22-
import java.util.concurrent.ExecutorService;
23-
import java.util.concurrent.Executors;
24-
import java.util.concurrent.ScheduledExecutorService;
22+
import java.util.concurrent.*;
2523

2624
/**
2725
* A default {@link ScheduledExecutorService} that can be used for scheduling actions when a {@link Scheduler} implementation doesn't have that ability.
@@ -49,7 +47,13 @@ private GenericScheduledExecutorService() {
4947
if (count > 8) {
5048
count = 8;
5149
}
52-
executor = Executors.newScheduledThreadPool(count, THREAD_FACTORY);
50+
ScheduledExecutorService exec = Executors.newScheduledThreadPool(count, THREAD_FACTORY);
51+
if (!NewThreadWorker.tryEnableCancelPolicy(exec)) {
52+
if (exec instanceof ScheduledThreadPoolExecutor) {
53+
NewThreadWorker.registerExecutor((ScheduledThreadPoolExecutor)exec);
54+
}
55+
}
56+
executor = exec;
5357
}
5458

5559
/**

src/test/java/rx/schedulers/CachedThreadSchedulerTest.java

Lines changed: 43 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import rx.Observable;
2727
import rx.Scheduler;
2828
import rx.functions.*;
29+
import rx.internal.schedulers.NewThreadWorker;
2930
import static org.junit.Assert.assertTrue;
3031

3132
public class CachedThreadSchedulerTest extends AbstractSchedulerConcurrencyTests {
@@ -73,55 +74,49 @@ public final void testHandledErrorIsNotDeliveredToThreadHandler() throws Interru
7374

7475
@Test(timeout = 30000)
7576
public void testCancelledTaskRetention() throws InterruptedException {
76-
try {
77-
ScheduledThreadPoolExecutor.class.getMethod("setRemoveOnCancelPolicy", Boolean.TYPE);
78-
79-
System.out.println("Wait before GC");
80-
Thread.sleep(1000);
81-
82-
System.out.println("GC");
83-
System.gc();
84-
85-
Thread.sleep(1000);
86-
87-
88-
MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
89-
MemoryUsage memHeap = memoryMXBean.getHeapMemoryUsage();
90-
long initial = memHeap.getUsed();
91-
92-
System.out.printf("Starting: %.3f MB%n", initial / 1024.0 / 1024.0);
93-
94-
Scheduler.Worker w = Schedulers.io().createWorker();
95-
for (int i = 0; i < 750000; i++) {
96-
if (i % 50000 == 0) {
97-
System.out.println(" -> still scheduling: " + i);
98-
}
99-
w.schedule(Actions.empty(), 1, TimeUnit.DAYS);
77+
System.out.println("Wait before GC");
78+
Thread.sleep(1000);
79+
80+
System.out.println("GC");
81+
System.gc();
82+
83+
Thread.sleep(1000);
84+
85+
86+
MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
87+
MemoryUsage memHeap = memoryMXBean.getHeapMemoryUsage();
88+
long initial = memHeap.getUsed();
89+
90+
System.out.printf("Starting: %.3f MB%n", initial / 1024.0 / 1024.0);
91+
92+
Scheduler.Worker w = Schedulers.io().createWorker();
93+
for (int i = 0; i < 750000; i++) {
94+
if (i % 50000 == 0) {
95+
System.out.println(" -> still scheduling: " + i);
10096
}
101-
102-
memHeap = memoryMXBean.getHeapMemoryUsage();
103-
long after = memHeap.getUsed();
104-
System.out.printf("Peak: %.3f MB%n", after / 1024.0 / 1024.0);
105-
106-
w.unsubscribe();
107-
108-
System.out.println("Wait before second GC");
109-
Thread.sleep(1000);
110-
111-
System.out.println("Second GC");
112-
System.gc();
113-
114-
Thread.sleep(1000);
115-
116-
memHeap = memoryMXBean.getHeapMemoryUsage();
117-
long finish = memHeap.getUsed();
118-
System.out.printf("After: %.3f MB%n", finish / 1024.0 / 1024.0);
119-
120-
if (finish > initial * 5) {
121-
Assert.fail(String.format("Tasks retained: %.3f -> %.3f -> %.3f", initial / 1024 / 1024.0, after / 1024 / 1024.0, finish / 1024 / 1024d));
122-
}
123-
} catch (NoSuchMethodException ex) {
124-
// not supported, no reason to test for it
97+
w.schedule(Actions.empty(), 1, TimeUnit.DAYS);
98+
}
99+
100+
memHeap = memoryMXBean.getHeapMemoryUsage();
101+
long after = memHeap.getUsed();
102+
System.out.printf("Peak: %.3f MB%n", after / 1024.0 / 1024.0);
103+
104+
w.unsubscribe();
105+
106+
System.out.println("Wait before second GC");
107+
Thread.sleep(NewThreadWorker.PURGE_FREQUENCY + 2000);
108+
109+
System.out.println("Second GC");
110+
System.gc();
111+
112+
Thread.sleep(1000);
113+
114+
memHeap = memoryMXBean.getHeapMemoryUsage();
115+
long finish = memHeap.getUsed();
116+
System.out.printf("After: %.3f MB%n", finish / 1024.0 / 1024.0);
117+
118+
if (finish > initial * 5) {
119+
Assert.fail(String.format("Tasks retained: %.3f -> %.3f -> %.3f", initial / 1024 / 1024.0, after / 1024 / 1024.0, finish / 1024 / 1024d));
125120
}
126121
}
127122

0 commit comments

Comments
 (0)