Skip to content

Commit 3d9fb68

Browse files
committed
Support cancellation in NIO extensions and provide single-threaded echo-example
1 parent 834af46 commit 3d9fb68

File tree

3 files changed

+183
-30
lines changed

3 files changed

+183
-30
lines changed
Lines changed: 120 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,72 +1,162 @@
11
package kotlinx.coroutines.experimental.nio
22

3+
import kotlinx.coroutines.experimental.CancellableContinuation
4+
import kotlinx.coroutines.experimental.CancellationException
5+
import kotlinx.coroutines.experimental.Job
6+
import kotlinx.coroutines.experimental.suspendCancellableCoroutine
37
import java.net.SocketAddress
48
import java.nio.ByteBuffer
5-
import java.nio.channels.AsynchronousFileChannel
6-
import java.nio.channels.AsynchronousServerSocketChannel
7-
import java.nio.channels.AsynchronousSocketChannel
8-
import java.nio.channels.CompletionHandler
9+
import java.nio.channels.*
910
import java.util.concurrent.TimeUnit
10-
import kotlin.coroutines.Continuation
11-
import kotlin.coroutines.suspendCoroutine
1211

12+
/**
13+
* Performs [AsynchronousFileChannel.lock] without blocking a thread and resumes when asynchronous operation completes.
14+
* This suspending function is cancellable.
15+
* If the [Job] of the current coroutine is completed while this suspending function is suspended, this function
16+
* *closes the underlying channel* and immediately resumes with [CancellationException].
17+
*/
18+
suspend fun AsynchronousFileChannel.aLock() = suspendCancellableCoroutine<FileLock> { cont ->
19+
lock(cont, asyncIOHandler())
20+
closeOnCancel(cont)
21+
}
22+
23+
/**
24+
* Performs [AsynchronousFileChannel.lock] without blocking a thread and resumes when asynchronous operation completes.
25+
* This suspending function is cancellable.
26+
* If the [Job] of the current coroutine is completed while this suspending function is suspended, this function
27+
* *closes the underlying channel* and immediately resumes with [CancellationException].
28+
*/
29+
suspend fun AsynchronousFileChannel.aLock(
30+
position: Long,
31+
size: Long,
32+
shared: Boolean
33+
) = suspendCancellableCoroutine<FileLock> { cont ->
34+
lock(position, size, shared, cont, asyncIOHandler())
35+
closeOnCancel(cont)
36+
}
37+
38+
/**
39+
* Performs [AsynchronousFileChannel.read] without blocking a thread and resumes when asynchronous operation completes.
40+
* This suspending function is cancellable.
41+
* If the [Job] of the current coroutine is completed while this suspending function is suspended, this function
42+
* *closes the underlying channel* and immediately resumes with [CancellationException].
43+
*/
1344
suspend fun AsynchronousFileChannel.aRead(
1445
buf: ByteBuffer,
1546
position: Long
16-
) = suspendCoroutine<Int> { c ->
17-
this.read(buf, position, null, AsyncIOHandler(c))
47+
) = suspendCancellableCoroutine<Int> { cont ->
48+
read(buf, position, cont, asyncIOHandler())
49+
closeOnCancel(cont)
1850
}
1951

52+
/**
53+
* Performs [AsynchronousFileChannel.write] without blocking a thread and resumes when asynchronous operation completes.
54+
* This suspending function is cancellable.
55+
* If the [Job] of the current coroutine is completed while this suspending function is suspended, this function
56+
* *closes the underlying channel* and immediately resumes with [CancellationException].
57+
*/
2058
suspend fun AsynchronousFileChannel.aWrite(
2159
buf: ByteBuffer,
2260
position: Long
23-
) = suspendCoroutine<Int> { c ->
24-
this.write(buf, position, null, AsyncIOHandler(c))
61+
) = suspendCancellableCoroutine<Int> { cont ->
62+
write(buf, position, cont, asyncIOHandler())
63+
closeOnCancel(cont)
2564
}
2665

