Skip to content

Commit 1f8389f

Browse files
committed
Cleanup the worker.
1 parent 8d6a186 commit 1f8389f

File tree

1 file changed

+33
-30
lines changed

1 file changed

+33
-30
lines changed

src/test/java/rx/schedulers/NewThreadSchedulerTest.java

Lines changed: 33 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -48,36 +48,39 @@ public final void testHandledErrorIsNotDeliveredToThreadHandler() throws Interru
4848
@Test(timeout = 3000)
4949
public void testNoSelfInterrupt() throws InterruptedException {
5050
Scheduler.Worker worker = Schedulers.newThread().createWorker();
51-
52-
final CountDownLatch run = new CountDownLatch(1);
53-
final CountDownLatch done = new CountDownLatch(1);
54-
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
55-
final AtomicBoolean interruptFlag = new AtomicBoolean();
56-
57-
ScheduledAction sa = (ScheduledAction)worker.schedule(new Action0() {
58-
@Override
59-
public void call() {
60-
try {
61-
run.await();
62-
} catch (InterruptedException ex) {
63-
exception.set(ex);
51+
try {
52+
final CountDownLatch run = new CountDownLatch(1);
53+
final CountDownLatch done = new CountDownLatch(1);
54+
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
55+
final AtomicBoolean interruptFlag = new AtomicBoolean();
56+
57+
ScheduledAction sa = (ScheduledAction)worker.schedule(new Action0() {
58+
@Override
59+
public void call() {
60+
try {
61+
run.await();
62+
} catch (InterruptedException ex) {
63+
exception.set(ex);
64+
}
6465
}
65-
}
66-
});
67-
68-
sa.add(Subscriptions.create(new Action0() {
69-
@Override
70-
public void call() {
71-
interruptFlag.set(Thread.currentThread().isInterrupted());
72-
done.countDown();
73-
}
74-
}));
75-
76-
run.countDown();
77-
78-
done.await();
79-
80-
Assert.assertEquals(null, exception.get());
81-
Assert.assertFalse("Interrupted?!", interruptFlag.get());
66+
});
67+
68+
sa.add(Subscriptions.create(new Action0() {
69+
@Override
70+
public void call() {
71+
interruptFlag.set(Thread.currentThread().isInterrupted());
72+
done.countDown();
73+
}
74+
}));
75+
76+
run.countDown();
77+
78+
done.await();
79+
80+
Assert.assertEquals(null, exception.get());
81+
Assert.assertFalse("Interrupted?!", interruptFlag.get());
82+
} finally {
83+
worker.unsubscribe();
84+
}
8285
}
8386
}

0 commit comments

Comments
 (0)