@@ -2,6 +2,7 @@ package com.openai.core.http
22
33import com.openai.core.closeWhenPhantomReachable
44import com.openai.core.http.AsyncStreamResponse.Handler
5+ import java.util.Optional
56import java.util.concurrent.Executor
67
78internal class PhantomReachableClosingAsyncStreamResponse <T >(
@@ -12,13 +13,30 @@ internal class PhantomReachableClosingAsyncStreamResponse<T>(
1213 }
1314
1415 override fun subscribe (handler : Handler <T >): AsyncStreamResponse <T > = apply {
15- asyncStreamResponse.subscribe(handler)
16+ asyncStreamResponse.subscribe(HandlerReferencingAsyncStreamResponse ( handler, this ) )
1617 }
1718
1819 override fun subscribe (handler : Handler <T >, executor : Executor ): AsyncStreamResponse <T > =
1920 apply {
20- asyncStreamResponse.subscribe(handler, executor)
21+ asyncStreamResponse.subscribe(
22+ HandlerReferencingAsyncStreamResponse (handler, this ),
23+ executor
24+ )
2125 }
2226
2327 override fun close () = asyncStreamResponse.close()
2428}
29+
30+ /* *
31+ * A wrapper around a `Handler` that also references an `AsyncStreamResponse` so that the latter
32+ * will not only be phantom reachable and get reclaimed early while the handler itself is reachable
33+ * and subscribed to the response.
34+ */
35+ private class HandlerReferencingAsyncStreamResponse <T >(
36+ private val handler : Handler <T >,
37+ private val asyncStreamResponse : AsyncStreamResponse <T >
38+ ) : Handler<T> {
39+ override fun onNext (value : T ) = handler.onNext(value)
40+
41+ override fun onComplete (error : Optional <Throwable >) = handler.onComplete(error)
42+ }
0 commit comments