27-
suspend fun AsynchronousServerSocketChannel.aAccept() =
28-
suspendCoroutine<AsynchronousSocketChannel> { c ->
29-
this.accept(null, AsyncIOHandler(c))
30-
}
66+
/**
67+
* Performs [AsynchronousServerSocketChannel.accept] without blocking a thread and resumes when asynchronous operation completes.
68+
* This suspending function is cancellable.
69+
* If the [Job] of the current coroutine is completed while this suspending function is suspended, this function
70+
* *closes the underlying channel* and immediately resumes with [CancellationException].
71+
*/
72+
suspend fun AsynchronousServerSocketChannel.aAccept() = suspendCancellableCoroutine<AsynchronousSocketChannel> { cont ->
73+
accept(cont, asyncIOHandler())
74+
closeOnCancel(cont)
75+
}
3176

77+
/**
78+
* Performs [AsynchronousServerSocketChannel.connect] without blocking a thread and resumes when asynchronous operation completes.
79+
* This suspending function is cancellable.
80+
* If the [Job] of the current coroutine is completed while this suspending function is suspended, this function
81+
* *closes the underlying channel* and immediately resumes with [CancellationException].
82+
*/
3283
suspend fun AsynchronousSocketChannel.aConnect(
3384
socketAddress: SocketAddress
34-
) = suspendCoroutine<Unit> { c ->
35-
this.connect(socketAddress, null, AsyncVoidIOHandler(c))
85+
) = suspendCancellableCoroutine<Unit> { cont ->
86+
connect(socketAddress, cont, AsyncVoidIOHandler)
87+
closeOnCancel(cont)
3688
}
3789

90+
/**
91+
* Performs [AsynchronousServerSocketChannel.read] without blocking a thread and resumes when asynchronous operation completes.
92+
* This suspending function is cancellable.
93+
* If the [Job] of the current coroutine is completed while this suspending function is suspended, this function
94+
* *closes the underlying channel* and immediately resumes with [CancellationException].
95+
*/
3896
suspend fun AsynchronousSocketChannel.aRead(
3997
buf: ByteBuffer,
4098
timeout: Long = 0L,
4199
timeUnit: TimeUnit = TimeUnit.MILLISECONDS
42-
) = suspendCoroutine<Int> { c ->
43-
this.read(buf, timeout, timeUnit, null, AsyncIOHandler(c))
100+
) = suspendCancellableCoroutine<Int> { cont ->
101+
read(buf, timeout, timeUnit, cont, asyncIOHandler())
102+
closeOnCancel(cont)
44103
}
45104

105+
/**
106+
* Performs [AsynchronousServerSocketChannel.write] without blocking a thread and resumes when asynchronous operation completes.
107+
* This suspending function is cancellable.
108+
* If the [Job] of the current coroutine is completed while this suspending function is suspended, this function
109+
* *closes the underlying channel* and immediately resumes with [CancellationException].
110+
*/
46111
suspend fun AsynchronousSocketChannel.aWrite(
47112
buf: ByteBuffer,
48113
timeout: Long = 0L,
49114
timeUnit: TimeUnit = TimeUnit.MILLISECONDS
50-
) = suspendCoroutine<Int> { c ->
51-
this.write(buf, timeout, timeUnit, null, AsyncIOHandler(c))
115+
) = suspendCancellableCoroutine<Int> { cont ->
116+
write(buf, timeout, timeUnit, cont, asyncIOHandler())
117+
closeOnCancel(cont)
52118
}
53119

