Skip to content

Commit c1a8d97

Browse files
author
olme04
committed
WIP: rework configuration API
1 parent 68d449f commit c1a8d97

File tree

48 files changed

+1534
-1163
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+1534
-1163
lines changed

benchmarks/src/kotlinMain/kotlin/io/rsocket/kotlin/benchmarks/RSocketKotlinBenchmark.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ package io.rsocket.kotlin.benchmarks
1818

1919
import io.ktor.utils.io.core.*
2020
import io.rsocket.kotlin.*
21-
import io.rsocket.kotlin.core.*
21+
import io.rsocket.kotlin.connect.*
2222
import io.rsocket.kotlin.payload.*
2323
import io.rsocket.kotlin.transport.local.*
2424
import kotlinx.coroutines.*
@@ -40,7 +40,7 @@ class RSocketKotlinBenchmark : RSocketBenchmark<Payload>() {
4040
payload = createPayload(payloadSize)
4141
payloadsFlow = flow { repeat(5000) { emit(payloadCopy()) } }
4242
val server = RSocketServer().bindIn(CoroutineScope(benchJob + Dispatchers.Unconfined), LocalServerTransport()) {
43-
RSocket {
43+
responder {
4444
onRequestResponse {
4545
it.close()
4646
payloadCopy()

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/ConnectionAcceptor.kt

Lines changed: 0 additions & 35 deletions
This file was deleted.

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/RSocket.kt

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,6 @@ package io.rsocket.kotlin
1919
import io.ktor.utils.io.core.*
2020
import io.rsocket.kotlin.payload.*
2121
import kotlinx.coroutines.flow.*
22-
import kotlin.coroutines.*
23-
24-
public sealed interface ConnectedRSocket : RSocket {
25-
public val session: RSocketSession
26-
}
2722

2823
public interface RSocket {
2924

@@ -44,6 +39,8 @@ public interface RSocket {
4439

4540
}
4641

42+
public object EmptyRSocket : RSocket
43+
4744
public sealed interface RSocketBuilder {
4845
public fun onMetadataPush(block: suspend RSocket.(metadata: ByteReadPacket) -> Unit)
4946
public fun onFireAndForget(block: suspend RSocket.(payload: Payload) -> Unit)
@@ -82,14 +79,6 @@ public suspend fun <C : Closeable> FlowCollector<C>.emitOrClose(value: C) {
8279
}
8380
}
8481

85-
internal abstract class ConnectedRSocketImpl(
86-
final override val coroutineContext: CoroutineContext,
87-
) : ConnectedRSocket, RSocketSession {
88-
final override val session: RSocketSession get() = this
89-
}
90-
91-
internal object EmptyRSocket : RSocket
92-
9382
@PublishedApi
9483
internal class RSocketImpl : RSocketBuilder, RSocket {
9584
private var metadataPush: suspend RSocket.(metadata: ByteReadPacket) -> Unit =
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package io.rsocket.kotlin.configuration
2+
3+
internal interface ConfigurationState {
4+
fun checkConfigured()
5+
fun checkNotConfigured()
6+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package io.rsocket.kotlin.configuration
2+
3+
@DslMarker
4+
public annotation class ConnectConfigurationDsl
5+
6+
//marker interface
7+
@ConnectConfigurationDsl
8+
public sealed interface ConnectConfiguration
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package io.rsocket.kotlin.configuration
2+
3+
import kotlin.time.*
4+
import kotlin.time.Duration.Companion.seconds
5+
6+
public sealed interface KeepAliveConfiguration {
7+
public val interval: Duration
8+
public val maxLifetime: Duration
9+
}
10+
11+
public sealed interface KeepAliveConnectConfiguration : KeepAliveConfiguration, ConnectConfiguration
12+
13+
public sealed interface KeepAliveClientConnectConfiguration : KeepAliveConnectConfiguration {
14+
public fun interval(duration: Duration)
15+
public fun maxLifetime(duration: Duration)
16+
}
17+
18+
public sealed interface KeepAliveServerConnectConfiguration : KeepAliveConnectConfiguration
19+
20+
internal class KeepAliveClientConnectConfigurationImpl(
21+
private val configurationState: ConfigurationState,
22+
) : KeepAliveClientConnectConfiguration {
23+
override var interval: Duration = 20.seconds
24+
private set
25+
override var maxLifetime: Duration = 90.seconds
26+
private set
27+
28+
override fun interval(duration: Duration) {
29+
configurationState.checkNotConfigured()
30+
interval = duration
31+
}
32+
33+
override fun maxLifetime(duration: Duration) {
34+
configurationState.checkNotConfigured()
35+
maxLifetime = duration
36+
}
37+
}
38+
39+
internal class KeepAliveServerConnectConfigurationImpl(
40+
override val interval: Duration,
41+
override val maxLifetime: Duration,
42+
) : KeepAliveServerConnectConfiguration
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package io.rsocket.kotlin.configuration
2+
3+
import io.rsocket.kotlin.core.*
4+
5+
public sealed interface MimeTypeConfiguration {
6+
public val metadata: MimeTypeWithName
7+
public val data: MimeTypeWithName
8+
}
9+
10+
public sealed interface MimeTypeConnectConfiguration : MimeTypeConfiguration, ConnectConfiguration
11+
12+
public sealed interface MimeTypeClientConnectConfiguration : MimeTypeConnectConfiguration {
13+
public fun metadata(mimeType: MimeTypeWithName)
14+
public fun data(mimeType: MimeTypeWithName)
15+
}
16+
17+
public sealed interface MimeTypeServerConnectConfiguration : MimeTypeConnectConfiguration
18+
19+
internal class MimeTypeClientConnectConfigurationImpl(
20+
private val configurationState: ConfigurationState,
21+
) : MimeTypeClientConnectConfiguration {
22+
override var metadata: MimeTypeWithName = WellKnownMimeType.ApplicationOctetStream
23+
private set
24+
override var data: MimeTypeWithName = WellKnownMimeType.ApplicationOctetStream
25+
private set
26+
27+
override fun metadata(mimeType: MimeTypeWithName) {
28+
configurationState.checkNotConfigured()
29+
metadata = mimeType
30+
}
31+
32+
override fun data(mimeType: MimeTypeWithName) {
33+
configurationState.checkNotConfigured()
34+
data = mimeType
35+
}
36+
}
37+
38+
internal class MimeTypeServerConnectConfigurationImpl(
39+
override val metadata: MimeTypeWithName,
40+
override val data: MimeTypeWithName,
41+
) : MimeTypeServerConnectConfiguration
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package io.rsocket.kotlin.configuration
2+
3+
import io.rsocket.kotlin.core.*
4+
5+
public sealed interface PayloadConfiguration {
6+
//TODO: Long?
7+
//TODO: add later
8+
//max payload overall size
9+
// public val maxSize: Long
10+
11+
//TODO: rename
12+
//size of fragment, after fragmentation
13+
// f.e. if 100, and payload = 450, will create 5 fragments when sending
14+
public val maxFragmentSize: Int
15+
16+
public val mimeType: MimeTypeConfiguration
17+
}
18+
19+
public sealed interface PayloadConnectConfiguration : PayloadConfiguration, ConnectConfiguration {
20+
override val mimeType: MimeTypeConnectConfiguration
21+
22+
// public fun maxSize(value: Int)
23+
public fun maxFragmentSize(value: Int)
24+
}
25+
26+
public sealed interface PayloadClientConnectConfiguration : PayloadConnectConfiguration {
27+
override val mimeType: MimeTypeClientConnectConfiguration
28+
}
29+
30+
public sealed interface PayloadServerConnectConfiguration : PayloadConnectConfiguration {
31+
override val mimeType: MimeTypeServerConnectConfiguration
32+
}
33+
34+
internal abstract class PayloadConnectConfigurationImpl(
35+
private val configurationState: ConfigurationState,
36+
) : PayloadConnectConfiguration {
37+
final override var maxFragmentSize: Int = 0
38+
private set
39+
40+
final override fun maxFragmentSize(value: Int) {
41+
configurationState.checkNotConfigured()
42+
require(value == 0 || value >= 64) {
43+
"maxFragmentSize should be zero (no fragmentation) or greater than or equal to 64, but was $value"
44+
}
45+
maxFragmentSize = value
46+
}
47+
}
48+
49+
internal class PayloadClientConnectConfigurationImpl(
50+
configurationState: ConfigurationState,
51+
) : PayloadClientConnectConfiguration, PayloadConnectConfigurationImpl(configurationState) {
52+
override val mimeType: MimeTypeClientConnectConfigurationImpl =
53+
MimeTypeClientConnectConfigurationImpl(configurationState)
54+
}
55+
56+
internal class PayloadServerConnectConfigurationImpl(
57+
configurationState: ConfigurationState,
58+
metadataMimeType: MimeTypeWithName,
59+
dataMimeType: MimeTypeWithName,
60+
) : PayloadServerConnectConfiguration, PayloadConnectConfigurationImpl(configurationState) {
61+
override val mimeType: MimeTypeServerConnectConfigurationImpl =
62+
MimeTypeServerConnectConfigurationImpl(metadataMimeType, dataMimeType)
63+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package io.rsocket.kotlin.configuration
2+
3+
import io.ktor.utils.io.core.*
4+
import io.rsocket.kotlin.core.*
5+
import io.rsocket.kotlin.payload.*
6+
import kotlin.time.*
7+
8+
public sealed interface RSocketConfiguration {
9+
public val setup: SetupConfiguration
10+
public val payload: PayloadConfiguration
11+
public val keepAlive: KeepAliveConfiguration
12+
}
13+
14+
public sealed interface RSocketConnectConfiguration : RSocketConfiguration, ConnectConfiguration {
15+
override val setup: SetupConnectConfiguration
16+
override val payload: PayloadConnectConfiguration
17+
override val keepAlive: KeepAliveConnectConfiguration
18+
}
19+
20+
public sealed interface RSocketClientConnectConfiguration : RSocketConnectConfiguration {
21+
override val setup: SetupClientConnectConfiguration
22+
override val payload: PayloadClientConnectConfiguration
23+
override val keepAlive: KeepAliveClientConnectConfiguration
24+
25+
public val reconnect: ReconnectConfiguration
26+
}
27+
28+
public sealed interface RSocketServerConnectConfiguration : RSocketConnectConfiguration {
29+
override val setup: SetupServerConnectConfiguration
30+
override val payload: PayloadServerConnectConfiguration
31+
override val keepAlive: KeepAliveServerConnectConfiguration
32+
}
33+
34+
internal abstract class RSocketConnectConfigurationImpl : RSocketConnectConfiguration, Closeable {
35+
abstract override val setup: SetupConnectConfigurationImpl
36+
37+
override fun close() {
38+
setup.close()
39+
}
40+
}
41+
42+
internal class RSocketClientConnectConfigurationImpl(
43+
configurationState: ConfigurationState,
44+
) : RSocketClientConnectConfiguration, RSocketConnectConfigurationImpl() {
45+
override val setup: SetupClientConnectConfigurationImpl = SetupClientConnectConfigurationImpl(configurationState)
46+
override val payload: PayloadClientConnectConfigurationImpl = PayloadClientConnectConfigurationImpl(configurationState)
47+
override val keepAlive: KeepAliveClientConnectConfigurationImpl = KeepAliveClientConnectConfigurationImpl(configurationState)
48+
override val reconnect: ReconnectConfigurationImpl = ReconnectConfigurationImpl(configurationState)
49+
}
50+
51+
internal class RSocketServerConnectConfigurationImpl(
52+
configurationState: ConfigurationState,
53+
keepAliveInterval: Duration,
54+
keepAliveMaxLifetime: Duration,
55+
metadataMimeType: MimeTypeWithName,
56+
dataMimeType: MimeTypeWithName,
57+
setupPayload: Payload,
58+
) : RSocketServerConnectConfiguration, RSocketConnectConfigurationImpl() {
59+
override val setup: SetupServerConnectConfigurationImpl =
60+
SetupServerConnectConfigurationImpl(setupPayload)
61+
override val payload: PayloadServerConnectConfigurationImpl =
62+
PayloadServerConnectConfigurationImpl(configurationState, metadataMimeType, dataMimeType)
63+
override val keepAlive: KeepAliveServerConnectConfigurationImpl =
64+
KeepAliveServerConnectConfigurationImpl(keepAliveInterval, keepAliveMaxLifetime)
65+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package io.rsocket.kotlin.configuration
2+
3+
public sealed interface ReconnectConfiguration : ConnectConfiguration {
4+
//if connection failed, do we need reconnect?
5+
// depending on cause, f.e. if Setup error because of unsupported setup
6+
public fun reconnectOn(predicate: suspend (cause: Throwable?) -> Boolean)
7+
8+
//when connection establishment failed, f.e. due to network issue
9+
// if specified, reconnectOn will be implicitly all time equals true
10+
public fun retryWhen(predicate: suspend (cause: Throwable, attempt: Long) -> Boolean)
11+
}
12+
13+
internal class ReconnectConfigurationImpl(
14+
private val configurationState: ConfigurationState,
15+
) : ReconnectConfiguration {
16+
var reconnectOn: (suspend (cause: Throwable?) -> Boolean)? = null
17+
private set
18+
19+
var retryWhen: (suspend (cause: Throwable, attempt: Long) -> Boolean)? = null
20+
private set
21+
22+
override fun reconnectOn(predicate: suspend (cause: Throwable?) -> Boolean) {
23+
configurationState.checkNotConfigured()
24+
reconnectOn = predicate
25+
}
26+
27+
override fun retryWhen(predicate: suspend (cause: Throwable, attempt: Long) -> Boolean) {
28+
configurationState.checkNotConfigured()
29+
retryWhen = predicate
30+
}
31+
}

0 commit comments

Comments
 (0)