Skip to content

Commit 48146ad

Browse files
committed
review comments
1 parent 224dc7a commit 48146ad

File tree

2 files changed

+72
-60
lines changed

2 files changed

+72
-60
lines changed

krpc/krpc-client/src/commonMain/kotlin/kotlinx/rpc/krpc/client/KrpcClient.kt

Lines changed: 62 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import kotlinx.coroutines.flow.Flow
1212
import kotlinx.coroutines.flow.flow
1313
import kotlinx.rpc.RpcCall
1414
import kotlinx.rpc.RpcClient
15+
import kotlinx.rpc.annotations.Rpc
1516
import kotlinx.rpc.descriptor.RpcCallable
1617
import kotlinx.rpc.internal.serviceScopeOrNull
1718
import kotlinx.rpc.internal.utils.InternalRpcApi
@@ -27,7 +28,6 @@ import kotlinx.serialization.SerialFormat
2728
import kotlinx.serialization.StringFormat
2829
import kotlin.coroutines.CoroutineContext
2930
import kotlin.coroutines.cancellation.CancellationException
30-
import kotlin.error
3131

3232
@Deprecated("Use KrpcClient instead", ReplaceWith("KrpcClient"), level = DeprecationLevel.ERROR)
3333
public typealias KRPCClient = KrpcClient
@@ -343,64 +343,19 @@ public abstract class KrpcClient(
343343

344344
try {
345345
connector.subscribeToCallResponse(call.descriptor.fqName, callId) { message ->
346-
when (message) {
347-
is KrpcCallMessage.CallData -> {
348-
error("Unexpected message")
349-
}
350-
351-
is KrpcCallMessage.CallException -> {
352-
val cause = runCatching {
353-
message.cause.deserialize()
354-
}
355-
356-
val result = if (cause.isFailure) {
357-
cause.exceptionOrNull()!!
358-
} else {
359-
cause.getOrNull()!!
360-
}
361-
362-
channel.close(result)
363-
}
364-
365-
is KrpcCallMessage.CallSuccess, is KrpcCallMessage.StreamMessage -> {
366-
val value = runCatching {
367-
val serializerResult = nonSuspendingSerialFormat.serializersModule
368-
.rpcSerializerForType(callable.returnType)
369-
370-
decodeMessageData(nonSuspendingSerialFormat, serializerResult, message)
371-
}
372-
373-
@Suppress("UNCHECKED_CAST")
374-
channel.send(value.getOrNull() as T)
375-
}
376-
377-
is KrpcCallMessage.StreamFinished -> {
378-
connector.unsubscribeFromMessages(call.descriptor.fqName, callId)
379-
channel.close()
380-
}
381-
382-
is KrpcCallMessage.StreamCancel -> {
383-
connector.unsubscribeFromMessages(call.descriptor.fqName, callId)
384-
val cause = message.cause.deserialize()
385-
channel.close(cause)
386-
}
387-
}
346+
handleServerStreamingMessage(message, channel, callable, call, callId)
388347
}
389348

390-
try {
391-
while (true) {
392-
val element = channel.receiveCatching()
393-
if (element.isClosed) {
394-
val ex = element.exceptionOrNull() ?: break
395-
error(ex)
396-
}
397-
398-
if (!element.isFailure) {
399-
emit(element.getOrThrow())
400-
}
349+
while (true) {
350+
val element = channel.receiveCatching()
351+
if (element.isClosed) {
352+
val ex = element.exceptionOrNull() ?: break
353+
throw ex
354+
}
355+
356+
if (!element.isFailure) {
357+
emit(element.getOrThrow())
401358
}
402-
} catch (_: ClosedReceiveChannelException) {
403-
// ignore
404359
}
405360
} catch (e: CancellationException) {
406361
// sendCancellation is not suspending, so no need for NonCancellable
@@ -411,6 +366,57 @@ public abstract class KrpcClient(
411366
}
412367
}
413368

369+
private suspend fun <T, @Rpc R: Any> handleServerStreamingMessage(
370+
message: KrpcCallMessage,
371+
channel: Channel<T>,
372+
callable: RpcCallable<R>,
373+
call: RpcCall,
374+
callId: String,
375+
) {
376+
when (message) {
377+
is KrpcCallMessage.CallData -> {
378+
error("Unexpected message")
379+
}
380+
381+
is KrpcCallMessage.CallException -> {
382+
val cause = runCatching {
383+
message.cause.deserialize()
384+
}
385+
386+
val result = if (cause.isFailure) {
387+
cause.exceptionOrNull()!!
388+
} else {
389+
cause.getOrNull()!!
390+
}
391+
392+
channel.close(result)
393+
}
394+
395+
is KrpcCallMessage.CallSuccess, is KrpcCallMessage.StreamMessage -> {
396+
val value = runCatching {
397+
val serializerResult = nonSuspendingSerialFormat.serializersModule
398+
.rpcSerializerForType(callable.returnType)
399+
400+
decodeMessageData(nonSuspendingSerialFormat, serializerResult, message)
401+
}
402+
403+
@Suppress("UNCHECKED_CAST")
404+
channel.send(value.getOrNull() as T)
405+
}
406+
407+
is KrpcCallMessage.StreamFinished -> {
408+
connector.unsubscribeFromMessages(call.descriptor.fqName, callId)
409+
channel.close()
410+
}
411+
412+
is KrpcCallMessage.StreamCancel -> {
413+
connector.unsubscribeFromMessages(call.descriptor.fqName, callId)
414+
val cause = message.cause.deserialize()
415+
channel.close(cause)
416+
}
417+
}
418+
}
419+
414420
private suspend fun handleMessage(
415421
message: KrpcCallMessage,
416422
streamContext: LazyKrpcStreamContext,

krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/internal/KrpcServerService.kt

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ internal class KrpcServerService<@Rpc T : Any>(
286286
data = stringValue,
287287
connectionId = callData.connectionId,
288288
serviceId = callData.serviceId,
289-
streamId = "1",
289+
streamId = SINGLE_STREAM_ID,
290290
)
291291
}
292292

@@ -298,7 +298,7 @@ internal class KrpcServerService<@Rpc T : Any>(
298298
data = binaryValue,
299299
connectionId = callData.connectionId,
300300
serviceId = callData.serviceId,
301-
streamId = "1",
301+
streamId = SINGLE_STREAM_ID,
302302
)
303303
}
304304

@@ -316,7 +316,7 @@ internal class KrpcServerService<@Rpc T : Any>(
316316
serviceType = descriptor.fqName,
317317
connectionId = callData.connectionId,
318318
serviceId = callData.serviceId,
319-
streamId = "1",
319+
streamId = SINGLE_STREAM_ID,
320320
)
321321
)
322322
} catch (cause: CancellationException) {
@@ -329,7 +329,7 @@ internal class KrpcServerService<@Rpc T : Any>(
329329
serviceType = descriptor.fqName,
330330
connectionId = callData.connectionId,
331331
serviceId = callData.serviceId,
332-
streamId = "1",
332+
streamId = SINGLE_STREAM_ID,
333333
cause = serializedCause,
334334
)
335335
)
@@ -356,6 +356,12 @@ internal class KrpcServerService<@Rpc T : Any>(
356356
)
357357
)
358358
}
359+
360+
companion object {
361+
// streams in non-suspend server functions are unique in each call, so no separate is in needed
362+
// this one is provided as a way to interact with the old code around streams
363+
private const val SINGLE_STREAM_ID = "1"
364+
}
359365
}
360366

361367
internal class RpcRequest(val handlerJob: Job, val streamContext: LazyKrpcStreamContext) {

0 commit comments

Comments
 (0)