1616
1717package rx .schedulers ;
1818
19+ import java .lang .management .*;
20+ import java .util .concurrent .*;
21+
22+ import junit .framework .Assert ;
23+
1924import org .junit .Test ;
25+
2026import rx .Observable ;
2127import rx .Scheduler ;
22- import rx .functions .Action1 ;
23- import rx .functions .Func1 ;
24-
28+ import rx .functions .*;
2529import static org .junit .Assert .assertTrue ;
2630
2731public class CachedThreadSchedulerTest extends AbstractSchedulerConcurrencyTests {
@@ -66,4 +70,59 @@ public final void testUnhandledErrorIsDeliveredToThreadHandler() throws Interrup
6670 public final void testHandledErrorIsNotDeliveredToThreadHandler () throws InterruptedException {
6771 SchedulerTests .testHandledErrorIsNotDeliveredToThreadHandler (getScheduler ());
6872 }
73+
74+ @ Test (timeout = 30000 )
75+ public void testCancelledTaskRetention () throws InterruptedException {
76+ try {
77+ ScheduledThreadPoolExecutor .class .getMethod ("setRemoveOnCancelPolicy" , Boolean .TYPE );
78+
79+ System .out .println ("Wait before GC" );
80+ Thread .sleep (1000 );
81+
82+ System .out .println ("GC" );
83+ System .gc ();
84+
85+ Thread .sleep (1000 );
86+
87+
88+ MemoryMXBean memoryMXBean = ManagementFactory .getMemoryMXBean ();
89+ MemoryUsage memHeap = memoryMXBean .getHeapMemoryUsage ();
90+ long initial = memHeap .getUsed ();
91+
92+ System .out .printf ("Starting: %.3f MB%n" , initial / 1024.0 / 1024.0 );
93+
94+ Scheduler .Worker w = Schedulers .io ().createWorker ();
95+ for (int i = 0 ; i < 750000 ; i ++) {
96+ if (i % 50000 == 0 ) {
97+ System .out .println (" -> still scheduling: " + i );
98+ }
99+ w .schedule (Actions .empty (), 1 , TimeUnit .DAYS );
100+ }
101+
102+ memHeap = memoryMXBean .getHeapMemoryUsage ();
103+ long after = memHeap .getUsed ();
104+ System .out .printf ("Peak: %.3f MB%n" , after / 1024.0 / 1024.0 );
105+
106+ w .unsubscribe ();
107+
108+ System .out .println ("Wait before second GC" );
109+ Thread .sleep (1000 );
110+
111+ System .out .println ("Second GC" );
112+ System .gc ();
113+
114+ Thread .sleep (1000 );
115+
116+ memHeap = memoryMXBean .getHeapMemoryUsage ();
117+ long finish = memHeap .getUsed ();
118+ System .out .printf ("After: %.3f MB%n" , finish / 1024.0 / 1024.0 );
119+
120+ if (finish > initial * 5 ) {
121+ Assert .fail (String .format ("Tasks retained: %.3f -> %.3f -> %.3f" , initial / 1024 / 1024.0 , after / 1024 / 1024.0 , finish / 1024 / 1024d ));
122+ }
123+ } catch (NoSuchMethodException ex ) {
124+ // not supported, no reason to test for it
125+ }
126+ }
127+
69128}
0 commit comments