Skip to content

Commit f35de10

Browse files
feat(client): add an AsyncStreamResponse#onCompleteFuture() method (#239)
docs: add more documentation to `AsyncStreamResponse`
1 parent 891c2f0 commit f35de10

File tree

4 files changed

+185
-3
lines changed

4 files changed

+185
-3
lines changed

README.md

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,10 +207,25 @@ client.async().chat().completions().createStreaming(params).subscribe(new AsyncS
207207
System.out.println("Something went wrong!");
208208
throw new RuntimeException(error.get());
209209
} else {
210-
System.out.println("Something went wrong!");
210+
System.out.println("No more chunks!");
211211
}
212212
}
213213
});
214+
215+
// Or use futures
216+
client.async().chat().completions().createStreaming(params)
217+
.subscribe(chunk -> {
218+
System.out.println(chunk);
219+
})
220+
.onCompleteFuture();
221+
.whenComplete((unused, error) -> {
222+
if (error != null) {
223+
System.out.println("Something went wrong!");
224+
throw new RuntimeException(error);
225+
} else {
226+
System.out.println("No more chunks!");
227+
}
228+
});
214229
```
215230

216231
Async streaming uses a dedicated per-client cached thread pool `Executor` to stream without blocking the current thread. This default is suitable for most purposes.

openai-java-core/src/main/kotlin/com/openai/core/http/AsyncStreamResponse.kt

Lines changed: 62 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,37 @@ import java.util.concurrent.CompletableFuture
66
import java.util.concurrent.Executor
77
import java.util.concurrent.atomic.AtomicReference
88

9+
/**
10+
* A class providing access to an API response as an asynchronous stream of chunks of type [T],
11+
* where each chunk can be individually processed as soon as it arrives instead of waiting on the
12+
* full response.
13+
*/
914
interface AsyncStreamResponse<T> {
1015

16+
/**
17+
* Registers [handler] to be called for events of this stream.
18+
*
19+
* [handler]'s methods will be called in the client's configured or default thread pool.
20+
*
21+
* @throws IllegalStateException if [subscribe] has already been called.
22+
*/
1123
fun subscribe(handler: Handler<T>): AsyncStreamResponse<T>
1224

25+
/**
26+
* Registers [handler] to be called for events of this stream.
27+
*
28+
* [handler]'s methods will be called in the given [executor].
29+
*
30+
* @throws IllegalStateException if [subscribe] has already been called.
31+
*/
1332
fun subscribe(handler: Handler<T>, executor: Executor): AsyncStreamResponse<T>
1433

34+
/**
35+
* Returns a future that completes when a stream is fully consumed, errors, or gets closed
36+
* early.
37+
*/
38+
fun onCompleteFuture(): CompletableFuture<Void?>
39+
1540
/**
1641
* Closes this resource, relinquishing any underlying resources.
1742
*
@@ -20,10 +45,19 @@ interface AsyncStreamResponse<T> {
2045
*/
2146
fun close()
2247

48+
/** A class for handling streaming events. */
2349
fun interface Handler<in T> {
2450

51+
/** Called whenever a chunk is received. */
2552
fun onNext(value: T)
2653

54+
/**
55+
* Called when a stream is fully consumed, errors, or gets closed early.
56+
*
57+
* [onNext] will not be called once this method is called.
58+
*
59+
* @param error Non-empty if the stream completed due to an error.
60+
*/
2761
fun onComplete(error: Optional<Throwable>) {}
2862
}
2963
}
@@ -33,8 +67,17 @@ internal fun <T> CompletableFuture<StreamResponse<T>>.toAsync(streamHandlerExecu
3367
PhantomReachableClosingAsyncStreamResponse(
3468
object : AsyncStreamResponse<T> {
3569

70+
private val onCompleteFuture = CompletableFuture<Void?>()
3671
private val state = AtomicReference(State.NEW)
3772

73+
init {
74+
this@toAsync.whenComplete { _, error ->
75+
// If an error occurs from the original future, then we should resolve the
76+
// `onCompleteFuture` even if `subscribe` has not been called.
77+
error?.let(onCompleteFuture::completeExceptionally)
78+
}
79+
}
80+
3881
override fun subscribe(handler: Handler<T>): AsyncStreamResponse<T> =
3982
subscribe(handler, streamHandlerExecutor)
4083

@@ -72,20 +115,37 @@ internal fun <T> CompletableFuture<StreamResponse<T>>.toAsync(streamHandlerExecu
72115
try {
73116
handler.onComplete(Optional.ofNullable(streamError))
74117
} finally {
75-
close()
118+
try {
119+
// Notify completion via the `onCompleteFuture` as well. This is in
120+
// a separate `try-finally` block so that we still complete the
121+
// future if `handler.onComplete` throws.
122+
if (streamError == null) {
123+
onCompleteFuture.complete(null)
124+
} else {
125+
onCompleteFuture.completeExceptionally(streamError)
126+
}
127+
} finally {
128+
close()
129+
}
76130
}
77131
},
78132
executor,
79133
)
80134
}
81135

136+
override fun onCompleteFuture(): CompletableFuture<Void?> = onCompleteFuture
137+
82138
override fun close() {
83139
val previousState = state.getAndSet(State.CLOSED)
84140
if (previousState == State.CLOSED) {
85141
return
86142
}
87143

88-
this@toAsync.whenComplete { streamResponse, _ -> streamResponse?.close() }
144+
this@toAsync.whenComplete { streamResponse, error -> streamResponse?.close() }
145+
// When the stream is closed, we should always consider it closed. If it closed due
146+
// to an error, then we will have already completed the future earlier, and this
147+
// will be a no-op.
148+
onCompleteFuture.complete(null)
89149
}
90150
}
91151
)

openai-java-core/src/main/kotlin/com/openai/core/http/PhantomReachableClosingAsyncStreamResponse.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package com.openai.core.http
33
import com.openai.core.closeWhenPhantomReachable
44
import com.openai.core.http.AsyncStreamResponse.Handler
55
import java.util.Optional
6+
import java.util.concurrent.CompletableFuture
67
import java.util.concurrent.Executor
78

89
/**
@@ -33,6 +34,9 @@ internal class PhantomReachableClosingAsyncStreamResponse<T>(
3334
asyncStreamResponse.subscribe(TrackedHandler(handler, reachabilityTracker), executor)
3435
}
3536

37+
override fun onCompleteFuture(): CompletableFuture<Void?> =
38+
asyncStreamResponse.onCompleteFuture()
39+
3640
override fun close() = asyncStreamResponse.close()
3741
}
3842

openai-java-core/src/test/kotlin/com/openai/core/http/AsyncStreamResponseTest.kt

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,109 @@ internal class AsyncStreamResponseTest {
131131
verify(executor, times(1)).execute(any())
132132
}
133133

134+
@Test
135+
fun onCompleteFuture_whenStreamResponseFutureNotCompleted_onCompleteFutureNotCompleted() {
136+
val future = CompletableFuture<StreamResponse<String>>()
137+
val asyncStreamResponse = future.toAsync(executor)
138+
139+
val onCompletableFuture = asyncStreamResponse.onCompleteFuture()
140+
141+
assertThat(onCompletableFuture).isNotCompleted
142+
}
143+
144+
@Test
145+
fun onCompleteFuture_whenStreamResponseFutureErrors_onCompleteFutureCompletedExceptionally() {
146+
val future = CompletableFuture<StreamResponse<String>>()
147+
val asyncStreamResponse = future.toAsync(executor)
148+
future.completeExceptionally(ERROR)
149+
150+
val onCompletableFuture = asyncStreamResponse.onCompleteFuture()
151+
152+
assertThat(onCompletableFuture).isCompletedExceptionally
153+
}
154+
155+
@Test
156+
fun onCompleteFuture_whenStreamResponseFutureCompletedButStillStreaming_onCompleteFutureNotCompleted() {
157+
val future = CompletableFuture<StreamResponse<String>>()
158+
val asyncStreamResponse = future.toAsync(executor)
159+
future.complete(streamResponse)
160+
161+
val onCompletableFuture = asyncStreamResponse.onCompleteFuture()
162+
163+
assertThat(onCompletableFuture).isNotCompleted
164+
}
165+
166+
@Test
167+
fun onCompleteFuture_whenStreamResponseFutureCompletedAndStreamErrors_onCompleteFutureCompletedExceptionally() {
168+
val future = CompletableFuture<StreamResponse<String>>()
169+
val asyncStreamResponse = future.toAsync(executor)
170+
asyncStreamResponse.subscribe(handler)
171+
future.complete(erroringStreamResponse)
172+
173+
val onCompletableFuture = asyncStreamResponse.onCompleteFuture()
174+
175+
assertThat(onCompletableFuture).isCompletedExceptionally
176+
}
177+
178+
@Test
179+
fun onCompleteFuture_whenStreamResponseFutureCompletedAndStreamCompleted_onCompleteFutureCompleted() {
180+
val future = CompletableFuture<StreamResponse<String>>()
181+
val asyncStreamResponse = future.toAsync(executor)
182+
asyncStreamResponse.subscribe(handler)
183+
future.complete(streamResponse)
184+
185+
val onCompletableFuture = asyncStreamResponse.onCompleteFuture()
186+
187+
assertThat(onCompletableFuture).isCompleted
188+
}
189+
190+
@Test
191+
fun onCompleteFuture_whenHandlerOnCompleteWithoutThrowableThrows_onCompleteFutureCompleted() {
192+
val future = CompletableFuture<StreamResponse<String>>()
193+
val asyncStreamResponse = future.toAsync(executor)
194+
asyncStreamResponse.subscribe(
195+
object : AsyncStreamResponse.Handler<String> {
196+
override fun onNext(value: String) {}
197+
198+
override fun onComplete(error: Optional<Throwable>) = throw ERROR
199+
}
200+
)
201+
future.complete(streamResponse)
202+
203+
val onCompletableFuture = asyncStreamResponse.onCompleteFuture()
204+
205+
assertThat(onCompletableFuture).isCompleted
206+
}
207+
208+
@Test
209+
fun onCompleteFuture_whenHandlerOnCompleteWithThrowableThrows_onCompleteFutureCompletedExceptionally() {
210+
val future = CompletableFuture<StreamResponse<String>>()
211+
val asyncStreamResponse = future.toAsync(executor)
212+
asyncStreamResponse.subscribe(
213+
object : AsyncStreamResponse.Handler<String> {
214+
override fun onNext(value: String) {}
215+
216+
override fun onComplete(error: Optional<Throwable>) = throw ERROR
217+
}
218+
)
219+
future.complete(erroringStreamResponse)
220+
221+
val onCompletableFuture = asyncStreamResponse.onCompleteFuture()
222+
223+
assertThat(onCompletableFuture).isCompletedExceptionally
224+
}
225+
226+
@Test
227+
fun onCompleteFuture_whenClosed_onCompleteFutureCompleted() {
228+
val future = CompletableFuture<StreamResponse<String>>()
229+
val asyncStreamResponse = future.toAsync(executor)
230+
asyncStreamResponse.close()
231+
232+
val onCompletableFuture = asyncStreamResponse.onCompleteFuture()
233+
234+
assertThat(onCompletableFuture).isCompleted
235+
}
236+
134237
@Test
135238
fun close_whenNotClosed_closesStreamResponse() {
136239
val future = CompletableFuture<StreamResponse<String>>()

0 commit comments

Comments
 (0)