Skip to content

Commit 20cb326

Browse files
newtorkCharlesDuboisSAPbot-sdk-jsMatKuhra-d
authored
[Streaming] Fix issues when not using try-with-resource (#49)
* OpenAI streaming * Added homepage and error handling todo * Renamed vars * Added todos * Made stream generic, try-with resources, TEXT_EVENT_STREAM, exception refactored * Formatting * close stream correctly * Formatting * Created OpenAiStreamOutput * Formatting * Renamed stream to streamChatCompletion, Added comments * Added total output * Total output is printed * Formatting * addDelta is propagated everywhere * addDelta is propagated everywhere * forgotten addDeltas * Added jackson dependencies * Added Javadoc * Removed 1 TODO * PMD * PMD again * Added OpenAiClientTest.streamChatCompletion() * Change return type of stream, added e2e test * Added documentation * Added documentation framework-agnostic + throw if finish reason is invalid * Added error handling test * Updates from pair review / discussion * Cleanup + streamChatCompletion doesn't throw * PMD * Added errorHandling test * Apply suggestions from code review Co-authored-by: Matthias Kuhr <[email protected]> * Dependency analyze * Review comments * Make client static * Formatting * PMD * Fix tests * Removed exception constructors no args * Refactor exception message * Readme sentences * Remove superfluous call super * reset httpclient-cache and -factory after each test case * Very minor code-style improvements in test * Minor code-style in OpenAIController * Reduce README sample code * Update OpenAiStreamingHandler.java (#43) * Fix import * Initial * Format * Improve type * Added stream_options to model * Change Executor#submit() to #execute() * Change Executor#submit() to #execute() * Added usage testing * Added beautiful Javadoc to enableStreaming * typo * Fix mistake * Syntax improvement to improve API stability. * Syntax improvement to improve API stability. * Make exception types similar to BufferedReader original logic * Format * Add nonnull characteristic to mirror BufferedReader original logic * Make buffer size accessible * Add test * Add assertion on stream count * Simplify e2e code * Simplify README * Partially revert * Add assertion * Partially revert * Minor code adjustments * Replace unnecessary nested types * Merge nested type to renamed parent type * Change code to ensure our lazy `hasNext()` has no unexpected side effect * Revert removing `emitter#complete()` * Add JavaDoc; Replace VAVR type * Address PMD warnings: change exception type * Add unhappy-path test cases * Revert code change in test app * Initial migrate coverage check to pom xml * Print FULL coverage report with git diff indicator * Update foundation-models/openai/src/test/java/com/sap/ai/sdk/foundationmodels/openai/IterableStreamConverterTest.java Co-authored-by: Charles Dubois <[email protected]> * Update foundation-models/openai/src/test/java/com/sap/ai/sdk/foundationmodels/openai/IterableStreamConverterTest.java Co-authored-by: Charles Dubois <[email protected]> * Reduce redundant method * Add comment * Fix merge error * Fix JavaDoc inaccessibility warning * Improve error message --------- Co-authored-by: I538344 <[email protected]> Co-authored-by: SAP Cloud SDK Bot <[email protected]> Co-authored-by: Matthias Kuhr <[email protected]> Co-authored-by: Charles Dubois <[email protected]> Co-authored-by: Matthias Kuhr <[email protected]> Co-authored-by: Alexander Dümont <[email protected]>
1 parent a56aa4d commit 20cb326

File tree

3 files changed

+233
-37
lines changed

3 files changed

+233
-37
lines changed
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
package com.sap.ai.sdk.foundationmodels.openai;
2+
3+
import static java.nio.charset.StandardCharsets.UTF_8;
4+
import static java.util.Spliterator.NONNULL;
5+
import static java.util.Spliterator.ORDERED;
6+
7+
import io.vavr.control.Try;
8+
import java.io.BufferedReader;
9+
import java.io.IOException;
10+
import java.io.InputStream;
11+
import java.io.InputStreamReader;
12+
import java.util.Iterator;
13+
import java.util.NoSuchElementException;
14+
import java.util.Spliterators;
15+
import java.util.concurrent.Callable;
16+
import java.util.function.Function;
17+
import java.util.stream.Stream;
18+
import java.util.stream.StreamSupport;
19+
import javax.annotation.Nonnull;
20+
import javax.annotation.Nullable;
21+
import lombok.AccessLevel;
22+
import lombok.RequiredArgsConstructor;
23+
import lombok.extern.slf4j.Slf4j;
24+
import org.apache.hc.core5.http.HttpEntity;
25+
26+
/**
27+
* Internal utility class to convert from a reading handler to {@link Iterable} and {@link Stream}.
28+
*
29+
* <p><strong>Note:</strong> All operations are sequential in nature. Thread safety is not
30+
* guaranteed.
31+
*
32+
* @param <T> Iterated item type.
33+
*/
34+
@Slf4j
35+
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
36+
class IterableStreamConverter<T> implements Iterator<T> {
37+
/** see DEFAULT_CHAR_BUFFER_SIZE in {@link BufferedReader} * */
38+
static final int BUFFER_SIZE = 8192;
39+
40+
/** Read next entry for Stream or {@code null} when no further entry can be read. */
41+
private final Callable<T> readHandler;
42+
43+
/** Close handler to be called when Stream terminated. */
44+
private final Runnable stopHandler;
45+
46+
/** Error handler to be called when Stream is interrupted. */
47+
private final Function<Exception, RuntimeException> errorHandler;
48+
49+
private boolean isDone = false;
50+
private boolean isNextFetched = false;
51+
private T next = null;
52+
53+
@SuppressWarnings("checkstyle:IllegalCatch")
54+
@Override
55+
public boolean hasNext() {
56+
if (isDone) {
57+
return false;
58+
}
59+
if (isNextFetched) {
60+
return true;
61+
}
62+
try {
63+
next = readHandler.call();
64+
isNextFetched = true;
65+
if (next == null) {
66+
isDone = true;
67+
stopHandler.run();
68+
}
69+
} catch (final Exception e) {
70+
isDone = true;
71+
stopHandler.run();
72+
log.debug("Error while reading next element.", e);
73+
throw errorHandler.apply(e);
74+
}
75+
return !isDone;
76+
}
77+
78+
@Override
79+
public T next() {
80+
if (next == null && !hasNext()) {
81+
throw new NoSuchElementException(); // normally not reached with Stream API
82+
}
83+
isNextFetched = false;
84+
return next;
85+
}
86+
87+
/**
88+
* Create a sequential Stream of lines from an HTTP response string (UTF-8). The underlying {@link
89+
* InputStream} is closed, when the resulting Stream is closed (e.g. via try-with-resources) or
90+
* when an exception occurred.
91+
*
92+
* @param entity The HTTP entity object.
93+
* @return A sequential Stream object.
94+
* @throws OpenAiClientException if the provided HTTP entity object is {@code null} or empty.
95+
*/
96+
@SuppressWarnings("PMD.CloseResource") // Stream is closed automatically when consumed
97+
@Nonnull
98+
static Stream<String> lines(@Nullable final HttpEntity entity) throws OpenAiClientException {
99+
if (entity == null) {
100+
throw new OpenAiClientException("OpenAI response was empty.");
101+
}
102+
103+
final InputStream inputStream;
104+
try {
105+
inputStream = entity.getContent();
106+
} catch (final IOException e) {
107+
throw new OpenAiClientException("Failed to read response content.", e);
108+
}
109+
110+
final var reader = new BufferedReader(new InputStreamReader(inputStream, UTF_8), BUFFER_SIZE);
111+
final Runnable closeHandler =
112+
() -> Try.run(reader::close).onFailure(e -> log.error("Could not close input stream", e));
113+
final Function<Exception, RuntimeException> errHandler =
114+
e -> new OpenAiClientException("Parsing response content was interrupted.", e);
115+
116+
final var iterator = new IterableStreamConverter<>(reader::readLine, closeHandler, errHandler);
117+
final var spliterator = Spliterators.spliteratorUnknownSize(iterator, ORDERED | NONNULL);
118+
return StreamSupport.stream(spliterator, /* NOT PARALLEL */ false).onClose(closeHandler);
119+
}
120+
}

foundation-models/openai/src/main/java/com/sap/ai/sdk/foundationmodels/openai/OpenAiStreamingHandler.java

Lines changed: 7 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -5,57 +5,31 @@
55
import static com.sap.ai.sdk.foundationmodels.openai.OpenAiResponseHandler.parseErrorAndThrow;
66

77
import com.sap.ai.sdk.foundationmodels.openai.model.StreamedDelta;
8-
import io.vavr.control.Try;
9-
import java.io.BufferedReader;
108
import java.io.IOException;
11-
import java.io.InputStream;
12-
import java.io.InputStreamReader;
13-
import java.nio.charset.StandardCharsets;
149
import java.util.stream.Stream;
1510
import javax.annotation.Nonnull;
1611
import lombok.RequiredArgsConstructor;
1712
import lombok.extern.slf4j.Slf4j;
1813
import org.apache.hc.core5.http.ClassicHttpResponse;
19-
import org.apache.hc.core5.http.HttpEntity;
2014

2115
@Slf4j
2216
@RequiredArgsConstructor
2317
class OpenAiStreamingHandler<D extends StreamedDelta> {
2418

2519
@Nonnull private final Class<D> deltaType;
2620

27-
@Nonnull
28-
Stream<D> handleResponse(@Nonnull final ClassicHttpResponse response)
29-
throws OpenAiClientException {
30-
if (response.getCode() >= 300) {
31-
buildExceptionAndThrow(response);
32-
}
33-
return parseResponse(response);
34-
}
35-
3621
/**
3722
* @param response The response to process
3823
* @return A {@link Stream} of a model class instantiated from the response
39-
* @author stippi
4024
*/
41-
// The stream is closed by the user of the Stream
42-
@SuppressWarnings("PMD.CloseResource")
43-
private Stream<D> parseResponse(@Nonnull final ClassicHttpResponse response)
25+
@SuppressWarnings("PMD.CloseResource") // Stream is closed automatically when consumed
26+
@Nonnull
27+
Stream<D> handleResponse(@Nonnull final ClassicHttpResponse response)
4428
throws OpenAiClientException {
45-
final HttpEntity responseEntity = response.getEntity();
46-
if (responseEntity == null) {
47-
throw new OpenAiClientException("Response from OpenAI model was empty.");
48-
}
49-
final InputStream inputStream;
50-
try {
51-
inputStream = responseEntity.getContent();
52-
} catch (IOException e) {
53-
throw new OpenAiClientException("Failed to read response content.", e);
29+
if (response.getCode() >= 300) {
30+
buildExceptionAndThrow(response);
5431
}
55-
final var br = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
56-
57-
// https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#event_stream_format
58-
return br.lines()
32+
return IterableStreamConverter.lines(response.getEntity())
5933
// half of the lines are empty newlines, the last line is "data: [DONE]"
6034
.filter(line -> !line.isEmpty() && !"data: [DONE]".equals(line.trim()))
6135
.peek(
@@ -74,10 +48,6 @@ private Stream<D> parseResponse(@Nonnull final ClassicHttpResponse response)
7448
log.error("Failed to parse the following response from OpenAI model: {}", line);
7549
throw new OpenAiClientException("Failed to parse delta message: " + line, e);
7650
}
77-
})
78-
.onClose(
79-
() ->
80-
Try.run(inputStream::close)
81-
.onFailure(e -> log.error("Could not close HTTP input stream", e)));
51+
});
8252
}
8353
}
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
package com.sap.ai.sdk.foundationmodels.openai;
2+
3+
import static org.assertj.core.api.Assertions.assertThat;
4+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
5+
import static org.mockito.ArgumentMatchers.any;
6+
import static org.mockito.ArgumentMatchers.anyInt;
7+
import static org.mockito.ArgumentMatchers.eq;
8+
import static org.mockito.Mockito.mock;
9+
import static org.mockito.Mockito.never;
10+
import static org.mockito.Mockito.spy;
11+
import static org.mockito.Mockito.times;
12+
import static org.mockito.Mockito.verify;
13+
import static org.mockito.Mockito.when;
14+
15+
import java.io.ByteArrayInputStream;
16+
import java.io.IOException;
17+
import java.io.InputStream;
18+
import java.nio.charset.StandardCharsets;
19+
import java.util.concurrent.atomic.AtomicInteger;
20+
import lombok.SneakyThrows;
21+
import org.apache.hc.core5.http.ContentType;
22+
import org.apache.hc.core5.http.io.entity.InputStreamEntity;
23+
import org.junit.jupiter.api.DisplayName;
24+
import org.junit.jupiter.api.Test;
25+
26+
public class IterableStreamConverterTest {
27+
@SneakyThrows
28+
@Test
29+
@DisplayName("Stream is fully consumed")
30+
void testLines() {
31+
final var TEMPLATE = "THIS\nIS\nA\nTEST\n";
32+
final var input = TEMPLATE.repeat(IterableStreamConverter.BUFFER_SIZE);
33+
final var inputStream = spy(new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8)));
34+
final var entity = new InputStreamEntity(inputStream, ContentType.TEXT_PLAIN);
35+
36+
final var sut = IterableStreamConverter.lines(entity);
37+
verify(inputStream, never()).read();
38+
verify(inputStream, never()).read(any());
39+
verify(inputStream, never()).read(any(), anyInt(), anyInt());
40+
41+
final var streamCounter = new AtomicInteger(0);
42+
sut.peek(s -> streamCounter.incrementAndGet())
43+
.forEach(
44+
s ->
45+
assertThat(s)
46+
.containsAnyOf("THIS", "IS", "A", "TEST")
47+
.doesNotContainAnyWhitespaces());
48+
49+
assertThat(streamCounter).hasValue(IterableStreamConverter.BUFFER_SIZE * 4);
50+
verify(inputStream, times(TEMPLATE.length() + 1))
51+
.read(any(), anyInt(), eq(IterableStreamConverter.BUFFER_SIZE));
52+
verify(inputStream, times(1)).close();
53+
}
54+
55+
@SneakyThrows
56+
@Test
57+
@DisplayName("Stream may only read first entry without closing")
58+
void testLinesFindFirst() {
59+
final var TEMPLATE = "Foo Bar\n";
60+
final var inputStream = mock(InputStream.class);
61+
when(inputStream.read(any(), anyInt(), anyInt()))
62+
.thenAnswer(
63+
arg -> {
64+
byte[] ar = arg.getArgument(0, byte[].class);
65+
byte[] bytes = TEMPLATE.getBytes(StandardCharsets.UTF_8);
66+
for (int i = 0; i < ar.length; i++) ar[i] = bytes[i % bytes.length];
67+
return ar.length;
68+
});
69+
70+
final var entity = new InputStreamEntity(inputStream, ContentType.TEXT_PLAIN);
71+
72+
final var sut = IterableStreamConverter.lines(entity);
73+
assertThat(sut.findFirst()).contains("Foo Bar");
74+
verify(inputStream, times(1)).read(any(), anyInt(), anyInt());
75+
verify(inputStream, never()).close();
76+
}
77+
78+
@SneakyThrows
79+
@Test
80+
@DisplayName("Stream may close unexpectedly")
81+
void testLinesThrows() {
82+
final var TEMPLATE = "Foo Bar\n";
83+
final var inputStream = mock(InputStream.class);
84+
when(inputStream.read(any(), anyInt(), anyInt()))
85+
.thenAnswer(
86+
arg -> {
87+
byte[] ar = arg.getArgument(0, byte[].class);
88+
byte[] bytes = TEMPLATE.getBytes(StandardCharsets.UTF_8);
89+
for (int i = 0; i < ar.length; i++) ar[i] = bytes[i % bytes.length];
90+
return ar.length;
91+
})
92+
.thenThrow(new IOException("Ups!"));
93+
94+
final var entity = new InputStreamEntity(inputStream, ContentType.TEXT_PLAIN);
95+
96+
final var sut = IterableStreamConverter.lines(entity);
97+
assertThatThrownBy(sut::count)
98+
.isInstanceOf(OpenAiClientException.class)
99+
.hasMessage("Parsing response content was interrupted.")
100+
.cause()
101+
.isInstanceOf(IOException.class)
102+
.hasMessage("Ups!");
103+
verify(inputStream, times(2)).read(any(), anyInt(), anyInt());
104+
verify(inputStream, times(1)).close();
105+
}
106+
}

0 commit comments

Comments
 (0)