Skip to content

[Bug] kRPC leaves open web-sockets connections after flow collection. #384

@afTrolle

Description

@afTrolle

Describe the bug

Using kRPC leaves open web-sockets connections after flow collection.

Not sure if flow collection should create a new web-socket connection or use existing rpc-connection.

Either way it shouldn't leave open web-socket connections.

To Reproduce

@Rpc
interface Service { fun flow(): Flow<Int> }

class ServiceImpl(val logger: Logger) : Service {
    override fun flow(): Flow<Int> = flow {
        var counter = 0
        while (true) {
            emit(counter++)
            delay(100)
        }
    }.onCompletion {
        logger.debug { "on flow completion" }
    }
}

fun main() {
    embeddedServer(Netty, 8080) {
        server()
        client()
    }.start(wait = true)
}

private fun Application.server() {
    install(Krpc) { serialization { json() } }
    routing {
        rpc {
            log.debug("on krpcRoute - start")
            coroutineContext.job.invokeOnCompletion {
                log.debug("on krpcRoute - completion")
            }
            registerService<Service> {
                log.debug("on service")
                ServiceImpl(application.log)
            }
        }
    }
}

private fun Application.client() = launch {
    val ktorClient = HttpClient { installKrpc { serialization { json() } } }
    val rpcClient = ktorClient.rpc("ws://127.0.0.1:8080")
    while (true) {
        withTimeoutOrNull(2.seconds) {
            rpcClient.withService<Service>().flow().collect()
        }
        log.debug("restart collection")
    }
}

When running the sample through a reverse proxy (ngrok) shows 100 open connections..

Connections    ttl   opn   rt1   rt5   p50   p90                           
               178   100   0.05  0.17  52.06  185.08                         
                                                               
HTTP Requests                                                         
-------------                                                         
                                                               
23:15:23.178 CEST GET / 101 Switching Protocols                           
23:15:21.162 CEST GET / 101 Switching Protocols                           
23:15:19.167 CEST GET / 101 Switching Protocols
...

Log-output:

DEBUG io.ktor.server.Application -- on krpcRoute - start
DEBUG io.ktor.server.Application -- on service
DEBUG io.ktor.server.Application -- restart collection
DEBUG io.ktor.server.Application -- on flow completion
DEBUG io.ktor.server.Application -- on krpcRoute - start
DEBUG io.ktor.server.Application -- restart collection
DEBUG io.ktor.server.Application -- on flow completion
DEBUG io.ktor.server.Application -- on krpcRoute - start
DEBUG io.ktor.server.Application -- restart collection
DEBUG io.ktor.server.Application -- on flow completion
DEBUG io.ktor.server.Application -- on krpcRoute - start
DEBUG io.ktor.server.Application -- restart collection
DEBUG io.ktor.server.Application -- on flow completion
DEBUG io.ktor.server.Application -- on krpcRoute - start
DEBUG io.ktor.server.Application -- restart collection
....

Steps to reproduce the behavior:

  1. kotlinx-rpc - 0.8.0
  2. ktor - 3.2.1
  3. Kotlin version - 2.2.0
  4. Gradle version - 8.13
  5. OS - MacOs
  6. Minimal reproducer in code - link
  7. slack-thread

Expected behavior
I would expect the rpc web-socket connection to handle the flow collection over one web-socket or at least that it would close the web-socket after flow is completed.

Metadata

Metadata

Assignees

Labels

bugSomething isn't working

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions