A closed client can still receive events #638

@Jasper-M

To be more precise, onClose can be called on the underlying Fs2StreamClientCallListener while the client (and its dispatcher) are already closed. I don't have a full end-to-end example, but it comes down to this:

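import scala.concurrent.duration._
import cats.effect.{IO, Resource}
import cats.effect.unsafe.implicits.global

// Client for the generated service; construction elided.
val someServiceResource: Resource[IO, SomeServiceFs2Grpc[IO, Unit]] = ???

someServiceResource
  .map(
    // Interrupt the server stream after one second, then drain it.
    _.someStream(SomeStreamRequest(), ()).interruptAfter(1.second).compile.drain
  )
  .useEval // the client resource is released as soon as the drained stream completes
  .unsafeRunSync()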

Output:

Jun 07, 2023 1:52:23 PM io.grpc.internal.SerializingExecutor run
SEVERE: Exception while executing runnable io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed@13affa0e
java.lang.IllegalStateException: dispatcher already shutdown
        at cats.effect.std.Dispatcher$$anon$2.unsafeToFutureCancelable(Dispatcher.scala:422)
        at cats.effect.std.DispatcherPlatform.unsafeRunTimed(DispatcherPlatform.scala:59)
        at cats.effect.std.DispatcherPlatform.unsafeRunTimed$(DispatcherPlatform.scala:58)
        at cats.effect.std.Dispatcher$$anon$2.unsafeRunTimed(Dispatcher.scala:317)
        at cats.effect.std.DispatcherPlatform.unsafeRunSync(DispatcherPlatform.scala:51)
        at cats.effect.std.DispatcherPlatform.unsafeRunSync$(DispatcherPlatform.scala:50)
        at cats.effect.std.Dispatcher$$anon$2.unsafeRunSync(Dispatcher.scala:317)
        at fs2.grpc.client.Fs2StreamClientCallListener.onClose(Fs2StreamClientCallListener.scala:43)
        at io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:468)
        at io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:432)
        at io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:465)
        at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:562)
        at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:743)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:722)
        at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)

So interruptAfter stops the stream early, after which the surrounding resource is closed immediately. But onClose is still called afterwards and invokes the already-closed dispatcher.
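
The failure mode can be sketched without gRPC at all. The snippet below is only an illustration of the mechanism, not the fs2-grpc code: the object name DispatcherAfterClose and the method lateCallback are hypothetical, and it assumes a cats-effect version that provides Dispatcher.parallel (3.4 or later) and that rejects submissions to a released dispatcher with IllegalStateException, as the stack trace above shows.

```scala
import cats.effect.IO
import cats.effect.std.Dispatcher
import cats.effect.unsafe.implicits.global

// Hypothetical stand-in for a listener whose callback fires after the
// owning resource has been released.
object DispatcherAfterClose {

  def lateCallback(): Either[Throwable, Unit] = {
    // Let the dispatcher escape its Resource scope. By the time `use`
    // returns, the dispatcher's finalizer has already run.
    val leaked: Dispatcher[IO] =
      Dispatcher.parallel[IO].use(d => IO.pure(d)).unsafeRunSync()

    // Simulates onClose arriving late and submitting work to the
    // already-released dispatcher.
    try Right(leaked.unsafeRunSync(IO.unit))
    catch { case e: IllegalStateException => Left(e) }
  }
}
```

Calling DispatcherAfterClose.lateCallback() should yield a Left holding the IllegalStateException, which is the same situation the gRPC executor thread runs into when it invokes onClose after the client resource is gone.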
