Slow flow subscriber causes backpressure even though buffer is Int.MAX_VALUE #492

@maxdroz


Describe the bug
When a client sends data to the server, which proxies it back via a flow, backpressure occurs even though the intermediate flow on the server has extraBufferCapacity = Int.MAX_VALUE and should not produce backpressure-like behaviour.

Explanation with a sample:
We have a simple Rpc "proxy":

@Rpc
interface SampleService {
    fun receiveInt(): Flow<Int>
    suspend fun sendInt(data: Int)
}

class SampleServiceImpl : SampleService {
  private val sharedFlow = MutableSharedFlow<Int>(extraBufferCapacity = Int.MAX_VALUE)

  override fun receiveInt(): Flow<Int> {
    return sharedFlow
  }

  override suspend fun sendInt(data: Int) {
    val duration = measureTime {
      sharedFlow.emit(data)
    }
    println("Emission of item $data took $duration")
  }
}

Note that MutableSharedFlow has extraBufferCapacity = Int.MAX_VALUE here, which means emit should never suspend in practice: data is always added to the buffer.
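To check this expectation in isolation, here is a minimal local sketch with no RPC involved. It assumes kotlinx-coroutines-core is on the classpath and Kotlin 1.9+ for measureTime; the helper name longestEmit is hypothetical and not part of the sample project. It subscribes a deliberately slow collector and measures how long each emit takes:

```kotlin
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlin.time.Duration
import kotlin.time.measureTime

// Hypothetical helper: emits `items` values into a SharedFlow configured like the
// one in SampleServiceImpl while a slow collector is subscribed, and returns the
// longest time a single emit() call took.
suspend fun longestEmit(items: Int = 5): Duration = coroutineScope {
    val sharedFlow = MutableSharedFlow<Int>(extraBufferCapacity = Int.MAX_VALUE)

    // Slow subscriber, mirroring the 10-second collector from the client sample.
    val collector = launch {
        sharedFlow.collect { delay(10_000) }
    }
    // Wait until the collector is actually subscribed, otherwise the emits
    // would return immediately simply because there is no subscriber.
    sharedFlow.subscriptionCount.first { it > 0 }

    var longest = Duration.ZERO
    repeat(items) { i ->
        val took = measureTime { sharedFlow.emit(i) }
        println("Local emission of item $i took $took")
        longest = maxOf(longest, took)
    }

    // Stop the slow collector so that coroutineScope can complete.
    collector.cancelAndJoin()
    longest
}

fun main(): Unit = runBlocking {
    println("Longest local emit: ${longestEmit()}")
}
```

If the local emits stay fast, as the server-side log in this report suggests they do, the SharedFlow buffer itself is unlikely to be the source of the delay seen by the client.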

And a client:

suspend fun main(): Unit = coroutineScope {
  launch { collectIntEvery10Seconds() }
  launch {
    delay(1000) // give the collector a head start
    emitIntEvery1Second()
  }
}

private suspend fun collectIntEvery10Seconds() {
  sampleRpc.receiveInt().collect { int ->
    delay(10_000)
  }
}

private suspend fun emitIntEvery1Second() {
  var i = 0
  while (true) {
    val payload = i++
    val duration = measureTime {
      sampleRpc.sendInt(payload)
    }
    println("Emission of item $payload took $duration")
    delay(1000)
  }
}

Full sample: https://github.com/maxdroz/KrpcBugSample/tree/main

Expected behaviour:
The emitter emits an item exactly once per second without any backpressure (since there is extraBufferCapacity = Int.MAX_VALUE on the server side).
Console output on both the client and the server shows small durations (<0.1 seconds).

Actual behaviour:
The emitter is periodically backpressured and has to wait 10-20 seconds depending on the cycle.
Console output on the server shows small durations (<0.1 seconds), but on the client it sometimes takes 10-20 seconds to emit a value.

What makes this even more confusing to me is that there are no obvious delays anywhere on the server that could cause this backpressure. On the server, items are added to the MutableSharedFlow instantaneously. The sendInt function on the server executes instantaneously, while the sampleRpc.sendInt(payload) call on the client sometimes takes 10-20 seconds, which appears to come from backpressure caused by the slow collector.

To Reproduce
Steps to reproduce the behavior:

  1. Kotlin version - 2.2.0
  2. Gradle version - 8.14.3
  3. OS (or at least KMP platform) - macOS, JVM
  4. Sample - https://github.com/maxdroz/KrpcBugSample
  5. Launch sample server - ./gradlew :server:run
  6. Launch sample client - ./gradlew :client:run
  7. Observe anomalies in execution times in console

Sample outputs:
Server:

Emission of item 0 took 250.333us
Emission of item 1 took 31.334us
Emission of item 2 took 43.583us
Emission of item 3 took 21.292us
Emission of item 4 took 48.416us
Emission of item 5 took 58.458us
Emission of item 6 took 25.291us
Emission of item 7 took 50.125us
Emission of item 8 took 30.542us
Emission of item 9 took 25.916us
Emission of item 10 took 31us
Emission of item 11 took 79.916us
Emission of item 12 took 29.25us

Client:

Emission of item 0 took 18.006ms
Emission of item 1 took 8.998559250s
Emission of item 2 took 8.997953125s
Emission of item 3 took 9.002433s
Emission of item 4 took 8.999296917s
Emission of item 5 took 8.022959ms
Emission of item 6 took 17.990416917s
Emission of item 7 took 9.021209ms
Emission of item 8 took 7.981177083s
Emission of item 9 took 9.008235s
Emission of item 10 took 18.946279500s
Emission of item 11 took 9.053777s
Emission of item 12 took 7.357333ms

Labels: bug (Something isn't working)
