Skip to content

Commit 965a285

Browse files
committed
KRPC-133 Don't require suspend modifier in functions returning Flow
1 parent d702ecf commit 965a285

File tree

14 files changed

+352
-51
lines changed

14 files changed

+352
-51
lines changed

compiler-plugin/compiler-plugin-backend/src/main/core/kotlinx/rpc/codegen/extension/RpcIrContext.kt

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2023-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package kotlinx.rpc.codegen.extension
@@ -176,6 +176,10 @@ internal class RpcIrContext(
176176
rpcClient.namedFunction("call")
177177
}
178178

179+
val rpcClientCallServerStreaming by lazy {
180+
rpcClient.namedFunction("callServerStreaming")
181+
}
182+
179183
val provideStubContext by lazy {
180184
rpcClient.namedFunction("provideStubContext")
181185
}

compiler-plugin/compiler-plugin-backend/src/main/core/kotlinx/rpc/codegen/extension/RpcStubGenerator.kt

Lines changed: 49 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2023-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package kotlinx.rpc.codegen.extension
@@ -530,7 +530,7 @@ internal class RpcStubGenerator(
530530
returnType = method.function.returnType
531531
modality = Modality.OPEN
532532

533-
isSuspend = true
533+
isSuspend = method.function.isSuspend
534534
}.apply {
535535
val functionThisReceiver = vsApi {
536536
stubClassThisReceiver.copyToVS(this@apply, origin = IrDeclarationOrigin.DEFINED)
@@ -550,6 +550,20 @@ internal class RpcStubGenerator(
550550
overriddenSymbols = listOf(method.function.symbol)
551551

552552
body = irBuilder(symbol).irBlockBody {
553+
if (method.function.isNonSuspendingWithFlowReturn()) {
554+
+irReturn(
555+
irRpcMethodClientCall(
556+
method = method,
557+
functionThisReceiver = functionThisReceiver,
558+
isMethodObject = isMethodObject,
559+
methodClass = methodClass,
560+
arguments = arguments,
561+
)
562+
)
563+
564+
return@irBlockBody
565+
}
566+
553567
+irReturn(
554568
irCall(
555569
callee = ctx.functions.scopedClientCall,
@@ -742,8 +756,14 @@ internal class RpcStubGenerator(
742756
methodClass: IrClass,
743757
arguments: List<IrValueParameter>,
744758
): IrCall {
759+
val callee = if (method.function.isNonSuspendingWithFlowReturn()) {
760+
ctx.functions.rpcClientCallServerStreaming.symbol
761+
} else {
762+
ctx.functions.rpcClientCall.symbol
763+
}
764+
745765
val call = irCall(
746-
callee = ctx.functions.rpcClientCall.symbol,
766+
callee = callee,
747767
type = method.function.returnType,
748768
typeArgumentsCount = 1,
749769
).apply {
@@ -1218,13 +1238,15 @@ internal class RpcStubGenerator(
12181238
* ),
12191239
* ...
12201240
* ),
1241+
* isNonSuspendFunction = !function.isSuspend,
12211242
* )
12221243
*```
12231244
*
12241245
* Where:
12251246
* - `<callable-name>` - the name of the method (field)
12261247
* - `<callable-data-type>` - a method class for a method and `FieldDataObject` for fields
1227-
* - `<callable-return-type>` - the return type for the method and the field type for a field
1248+
* - `<callable-return-type>` - the return type for the method and the field type for a field.
1249+
* For a non-suspending flow the return type is its element type
12281250
* - `<callable-invokator>` - an invokator, previously generated by [generateInvokators]
12291251
* - `<method-parameter-name-k>` - if a method, its k-th parameter name
12301252
* - `<method-parameter-type-k>` - if a method, its k-th parameter type
@@ -1253,7 +1275,16 @@ internal class RpcStubGenerator(
12531275
putValueArgument(1, irRpcTypeCall(dataType))
12541276

12551277
val returnType = when (callable) {
1256-
is ServiceDeclaration.Method -> callable.function.returnType
1278+
is ServiceDeclaration.Method -> when {
1279+
callable.function.isNonSuspendingWithFlowReturn() -> {
1280+
(callable.function.returnType as IrSimpleType).arguments.single().typeOrFail
1281+
}
1282+
1283+
else -> {
1284+
callable.function.returnType
1285+
}
1286+
}
1287+
12571288
is ServiceDeclaration.FlowField -> callable.property.getterOrFail.returnType
12581289
}
12591290

@@ -1321,9 +1352,14 @@ internal class RpcStubGenerator(
13211352
}
13221353

13231354
putValueArgument(4, arrayOfCall)
1355+
putValueArgument(5, booleanConst(callable is ServiceDeclaration.Method && !callable.function.isSuspend))
13241356
}
13251357
}
13261358

1359+
private fun IrSimpleFunction.isNonSuspendingWithFlowReturn(): Boolean {
1360+
return returnType.classOrNull == ctx.flow && !isSuspend
1361+
}
1362+
13271363
/**
13281364
* Accessor function for the `callableMap` property
13291365
* Defined in `RpcServiceDescriptor`
@@ -1525,7 +1561,7 @@ internal class RpcStubGenerator(
15251561
}
15261562

15271563
/**
1528-
* IR call of the `RpcType(KType, Array<Annotation>)` function
1564+
* IR call of the `RpcType(KType)` function
15291565
*/
15301566
private fun irRpcTypeCall(type: IrType): IrConstructorCallImpl {
15311567
return vsApi {
@@ -1644,6 +1680,13 @@ internal class RpcStubGenerator(
16441680
value = value,
16451681
)
16461682

1683+
private fun booleanConst(value: Boolean) = IrConstImpl.boolean(
1684+
startOffset = UNDEFINED_OFFSET,
1685+
endOffset = UNDEFINED_OFFSET,
1686+
type = ctx.irBuiltIns.booleanType,
1687+
value = value,
1688+
)
1689+
16471690
private fun <T> vsApi(body: VersionSpecificApi.() -> T): T {
16481691
return ctx.versionSpecificApi.body()
16491692
}

compiler-plugin/compiler-plugin-k2/src/main/core/kotlinx/rpc/codegen/StrictMode.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2023-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package kotlinx.rpc.codegen
@@ -52,8 +52,8 @@ fun CompilerConfiguration.strictModeAggregator(): StrictModeAggregator {
5252
stateFlow = get(StrictModeConfigurationKeys.STATE_FLOW, StrictMode.WARNING),
5353
sharedFlow = get(StrictModeConfigurationKeys.SHARED_FLOW, StrictMode.WARNING),
5454
nestedFlow = get(StrictModeConfigurationKeys.NESTED_FLOW, StrictMode.WARNING),
55-
streamScopedFunctions = get(StrictModeConfigurationKeys.STREAM_SCOPED_FUNCTIONS, StrictMode.NONE),
56-
suspendingServerStreaming = get(StrictModeConfigurationKeys.SUSPENDING_SERVER_STREAMING, StrictMode.NONE),
55+
streamScopedFunctions = get(StrictModeConfigurationKeys.STREAM_SCOPED_FUNCTIONS, StrictMode.WARNING),
56+
suspendingServerStreaming = get(StrictModeConfigurationKeys.SUSPENDING_SERVER_STREAMING, StrictMode.WARNING),
5757
notTopLevelServerFlow = get(StrictModeConfigurationKeys.NOT_TOP_LEVEL_SERVER_FLOW, StrictMode.WARNING),
5858
fields = get(StrictModeConfigurationKeys.FIELDS, StrictMode.WARNING),
5959
)

core/src/commonMain/kotlin/kotlinx/rpc/RpcClient.kt

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package kotlinx.rpc
66

77
import kotlinx.coroutines.CoroutineScope
88
import kotlinx.coroutines.Deferred
9+
import kotlinx.coroutines.flow.Flow
910
import kotlin.coroutines.CoroutineContext
1011

1112
@Deprecated("Use RpcClient instead", ReplaceWith("RpcClient"), level = DeprecationLevel.ERROR)
@@ -36,8 +37,25 @@ public interface RpcClient : CoroutineScope {
3637
* that is needed to route it properly to the server.
3738
* @return actual result of the call, for example, data from the server
3839
*/
40+
@Deprecated(
41+
"This method was primarily used for fields in RPC services, which are now deprecated. " +
42+
"See https://kotlin.github.io/kotlinx-rpc/strict-mode.html fields guide for more information"
43+
)
3944
public fun <T> callAsync(serviceScope: CoroutineScope, call: RpcCall): Deferred<T>
4045

46+
/**
47+
* This method is used by generated clients to perform a call to the server.
48+
*
49+
* @param T type of the result
50+
* @param serviceScope service's coroutine scope
51+
* @param call an object that contains all required information about the called method,
52+
* that is needed to route it properly to the server.
53+
* @return actual result of the call, for example, data from the server
54+
*/
55+
public fun <T> callServerStreaming(call: RpcCall): Flow<T> {
56+
error("Non-suspending server streaming is not supported by this client")
57+
}
58+
4159
/**
4260
* Provides child [CoroutineContext] for a new [RemoteService] service stub.
4361
*

core/src/commonMain/kotlin/kotlinx/rpc/descriptor/RpcServiceDescriptor.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ public class RpcCallable<@Rpc T : Any>(
6262
public val returnType: RpcType,
6363
public val invokator: RpcInvokator<T>,
6464
public val parameters: Array<RpcParameter>,
65+
public val isNonSuspendFunction: Boolean,
6566
)
6667

6768
@ExperimentalRpcApi

gradle-plugin/src/main/kotlin/kotlinx/rpc/Extensions.kt

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2023-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
@file:Suppress("unused")
@@ -64,16 +64,16 @@ open class RpcStrictModeExtension @Inject constructor(objects: ObjectFactory) {
6464
val nestedFlow: Property<RpcStrictMode> = objects.strictModeProperty()
6565

6666
/**
67-
* WIP: https://youtrack.jetbrains.com/issue/KRPC-133
68-
* Will be enabled later, when an alternative is ready.
67+
* StreamScoped functions are deprecated.
6968
*/
70-
private val streamScopedFunctions: Property<RpcStrictMode> = objects.strictModeProperty(RpcStrictMode.NONE)
69+
val streamScopedFunctions: Property<RpcStrictMode> = objects.strictModeProperty()
7170

7271
/**
73-
* WIP: https://youtrack.jetbrains.com/issue/KRPC-133
74-
* Will be enabled later, when an alternative is ready.
72+
* Suspending functions with server-streaming are deprecated in RPC.
73+
*
74+
* Consider returning a Flow in a non-suspending function.
7575
*/
76-
private val suspendingServerStreaming: Property<RpcStrictMode> = objects.strictModeProperty(RpcStrictMode.NONE)
76+
val suspendingServerStreaming: Property<RpcStrictMode> = objects.strictModeProperty()
7777

7878
/**
7979
* Not top-level flows in the return value are deprecated in RPC for streaming.

krpc/krpc-client/src/commonMain/kotlin/kotlinx/rpc/krpc/client/KrpcClient.kt

Lines changed: 102 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
/*
2-
* Copyright 2023-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package kotlinx.rpc.krpc.client
66

77
import kotlinx.atomicfu.atomic
88
import kotlinx.coroutines.*
9+
import kotlinx.coroutines.channels.Channel
10+
import kotlinx.coroutines.channels.ClosedReceiveChannelException
11+
import kotlinx.coroutines.flow.Flow
12+
import kotlinx.coroutines.flow.flow
913
import kotlinx.rpc.RpcCall
1014
import kotlinx.rpc.RpcClient
1115
import kotlinx.rpc.descriptor.RpcCallable
@@ -23,6 +27,7 @@ import kotlinx.serialization.SerialFormat
2327
import kotlinx.serialization.StringFormat
2428
import kotlin.coroutines.CoroutineContext
2529
import kotlin.coroutines.cancellation.CancellationException
30+
import kotlin.error
2631

2732
@Deprecated("Use KrpcClient instead", ReplaceWith("KrpcClient"), level = DeprecationLevel.ERROR)
2833
public typealias KRPCClient = KrpcClient
@@ -140,6 +145,10 @@ public abstract class KrpcClient(
140145
}
141146
}
142147

148+
@Deprecated(
149+
"This method was primarily used for fields in RPC services, which are now deprecated. " +
150+
"See https://kotlin.github.io/kotlinx-rpc/strict-mode.html fields guide for more information"
151+
)
143152
override fun <T> callAsync(
144153
serviceScope: CoroutineScope,
145154
call: RpcCall,
@@ -305,6 +314,95 @@ public abstract class KrpcClient(
305314
connector.sendMessage(firstMessage)
306315
}
307316

317+
private val nonSuspendingSerialFormat = config.serialFormatInitializer.build()
318+
319+
override fun <T> callServerStreaming(call: RpcCall): Flow<T> {
320+
return flow {
321+
awaitHandshakeCompletion()
322+
323+
val id = callCounter.incrementAndGet()
324+
val callable = call.descriptor.getCallable(call.callableName)
325+
?: error("Unexpected callable '${call.callableName}' for ${call.descriptor.fqName} service")
326+
327+
val dataTypeString = callable.dataType.toString()
328+
329+
val callId = "$connectionId:$dataTypeString:$id"
330+
331+
val channel = Channel<T>()
332+
333+
val request = serializeRequest(
334+
callId = callId,
335+
call = call,
336+
callable = callable,
337+
serialFormat = nonSuspendingSerialFormat,
338+
pluginParams = mapOf(KrpcPluginKey.NON_SUSPENDING_SERVER_FLOW_MARKER to ""),
339+
)
340+
341+
connector.sendMessage(request)
342+
343+
connector.subscribeToCallResponse(call.descriptor.fqName, callId) { message ->
344+
when (message) {
345+
is KrpcCallMessage.CallData -> {
346+
error("Unexpected message")
347+
}
348+
349+
is KrpcCallMessage.CallException -> {
350+
val cause = runCatching {
351+
message.cause.deserialize()
352+
}
353+
354+
val result = if (cause.isFailure) {
355+
cause.exceptionOrNull()!!
356+
} else {
357+
cause.getOrNull()!!
358+
}
359+
360+
channel.close(result)
361+
}
362+
363+
is KrpcCallMessage.CallSuccess, is KrpcCallMessage.StreamMessage -> {
364+
val value = runCatching {
365+
val serializerResult =
366+
nonSuspendingSerialFormat.serializersModule.rpcSerializerForType(callable.returnType)
367+
368+
decodeMessageData(nonSuspendingSerialFormat, serializerResult, message)
369+
}
370+
371+
@Suppress("UNCHECKED_CAST")
372+
channel.send(value.getOrNull() as T)
373+
}
374+
375+
is KrpcCallMessage.StreamFinished -> {
376+
connector.unsubscribeFromMessages(call.descriptor.fqName, callId)
377+
channel.close()
378+
}
379+
380+
is KrpcCallMessage.StreamCancel -> {
381+
connector.unsubscribeFromMessages(call.descriptor.fqName, callId)
382+
val cause = message.cause.deserialize()
383+
channel.close(cause)
384+
}
385+
}
386+
}
387+
388+
try {
389+
while (true) {
390+
val element = channel.receiveCatching()
391+
if (element.isClosed) {
392+
val ex = element.exceptionOrNull() ?: break
393+
error(ex)
394+
}
395+
396+
if (!element.isFailure) {
397+
emit(element.getOrThrow())
398+
}
399+
}
400+
} catch (_: ClosedReceiveChannelException) {
401+
// ignore
402+
}
403+
}
404+
}
405+
308406
private suspend fun handleMessage(
309407
message: KrpcCallMessage,
310408
streamContext: LazyKrpcStreamContext,
@@ -385,6 +483,7 @@ public abstract class KrpcClient(
385483
call: RpcCall,
386484
callable: RpcCallable<*>,
387485
serialFormat: SerialFormat,
486+
pluginParams: Map<KrpcPluginKey, String> = emptyMap(),
388487
): KrpcCallMessage {
389488
val serializerData = serialFormat.serializersModule.rpcSerializerForType(callable.dataType)
390489
return when (serialFormat) {
@@ -398,6 +497,7 @@ public abstract class KrpcClient(
398497
data = stringValue,
399498
connectionId = connectionId,
400499
serviceId = call.serviceId,
500+
pluginParams = pluginParams,
401501
)
402502
}
403503

@@ -411,6 +511,7 @@ public abstract class KrpcClient(
411511
data = binaryValue,
412512
connectionId = connectionId,
413513
serviceId = call.serviceId,
514+
pluginParams = pluginParams,
414515
)
415516
}
416517

0 commit comments

Comments
 (0)