Skip to content

Commit b7b8223

Browse files
committed
chore: Improve task cancellation.
1 parent 80117ab commit b7b8223

File tree

2 files changed

+6
-14
lines changed

2 files changed

+6
-14
lines changed

src/main/java/io/github/nstdio/http/ext/ByteArraySubscription.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,15 @@
1919
import java.nio.ByteBuffer;
2020
import java.util.List;
2121
import java.util.concurrent.Executor;
22-
import java.util.concurrent.ExecutorService;
2322
import java.util.concurrent.Flow.Subscriber;
2423
import java.util.concurrent.Flow.Subscription;
2524
import java.util.concurrent.Future;
2625
import java.util.concurrent.atomic.AtomicBoolean;
2726
import java.util.function.Function;
2827
import java.util.function.Supplier;
2928

29+
import static java.util.concurrent.CompletableFuture.runAsync;
30+
3031
class ByteArraySubscription<T> implements Subscription {
3132
private final Subscriber<T> subscriber;
3233
private final Executor executor;
@@ -35,7 +36,7 @@ class ByteArraySubscription<T> implements Subscription {
3536
private final Supplier<byte[]> bytes;
3637

3738
private final AtomicBoolean completed = new AtomicBoolean(false);
38-
private Future<?> result;
39+
Future<?> result;
3940

4041
ByteArraySubscription(Subscriber<T> subscriber, Executor executor, Supplier<byte[]> bytes, Function<byte[], T> mapper) {
4142
this.subscriber = subscriber;
@@ -83,11 +84,7 @@ public void cancel() {
8384
}
8485

8586
private void submit(Runnable r) {
86-
if (executor instanceof ExecutorService) {
87-
result = ((ExecutorService) executor).submit(r);
88-
} else {
89-
executor.execute(r);
90-
}
87+
result = runAsync(r, executor);
9188
}
9289

9390
private enum DirectExecutor implements Executor {

src/test/kotlin/io/github/nstdio/http/ext/ByteArraySubscriptionTest.kt

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package io.github.nstdio.http.ext
1717

1818
import io.kotest.assertions.timing.eventually
19+
import io.kotest.matchers.shouldBe
1920
import io.kotest.property.Arb
2021
import io.kotest.property.arbitrary.next
2122
import io.kotest.property.arbitrary.string
@@ -28,7 +29,6 @@ import org.junit.jupiter.api.Test
2829
import org.junit.jupiter.api.extension.ExtendWith
2930
import org.mockito.ArgumentMatchers
3031
import org.mockito.ArgumentMatchers.any
31-
import org.mockito.BDDMockito.given
3232
import org.mockito.Mock
3333
import org.mockito.Mockito.inOrder
3434
import org.mockito.Mockito.mock
@@ -42,7 +42,6 @@ import java.nio.ByteBuffer
4242
import java.nio.charset.StandardCharsets
4343
import java.util.concurrent.ExecutorService
4444
import java.util.concurrent.ForkJoinPool
45-
import java.util.concurrent.Future
4645
import kotlin.time.Duration.Companion.seconds
4746

4847
@ExtendWith(MockitoExtension::class)
@@ -142,10 +141,6 @@ internal class ByteArraySubscriptionTest {
142141
fun `Should cancel result`() {
143142
//given
144143
val mockExecutor = mock(ExecutorService::class.java)
145-
val mockFuture = mock(Future::class.java)
146-
147-
given(mockExecutor.submit(any(Runnable::class.java)))
148-
.willReturn(mockFuture)
149144

150145
val bytes = Arb.byteArray(8).next()
151146
val item = listOf(bytes.toBuffer())
@@ -156,7 +151,7 @@ internal class ByteArraySubscriptionTest {
156151
subscription.cancel()
157152

158153
//then
159-
verify(mockFuture).cancel(false)
154+
subscription.result.isCancelled shouldBe true
160155
}
161156

162157
private fun runAsyncAwait(block: suspend CoroutineScope.() -> Unit) {

0 commit comments

Comments
 (0)