Skip to content

Commit 5c58df9

Browse files
authored
Payload releasing (#108)
* add leak detection * use leak detection in all core tests + transport test for local connection * fix payload leaks * mark some more functions as internal * remove suppress for some functions
1 parent 3f79945 commit 5c58df9

File tree

68 files changed

+934
-568
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

68 files changed

+934
-568
lines changed

README.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,10 @@ RSocket interface contains 5 methods:
3939
`suspend fun metadataPush(metadata: ByteReadPacket)`
4040

4141
## Using in your projects
42-
The `master` branch is now dedicated to development of multiplatform rsocket-kotlin.
43-
For now only snapshots are available via [oss.jfrog.org](oss.jfrog.org) (OJO).
42+
The `master` branch is now dedicated to development of multiplatform rsocket-kotlin. For now only snapshots are available
43+
via [oss.jfrog.org](https://oss.jfrog.org/artifactory/oss-snapshot-local/io/rsocket/kotlin/) (OJO).
4444

45-
Make sure, that you use Kotlin 1.4.
45+
Make sure, that you use Kotlin 1.4.X.
4646

4747
### Gradle:
4848

@@ -225,7 +225,7 @@ val bufferedStream: Flow<Payload> = stream.buffer(10) //here buffer is 10, if `b
225225
bufferedStream.collect { payload: Payload ->
226226
println(payload.data.readText())
227227
}
228-
```
228+
```
229229

230230
## Bugs and Feedback
231231

build.gradle.kts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,6 @@ subprojects {
195195

196196
//common configuration
197197
extensions.configure<KotlinMultiplatformExtension> {
198-
// explicitApiWarning() //TODO change to strict before release
199198
sourceSets.all {
200199
languageSettings.apply {
201200
progressiveMode = true
@@ -213,11 +212,13 @@ subprojects {
213212
useExperimentalAnnotation("kotlinx.coroutines.FlowPreview")
214213
useExperimentalAnnotation("io.ktor.util.KtorExperimentalAPI")
215214
useExperimentalAnnotation("io.ktor.util.InternalAPI")
215+
useExperimentalAnnotation("io.ktor.utils.io.core.internal.DangerousInternalIoApi")
216216
}
217217
}
218218
}
219219

220220
if (project.name != "rsocket-test") {
221+
explicitApiWarning() //TODO change to strict before release
221222
sourceSets["commonTest"].dependencies {
222223
implementation(project(":rsocket-test"))
223224
}

examples/multiplatform-chat/src/clientMain/kotlin/PayloadWithRoute.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import io.ktor.utils.io.core.*
1818
import io.rsocket.kotlin.payload.*
1919

20-
@Suppress("FunctionName")
2120
fun Payload(route: String, packet: ByteReadPacket): Payload = Payload {
2221
data(packet)
2322
metadata(route)

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/connection/Connection.kt

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,38 @@
1717
package io.rsocket.kotlin.connection
1818

1919
import io.ktor.utils.io.core.*
20+
import io.ktor.utils.io.core.internal.*
21+
import io.ktor.utils.io.pool.*
2022
import io.rsocket.kotlin.*
2123
import io.rsocket.kotlin.core.*
24+
import io.rsocket.kotlin.frame.*
2225
import kotlinx.coroutines.*
2326

27+
/**
28+
* That interface isn't stable for inheritance.
29+
*/
2430
interface Connection : Cancelable {
31+
32+
@DangerousInternalIoApi
33+
val pool: ObjectPool<ChunkBuffer>
34+
get() = ChunkBuffer.Pool
35+
2536
suspend fun send(packet: ByteReadPacket)
2637
suspend fun receive(): ByteReadPacket
2738
}
2839

2940
suspend fun Connection.connectClient(
30-
configuration: RSocketConnectorConfiguration = RSocketConnectorConfiguration()
41+
configuration: RSocketConnectorConfiguration = RSocketConnectorConfiguration(),
3142
): RSocket = RSocketConnector(ConnectionProvider(this), configuration).connect()
3243

3344
suspend fun Connection.startServer(
3445
configuration: RSocketServerConfiguration = RSocketServerConfiguration(),
35-
acceptor: RSocketAcceptor
46+
acceptor: RSocketAcceptor,
3647
): Job = RSocketServer(ConnectionProvider(this), configuration).start(acceptor)
48+
49+
50+
@OptIn(DangerousInternalIoApi::class)
51+
internal suspend fun Connection.receiveFrame(): Frame = receive().readFrame(pool)
52+
53+
@OptIn(DangerousInternalIoApi::class)
54+
internal suspend fun Connection.sendFrame(frame: Frame): Unit = send(frame.toPacket(pool))

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/connection/LoggingConnection.kt

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,23 @@
1717
package io.rsocket.kotlin.connection
1818

1919
import io.ktor.utils.io.core.*
20+
import io.ktor.utils.io.core.internal.*
2021
import io.rsocket.kotlin.frame.*
2122
import io.rsocket.kotlin.logging.*
22-
import kotlinx.coroutines.*
2323

2424
internal fun Connection.logging(logger: Logger): Connection =
2525
if (logger.isLoggable(LoggingLevel.DEBUG)) LoggingConnection(this, logger) else this
2626

27+
@OptIn(DangerousInternalIoApi::class)
2728
private class LoggingConnection(
2829
private val delegate: Connection,
2930
private val logger: Logger,
30-
) : Connection {
31-
override val job: Job get() = delegate.job
31+
) : Connection by delegate {
32+
33+
private fun ByteReadPacket.dumpFrameToString(): String {
34+
val length = remaining
35+
return copy().use { it.readFrame(pool).use { it.dump(length) } }
36+
}
3237

3338
override suspend fun send(packet: ByteReadPacket) {
3439
logger.debug { "Send: ${packet.dumpFrameToString()}" }

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnector.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ class RSocketConnector(
4646
connection = connection,
4747
plugin = configuration.plugin,
4848
setupFrame = setupFrame,
49-
ignoredFrameConsumer = configuration.ignoredFrameConsumer,
5049
acceptor = configuration.acceptor
5150
)
5251
}

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnectorConfiguration.kt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package io.rsocket.kotlin.core
1818

1919
import io.rsocket.kotlin.*
20-
import io.rsocket.kotlin.frame.*
2120
import io.rsocket.kotlin.keepalive.*
2221
import io.rsocket.kotlin.logging.*
2322
import io.rsocket.kotlin.payload.*
@@ -29,6 +28,5 @@ data class RSocketConnectorConfiguration(
2928
val keepAlive: KeepAlive = KeepAlive(),
3029
val payloadMimeType: PayloadMimeType = PayloadMimeType(),
3130
val setupPayload: Payload = Payload.Empty,
32-
val ignoredFrameConsumer: (Frame) -> Unit = {},
3331
val acceptor: RSocketAcceptor = { RSocketRequestHandler { } },
3432
)

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketServer.kt

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ class RSocketServer(
3535
.let(configuration.plugin::wrapConnection)
3636
.logging(configuration.loggerFactory.logger("io.rsocket.kotlin.frame.Frame"))
3737

38-
val setupFrame = connection.receive().toFrame()
38+
val setupFrame = connection.receiveFrame()
3939
if (setupFrame !is SetupFrame)
4040
connection.failSetup(RSocketError.Setup.Invalid("Invalid setup frame: ${setupFrame.type}"))
4141
if (setupFrame.version != Version.Current)
@@ -46,7 +46,6 @@ class RSocketServer(
4646
connection = connection,
4747
plugin = configuration.plugin,
4848
setupFrame = setupFrame,
49-
ignoredFrameConsumer = configuration.ignoredFrameConsumer,
5049
acceptor = acceptor
5150
)
5251
} catch (e: Throwable) {
@@ -55,7 +54,7 @@ class RSocketServer(
5554
}
5655

5756
private suspend fun Connection.failSetup(error: RSocketError.Setup): Nothing {
58-
send(ErrorFrame(0, error).toPacket())
57+
sendFrame(ErrorFrame(0, error))
5958
cancel("Setup failed", error)
6059
throw error
6160
}

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketServerConfiguration.kt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,10 @@
1616

1717
package io.rsocket.kotlin.core
1818

19-
import io.rsocket.kotlin.frame.*
2019
import io.rsocket.kotlin.logging.*
2120
import io.rsocket.kotlin.plugin.*
2221

2322
data class RSocketServerConfiguration(
2423
val plugin: Plugin = Plugin(),
2524
val loggerFactory: LoggerFactory = DefaultLoggerFactory,
26-
val ignoredFrameConsumer: (Frame) -> Unit = {},
2725
)

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/CancelFrame.kt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,13 @@ package io.rsocket.kotlin.frame
1818

1919
import io.ktor.utils.io.core.*
2020

21-
class CancelFrame(
21+
internal class CancelFrame(
2222
override val streamId: Int,
2323
) : Frame(FrameType.Cancel) {
2424
override val flags: Int get() = 0
25+
26+
override fun release(): Unit = Unit
27+
2528
override fun BytePacketBuilder.writeSelf(): Unit = Unit
2629

2730
override fun StringBuilder.appendFlags(): Unit = Unit

0 commit comments

Comments
 (0)