Skip to content

Commit c12f753

Browse files
committed
Drop isClosed
1 parent 4752325 commit c12f753

File tree

21 files changed

+20
-51
lines changed

21 files changed

+20
-51
lines changed

rsocket-core/api/rsocket-core.api

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -763,14 +763,12 @@ public abstract interface class io/rsocket/kotlin/transport/RSocketMultiplexedCo
763763
}
764764

765765
public abstract interface class io/rsocket/kotlin/transport/RSocketMultiplexedConnection$Stream : kotlinx/coroutines/CoroutineScope {
766-
public abstract fun isClosedForSend ()Z
767766
public abstract fun receiveFrame (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
768767
public abstract fun sendFrame (Lkotlinx/io/Buffer;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
769768
public abstract fun setSendPriority (I)V
770769
}
771770

772771
public abstract interface class io/rsocket/kotlin/transport/RSocketSequentialConnection : io/rsocket/kotlin/transport/RSocketConnection {
773-
public abstract fun isClosedForSend ()Z
774772
public abstract fun receiveFrame (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
775773
public abstract fun sendFrame (ILkotlinx/io/Buffer;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
776774
}

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/connection/ConnectionEstablishmentHandler.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ internal abstract class ConnectionEstablishmentHandler(
8989
}
9090
} catch (cause: Throwable) {
9191
connection.close()
92-
withContext(NonCancellable) {
92+
nonCancellable {
9393
connection.sendError(
9494
when (cause) {
9595
is RSocketError -> cause

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/connection/LoggingConnection.kt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ private class SequentialLoggingConnection(
4040
private val delegate: RSocketSequentialConnection,
4141
private val logger: Logger,
4242
) : RSocketSequentialConnection {
43-
override val isClosedForSend: Boolean get() = delegate.isClosedForSend
4443
override val coroutineContext: CoroutineContext get() = delegate.coroutineContext
4544

4645
override suspend fun sendFrame(streamId: Int, frame: Buffer) {
@@ -86,7 +85,6 @@ private class MultiplexedLoggingStream(
8685
private val delegate: RSocketMultiplexedConnection.Stream,
8786
private val logger: Logger,
8887
) : RSocketMultiplexedConnection.Stream {
89-
override val isClosedForSend: Boolean get() = delegate.isClosedForSend
9088
override val coroutineContext: CoroutineContext get() = delegate.coroutineContext
9189

9290
override fun setSendPriority(priority: Int) {

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/connection/MultiplexedConnection.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,6 @@ internal class MultiplexedConnection(
226226
streamId: Int,
227227
private val stream: RSocketMultiplexedConnection.Stream,
228228
) : OperationOutbound(streamId, frameCodec) {
229-
override val isClosed: Boolean get() = stream.isClosedForSend
230229
override suspend fun sendFrame(frame: Buffer): Unit = stream.sendFrame(frame)
231230
}
232231

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/connection/OldConnection.kt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,6 @@ internal class OldConnection(
3232
) : RSocketSequentialConnection {
3333
private val outboundQueue = PrioritizationFrameQueue(Channel.BUFFERED)
3434

35-
override val isClosedForSend: Boolean get() = outboundQueue.isClosedForSend
36-
3735
override val coroutineContext: CoroutineContext get() = connection.coroutineContext
3836

3937
init {

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/connection/SequentialConnection.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,6 @@ internal class SequentialConnection(
144144
}
145145

146146
private inner class Outbound(streamId: Int) : OperationOutbound(streamId, frameCodec) {
147-
override val isClosed: Boolean get() = !isActive || connection.isClosedForSend
148147
override suspend fun sendFrame(frame: Buffer): Unit = connection.sendFrame(streamId, frame)
149148
}
150149

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/OperationOutbound.kt

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2024 the original author or authors.
2+
* Copyright 2015-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -34,8 +34,6 @@ internal abstract class OperationOutbound(
3434
// TODO: decide on it
3535
// private var firstRequestFrameSent: Boolean = false
3636

37-
abstract val isClosed: Boolean
38-
3937
protected abstract suspend fun sendFrame(frame: Buffer)
4038
private suspend fun sendFrame(frame: Frame): Unit = sendFrame(frameCodec.encodeFrame(frame))
4139

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/RequesterFireAndForgetOperation.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2024 the original author or authors.
2+
* Copyright 2015-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
1717
package io.rsocket.kotlin.operation
1818

1919
import io.rsocket.kotlin.frame.*
20+
import io.rsocket.kotlin.internal.io.*
2021
import io.rsocket.kotlin.payload.*
2122
import kotlinx.coroutines.*
2223
import kotlin.coroutines.*
@@ -36,7 +37,7 @@ internal class RequesterFireAndForgetOperation(
3637
requestSentCont.resume(Unit)
3738
} catch (cause: Throwable) {
3839
if (requestSentCont.isActive) requestSentCont.resumeWithException(cause)
39-
if (!outbound.isClosed) withContext(NonCancellable) { outbound.sendCancel() }
40+
nonCancellable { outbound.sendCancel() }
4041
throw cause
4142
}
4243
}

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/RequesterRequestChannelOperation.kt

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2024 the original author or authors.
2+
* Copyright 2015-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@ package io.rsocket.kotlin.operation
1818

1919
import io.rsocket.kotlin.frame.*
2020
import io.rsocket.kotlin.internal.*
21+
import io.rsocket.kotlin.internal.io.*
2122
import io.rsocket.kotlin.payload.*
2223
import kotlinx.atomicfu.*
2324
import kotlinx.coroutines.*
@@ -58,11 +59,11 @@ internal class RequesterRequestChannelOperation(
5859
try {
5960
while (true) outbound.sendRequestN(responsePayloads.nextRequestN() ?: break)
6061
} catch (cause: Throwable) {
61-
if (!currentCoroutineContext().isActive || !outbound.isClosed) throw cause
62+
if (!currentCoroutineContext().isActive) throw cause
6263
}
6364
}
6465
} catch (cause: Throwable) {
65-
if (!outbound.isClosed) withContext(NonCancellable) {
66+
nonCancellable {
6667
when (val error = failure) {
6768
null -> outbound.sendCancel()
6869
else -> outbound.sendError(error)

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/RequesterRequestResponseOperation.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2024 the original author or authors.
2+
* Copyright 2015-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
1717
package io.rsocket.kotlin.operation
1818

1919
import io.rsocket.kotlin.frame.*
20+
import io.rsocket.kotlin.internal.io.*
2021
import io.rsocket.kotlin.payload.*
2122
import kotlinx.coroutines.*
2223

@@ -35,7 +36,7 @@ internal class RequesterRequestResponseOperation(
3536
responseDeferred.join()
3637
} catch (cause: Throwable) {
3738
// TODO: we don't need to send cancel if we have sent no frames
38-
if (!outbound.isClosed) withContext(NonCancellable) { outbound.sendCancel() }
39+
nonCancellable { outbound.sendCancel() }
3940
throw cause
4041
}
4142
}

0 commit comments

Comments
 (0)