54-
private class AsyncIOHandler<E>(val c: Continuation<E>) : CompletionHandler<E, Nothing?> {
55-
override fun completed(result: E, attachment: Nothing?) {
56-
c.resume(result)
120+
// ---------------- private details ----------------
121+
122+
private fun Channel.closeOnCancel(cont: CancellableContinuation<*>) {
123+
cont.onCompletion {
124+
if (cont.isCancelled)
125+
try {
126+
close()
127+
} catch (ex: Throwable) {
128+
// Specification says that it is Ok to call it any time, but reality is different,
129+
// so we have just to ignore exception
130+
}
131+
}
132+
}
133+
134+
@Suppress("UNCHECKED_CAST")
135+
private fun <T> asyncIOHandler(): CompletionHandler<T, CancellableContinuation<T>> =
136+
AsyncIOHandlerAny as CompletionHandler<T, CancellableContinuation<T>>
137+
138+
private object AsyncIOHandlerAny : CompletionHandler<Any, CancellableContinuation<Any>> {
139+
override fun completed(result: Any, cont: CancellableContinuation<Any>) {
140+
cont.resume(result)
57141
}
58142

59-
override fun failed(exc: Throwable, attachment: Nothing?) {
60-
c.resumeWithException(exc)
143+
override fun failed(ex: Throwable, cont: CancellableContinuation<Any>) {
144+
// just return if already cancelled and got an expected exception for that case
145+
if (ex is AsynchronousCloseException && cont.isCancelled) return
146+
cont.resumeWithException(ex)
61147
}
62148
}
63149

64-
private class AsyncVoidIOHandler(val c: Continuation<Unit>) : CompletionHandler<Void?, Nothing?> {
65-
override fun completed(result: Void?, attachment: Nothing?) {
66-
c.resume(Unit)
150+
private object AsyncVoidIOHandler : CompletionHandler<Void?, CancellableContinuation<Unit>> {
151+
override fun completed(result: Void?, cont: CancellableContinuation<Unit>) {
152+
cont.resume(Unit)
67153
}
68154

69-
override fun failed(exc: Throwable, attachment: Nothing?) {
70-
c.resumeWithException(exc)
155+
override fun failed(ex: Throwable, cont: CancellableContinuation<Unit>) {
156+
// just return if already cancelled and got an expected exception for that case
157+
if (ex is AsynchronousCloseException && cont.isCancelled) return
158+
cont.resumeWithException(ex)
71159
}
72160
}
161+
162+
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package examples
2+
3+
import kotlinx.coroutines.experimental.launch
4+
import kotlinx.coroutines.experimental.nio.aAccept
5+
import kotlinx.coroutines.experimental.nio.aRead
6+
import kotlinx.coroutines.experimental.nio.aWrite
7+
import kotlinx.coroutines.experimental.runBlocking
8+
import kotlinx.coroutines.experimental.withTimeout
9+
import java.net.InetSocketAddress
10+
import java.nio.ByteBuffer
11+
import java.nio.channels.AsynchronousServerSocketChannel
12+
import java.nio.channels.AsynchronousSocketChannel
13+
14+
val PORT = 12345
15+
val CLIENT_READ_TIMEOUT = 5000L // 5 sec
16+
val CLIENT_WRITE_TIMEOUT = 1000L // 1 sec
17+
val BUFFER_SIZE = 1024
18+
19+
fun main(args: Array<String>) = runBlocking {
20+
val serverChannel =
21+
AsynchronousServerSocketChannel
22+
.open()
23+
.bind(InetSocketAddress(PORT))
24+
log("Listening on port $PORT")
25+
// loop and accept connections forever
26+
while (true) {
27+
val client = serverChannel.aAccept()
28+
val address = try {
29+
val ia = client.remoteAddress as InetSocketAddress
30+
"${ia.address.hostAddress}:${ia.port}"
31+
} catch (ex: Throwable) {
32+
log("Accepted client connection but failed to get its address because of $ex")
33+
continue /* accept next connection */
34+
}
35+
log("Accepted client connection from $address")
36+
// just start a new coroutine for each client connection
37+
launch(context) {
38+
try {
39+
handleClient(client)
40+
log("Client connection from $address has terminated normally")
41+
} catch (ex: Throwable) {
42+
log("Client connection from $address has terminated because of $ex")
43+
}
44+
}
45+
}
46+
}
47+
48+
private suspend fun handleClient(client: AsynchronousSocketChannel) {
49+
val buffer = ByteBuffer.allocate(BUFFER_SIZE)
50+
while (true) {
51+
val bytes = withTimeout(CLIENT_READ_TIMEOUT) { client.aRead(buffer) }
52+
if (bytes < 0) break
53+
buffer.flip()
54+
withTimeout(CLIENT_WRITE_TIMEOUT) { client.aWrite(buffer) }
55+
buffer.clear()
56+
}
57+
}
58+
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package examples
2+
3+
import java.time.Instant
4+
5+
fun log(msg: String) = println("${Instant.now()} [${Thread.currentThread().name}] $msg")

0 commit comments

Comments
 (0)