Skip to content

Commit 97d3a3c

Browse files
fix(client): prevent IOException when closing stream early (#241)
1 parent f92bf99 commit 97d3a3c

File tree

1 file changed

+28
-1
lines changed

1 file changed

+28
-1
lines changed

openai-java-core/src/main/kotlin/com/openai/core/handlers/StreamHandler.kt

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,44 @@ internal fun <T> streamHandler(
1616
object : Handler<StreamResponse<T>> {
1717
override fun handle(response: HttpResponse): StreamResponse<T> {
1818
val reader = response.body().bufferedReader()
19-
val sequence = sequence { reader.useLines { block(it) } }.constrainOnce()
19+
val sequence =
20+
// Wrap in a `CloseableSequence` to avoid performing a read on the `reader`
21+
// after it has been closed, which would throw an `IOException`.
22+
CloseableSequence(sequence { reader.useLines { block(it) } }.constrainOnce())
2023

2124
return PhantomReachableClosingStreamResponse(
2225
object : StreamResponse<T> {
2326
override fun stream(): Stream<T> = sequence.asStream()
2427

2528
override fun close() {
29+
sequence.close()
2630
reader.close()
2731
response.close()
2832
}
2933
}
3034
)
3135
}
3236
}
37+
38+
/**
39+
* A sequence that can be closed.
40+
*
41+
* Once [close] is called, it will not yield more elements. It will also no longer consult the
42+
* underlying [Iterator.hasNext] method.
43+
*/
44+
private class CloseableSequence<T>(private val sequence: Sequence<T>) : Sequence<T> {
45+
private var isClosed: Boolean = false
46+
47+
override fun iterator(): Iterator<T> {
48+
val iterator = sequence.iterator()
49+
return object : Iterator<T> {
50+
override fun next(): T = iterator.next()
51+
52+
override fun hasNext(): Boolean = !isClosed && iterator.hasNext()
53+
}
54+
}
55+
56+
fun close() {
57+
isClosed = true
58+
}
59+
}

0 commit comments

Comments
 (0)