@@ -48,8 +48,8 @@ public final void testUnhandledErrorIsDeliveredToThreadHandler() throws Interrup
4848 public final void testHandledErrorIsNotDeliveredToThreadHandler () throws InterruptedException {
4949 SchedulerTests .testHandledErrorIsNotDeliveredToThreadHandler (getScheduler ());
5050 }
51- @ Test ( timeout = 30000 )
52- public void testCancelledTaskRetention ( ) throws InterruptedException {
51+
52+ public static void testCancelledRetention ( Scheduler . Worker w , boolean periodic ) throws InterruptedException {
5353 System .out .println ("Wait before GC" );
5454 Thread .sleep (1000 );
5555
@@ -64,13 +64,32 @@ public void testCancelledTaskRetention() throws InterruptedException {
6464 long initial = memHeap .getUsed ();
6565
6666 System .out .printf ("Starting: %.3f MB%n" , initial / 1024.0 / 1024.0 );
67-
68- Scheduler .Worker w = Schedulers .io ().createWorker ();
69- for (int i = 0 ; i < 500000 ; i ++) {
70- if (i % 50000 == 0 ) {
71- System .out .println (" -> still scheduling: " + i );
67+
68+ int n = 500 * 1000 ;
69+ if (periodic ) {
70+ final CountDownLatch cdl = new CountDownLatch (n );
71+ final Action0 action = new Action0 () {
72+ @ Override
73+ public void call () {
74+ cdl .countDown ();
75+ }
76+ };
77+ for (int i = 0 ; i < n ; i ++) {
78+ if (i % 50000 == 0 ) {
79+ System .out .println (" -> still scheduling: " + i );
80+ }
81+ w .schedulePeriodically (action , 0 , 1 , TimeUnit .DAYS );
82+ }
83+
84+ System .out .println ("Waiting for the first round to finish..." );
85+ cdl .await ();
86+ } else {
87+ for (int i = 0 ; i < n ; i ++) {
88+ if (i % 50000 == 0 ) {
89+ System .out .println (" -> still scheduling: " + i );
90+ }
91+ w .schedule (Actions .empty (), 1 , TimeUnit .DAYS );
7292 }
73- w .schedule (Actions .empty (), 1 , TimeUnit .DAYS );
7493 }
7594
7695 memHeap = memoryMXBean .getHeapMemoryUsage ();
@@ -95,7 +114,30 @@ public void testCancelledTaskRetention() throws InterruptedException {
95114 fail (String .format ("Tasks retained: %.3f -> %.3f -> %.3f" , initial / 1024 / 1024.0 , after / 1024 / 1024.0 , finish / 1024 / 1024d ));
96115 }
97116 }
98-
117+
118+ @ Test (timeout = 30000 )
119+ public void testCancelledTaskRetention () throws InterruptedException {
120+ ExecutorService exec = Executors .newSingleThreadExecutor ();
121+ Scheduler s = Schedulers .from (exec );
122+ try {
123+ Scheduler .Worker w = s .createWorker ();
124+ try {
125+ testCancelledRetention (w , false );
126+ } finally {
127+ w .unsubscribe ();
128+ }
129+
130+ w = s .createWorker ();
131+ try {
132+ testCancelledRetention (w , true );
133+ } finally {
134+ w .unsubscribe ();
135+ }
136+ } finally {
137+ exec .shutdownNow ();
138+ }
139+ }
140+
99141 /** A simple executor which queues tasks and executes them one-by-one if executeOne() is called. */
100142 static final class TestExecutor implements Executor {
101143 final ConcurrentLinkedQueue <Runnable > queue = new ConcurrentLinkedQueue <Runnable >();
@@ -204,4 +246,33 @@ public void execute(Runnable command) {
204246
205247 assertFalse (w .tasks .hasSubscriptions ());
206248 }
249+
250+ @ Test
251+ public void testNoPeriodicTimedTaskPartRetention () throws InterruptedException {
252+ Executor e = new Executor () {
253+ @ Override
254+ public void execute (Runnable command ) {
255+ command .run ();
256+ }
257+ };
258+ ExecutorSchedulerWorker w = (ExecutorSchedulerWorker )Schedulers .from (e ).createWorker ();
259+
260+ final CountDownLatch cdl = new CountDownLatch (1 );
261+ final Action0 action = new Action0 () {
262+ @ Override
263+ public void call () {
264+ cdl .countDown ();
265+ }
266+ };
267+
268+ Subscription s = w .schedulePeriodically (action , 0 , 1 , TimeUnit .DAYS );
269+
270+ assertTrue (w .tasks .hasSubscriptions ());
271+
272+ cdl .await ();
273+
274+ s .unsubscribe ();
275+
276+ assertFalse (w .tasks .hasSubscriptions ());
277+ }
207278}
0 commit comments