File tree Expand file tree Collapse file tree 1 file changed +28
-1
lines changed
openai-java-core/src/main/kotlin/com/openai/core/handlers Expand file tree Collapse file tree 1 file changed +28
-1
lines changed Original file line number Diff line number Diff line change @@ -16,17 +16,44 @@ internal fun <T> streamHandler(
1616 object : Handler <StreamResponse <T >> {
1717 override fun handle (response : HttpResponse ): StreamResponse <T > {
1818 val reader = response.body().bufferedReader()
19- val sequence = sequence { reader.useLines { block(it) } }.constrainOnce()
19+ val sequence =
20+ // Wrap in a `CloseableSequence` to avoid performing a read on the `reader`
21+ // after it has been closed, which would throw an `IOException`.
22+ CloseableSequence (sequence { reader.useLines { block(it) } }.constrainOnce())
2023
2124 return PhantomReachableClosingStreamResponse (
2225 object : StreamResponse <T > {
2326 override fun stream (): Stream <T > = sequence.asStream()
2427
2528 override fun close () {
29+ sequence.close()
2630 reader.close()
2731 response.close()
2832 }
2933 }
3034 )
3135 }
3236 }
37+
38+ /* *
39+ * A sequence that can be closed.
40+ *
41+ * Once [close] is called, it will not yield more elements. It will also no longer consult the
42+ * underlying [Iterator.hasNext] method.
43+ */
44+ private class CloseableSequence <T >(private val sequence : Sequence <T >) : Sequence<T> {
45+ private var isClosed: Boolean = false
46+
47+ override fun iterator (): Iterator <T > {
48+ val iterator = sequence.iterator()
49+ return object : Iterator <T > {
50+ override fun next (): T = iterator.next()
51+
52+ override fun hasNext (): Boolean = ! isClosed && iterator.hasNext()
53+ }
54+ }
55+
56+ fun close () {
57+ isClosed = true
58+ }
59+ }
You can’t perform that action at this time.
0 commit comments