Skip to content

Commit a9a7929

Browse files
Reduce duplication of StreamingResponseHandlers
1 parent fbad7a5 commit a9a7929

File tree

13 files changed

+252
-185
lines changed

13 files changed

+252
-185
lines changed

core/pom.xml

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,12 @@
3030
</developers>
3131
<properties>
3232
<project.rootdir>${project.basedir}/../</project.rootdir>
33-
<coverage.complexity>60%</coverage.complexity>
34-
<coverage.line>69%</coverage.line>
35-
<coverage.instruction>68%</coverage.instruction>
36-
<coverage.branch>52%</coverage.branch>
37-
<coverage.method>73%</coverage.method>
38-
<coverage.class>87%</coverage.class>
33+
<coverage.complexity>45%</coverage.complexity>
34+
<coverage.line>53%</coverage.line>
35+
<coverage.instruction>51%</coverage.instruction>
36+
<coverage.branch>35%</coverage.branch>
37+
<coverage.method>56%</coverage.method>
38+
<coverage.class>70%</coverage.class>
3939
</properties>
4040

4141
<dependencies>

core/src/main/java/com/sap/ai/sdk/core/commons/ClientResponseHandler.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,19 @@ public class ClientResponseHandler<T, E extends ClientException>
3838
@Nonnull private final BiFunction<String, Throwable, E> exceptionType;
3939

4040
/** The parses for JSON responses, will be private once we can remove mixins */
41-
@Nonnull public ObjectMapper JACKSON = getDefaultObjectMapper();
41+
@Nonnull private ObjectMapper JACKSON = getDefaultObjectMapper();
42+
43+
/**
44+
* Set the {@link ObjectMapper} to use for parsing JSON responses.
45+
*
46+
* @param jackson The {@link ObjectMapper} to use
47+
*/
48+
@Beta
49+
@Nonnull
50+
public ClientResponseHandler<T, E> objectMapper(@Nonnull final ObjectMapper jackson) {
51+
JACKSON = jackson;
52+
return this;
53+
}
4254

4355
/**
4456
* Processes a {@link ClassicHttpResponse} and returns some value corresponding to that response.
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package com.sap.ai.sdk.core.commons;
2+
3+
import static com.sap.ai.sdk.core.JacksonConfiguration.getDefaultObjectMapper;
4+
5+
import com.fasterxml.jackson.databind.ObjectMapper;
6+
import java.io.IOException;
7+
import java.util.function.BiFunction;
8+
import java.util.stream.Stream;
9+
import javax.annotation.Nonnull;
10+
import lombok.extern.slf4j.Slf4j;
11+
import org.apache.hc.core5.http.ClassicHttpResponse;
12+
13+
/**
14+
* Parse incoming JSON responses and handles any errors. For internal use only.
15+
*
16+
* @param <D> The type of the response.
17+
* @param <E> The type of the exception to throw.
18+
* @since 1.1.0
19+
*/
20+
@Slf4j
21+
public class ClientStreamingHandler<D extends StreamedDelta, E extends ClientException> {
22+
23+
@Nonnull private final Class<D> deltaType;
24+
@Nonnull private final ClientResponseHandler<? extends ClientError, E> errorHandler;
25+
@Nonnull private final BiFunction<String, Throwable, E> exceptionType;
26+
27+
/** The parses for JSON responses, will be private once we can remove mixins */
28+
@Nonnull private ObjectMapper JACKSON = getDefaultObjectMapper();
29+
30+
/**
31+
* Set the {@link ObjectMapper} to use for parsing JSON responses.
32+
*
33+
* @param jackson The {@link ObjectMapper} to use
34+
*/
35+
@Nonnull
36+
public ClientStreamingHandler<D, E> objectMapper(@Nonnull final ObjectMapper jackson) {
37+
JACKSON = jackson;
38+
errorHandler.objectMapper(jackson);
39+
return this;
40+
}
41+
42+
/**
43+
* Creates a new instance of the {@link ClientStreamingHandler}.
44+
*
45+
* @param deltaType The type of the response.
46+
* @param errorType The type of the error.
47+
* @param exceptionType The type of the exception to throw.
48+
*/
49+
public ClientStreamingHandler(
50+
@Nonnull final Class<D> deltaType,
51+
@Nonnull final Class<? extends ClientError> errorType,
52+
@Nonnull final BiFunction<String, Throwable, E> exceptionType) {
53+
this.deltaType = deltaType;
54+
this.exceptionType = exceptionType;
55+
this.errorHandler = new ClientResponseHandler<>(errorType, errorType, exceptionType);
56+
}
57+
58+
/**
59+
* Processes a {@link ClassicHttpResponse} and returns a {@link Stream} of deltas corresponding to
60+
* that response.
61+
*
62+
* @param response The response to process
63+
* @return A {@link Stream} of a model class instantiated from the response
64+
* @throws E in case of a problem or the connection was aborted
65+
*/
66+
@SuppressWarnings("PMD.CloseResource") // Stream is closed automatically when consumed
67+
@Nonnull
68+
public Stream<D> handleResponse(@Nonnull final ClassicHttpResponse response) throws E {
69+
if (response.getCode() >= 300) {
70+
errorHandler.buildExceptionAndThrow(response);
71+
}
72+
return IterableStreamConverter.lines(response.getEntity(), exceptionType)
73+
// half of the lines are empty newlines, the last line is "data: [DONE]"
74+
.filter(line -> !line.isEmpty() && !"data: [DONE]".equals(line.trim()))
75+
.peek(
76+
line -> {
77+
if (!line.startsWith("data: ")) {
78+
final String msg = "Failed to parse response";
79+
errorHandler.parseErrorAndThrow(line, exceptionType.apply(msg, null));
80+
}
81+
})
82+
.map(
83+
line -> {
84+
final String data = line.substring(5); // remove "data: "
85+
try {
86+
return JACKSON.readValue(data, deltaType);
87+
} catch (final IOException e) { // exception message e gets lost
88+
log.error("Failed to parse the following response: {}", line);
89+
throw exceptionType.apply("Failed to parse delta message: " + line, e);
90+
}
91+
});
92+
}
93+
}
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
package com.sap.ai.sdk.core.commons;
2+
3+
import static java.nio.charset.StandardCharsets.UTF_8;
4+
import static java.util.Spliterator.NONNULL;
5+
import static java.util.Spliterator.ORDERED;
6+
7+
import io.vavr.control.Try;
8+
import java.io.BufferedReader;
9+
import java.io.IOException;
10+
import java.io.InputStream;
11+
import java.io.InputStreamReader;
12+
import java.util.Iterator;
13+
import java.util.NoSuchElementException;
14+
import java.util.Spliterators;
15+
import java.util.concurrent.Callable;
16+
import java.util.function.BiFunction;
17+
import java.util.function.Function;
18+
import java.util.stream.Stream;
19+
import java.util.stream.StreamSupport;
20+
import javax.annotation.Nonnull;
21+
import javax.annotation.Nullable;
22+
import lombok.AccessLevel;
23+
import lombok.RequiredArgsConstructor;
24+
import lombok.extern.slf4j.Slf4j;
25+
import org.apache.hc.core5.http.HttpEntity;
26+
27+
/**
28+
* Internal utility class to convert from a reading handler to {@link Iterable} and {@link Stream}.
29+
*
30+
* <p><strong>Note:</strong> All operations are sequential in nature. Thread safety is not
31+
* guaranteed.
32+
*
33+
* @param <T> Iterated item type.
34+
*/
35+
@Slf4j
36+
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
37+
class IterableStreamConverter<T> implements Iterator<T> {
38+
/** see DEFAULT_CHAR_BUFFER_SIZE in {@link BufferedReader} * */
39+
static final int BUFFER_SIZE = 8192;
40+
41+
/** Read next entry for Stream or {@code null} when no further entry can be read. */
42+
private final Callable<T> readHandler;
43+
44+
/** Close handler to be called when Stream terminated. */
45+
private final Runnable stopHandler;
46+
47+
/** Error handler to be called when Stream is interrupted. */
48+
private final Function<Exception, RuntimeException> errorHandler;
49+
50+
private boolean isDone = false;
51+
private boolean isNextFetched = false;
52+
private T next = null;
53+
54+
@SuppressWarnings("checkstyle:IllegalCatch")
55+
@Override
56+
public boolean hasNext() {
57+
if (isDone) {
58+
return false;
59+
}
60+
if (isNextFetched) {
61+
return true;
62+
}
63+
try {
64+
next = readHandler.call();
65+
isNextFetched = true;
66+
if (next == null) {
67+
isDone = true;
68+
stopHandler.run();
69+
}
70+
} catch (final Exception e) {
71+
isDone = true;
72+
stopHandler.run();
73+
log.debug("Error while reading next element.", e);
74+
throw errorHandler.apply(e);
75+
}
76+
return !isDone;
77+
}
78+
79+
@Override
80+
public T next() {
81+
if (next == null && !hasNext()) {
82+
throw new NoSuchElementException(); // normally not reached with Stream API
83+
}
84+
isNextFetched = false;
85+
return next;
86+
}
87+
88+
/**
89+
* Create a sequential Stream of lines from an HTTP response string (UTF-8). The underlying {@link
90+
* InputStream} is closed, when the resulting Stream is closed (e.g. via try-with-resources) or
91+
* when an exception occurred.
92+
*
93+
* @param entity The HTTP entity object.
94+
* @return A sequential Stream object.
95+
* @throws ClientException if the provided HTTP entity object is {@code null} or empty.
96+
*/
97+
@SuppressWarnings("PMD.CloseResource") // Stream is closed automatically when consumed
98+
@Nonnull
99+
static Stream<String> lines(
100+
@Nullable final HttpEntity entity,
101+
@Nonnull final BiFunction<String, Throwable, ? extends ClientException> exceptionType)
102+
throws ClientException {
103+
if (entity == null) {
104+
throw exceptionType.apply("Orchestration service response was empty.", null);
105+
}
106+
107+
final InputStream inputStream;
108+
try {
109+
inputStream = entity.getContent();
110+
} catch (final IOException e) {
111+
throw exceptionType.apply("Failed to read response content.", e);
112+
}
113+
114+
final var reader = new BufferedReader(new InputStreamReader(inputStream, UTF_8), BUFFER_SIZE);
115+
final Runnable closeHandler =
116+
() -> Try.run(reader::close).onFailure(e -> log.error("Could not close input stream", e));
117+
final Function<Exception, RuntimeException> errHandler =
118+
e -> exceptionType.apply("Parsing response content was interrupted.", e);
119+
120+
final var iterator = new IterableStreamConverter<>(reader::readLine, closeHandler, errHandler);
121+
final var spliterator = Spliterators.spliteratorUnknownSize(iterator, ORDERED | NONNULL);
122+
return StreamSupport.stream(spliterator, /* NOT PARALLEL */ false).onClose(closeHandler);
123+
}
124+
}

orchestration/src/main/java/com/sap/ai/sdk/orchestration/StreamedDelta.java renamed to core/src/main/java/com/sap/ai/sdk/core/commons/StreamedDelta.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.sap.ai.sdk.orchestration;
1+
package com.sap.ai.sdk.core.commons;
22

33
import javax.annotation.Nonnull;
44
import javax.annotation.Nullable;

foundation-models/openai/src/main/java/com/sap/ai/sdk/foundationmodels/openai/OpenAiClient.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
import com.sap.ai.sdk.core.AiCoreService;
99
import com.sap.ai.sdk.core.DeploymentResolutionException;
1010
import com.sap.ai.sdk.core.commons.ClientResponseHandler;
11+
import com.sap.ai.sdk.core.commons.ClientStreamingHandler;
12+
import com.sap.ai.sdk.core.commons.StreamedDelta;
1113
import com.sap.ai.sdk.foundationmodels.openai.model.OpenAiChatCompletionDelta;
1214
import com.sap.ai.sdk.foundationmodels.openai.model.OpenAiChatCompletionOutput;
1315
import com.sap.ai.sdk.foundationmodels.openai.model.OpenAiChatCompletionParameters;
@@ -16,7 +18,6 @@
1618
import com.sap.ai.sdk.foundationmodels.openai.model.OpenAiEmbeddingOutput;
1719
import com.sap.ai.sdk.foundationmodels.openai.model.OpenAiEmbeddingParameters;
1820
import com.sap.ai.sdk.foundationmodels.openai.model.OpenAiError;
19-
import com.sap.ai.sdk.foundationmodels.openai.model.StreamedDelta;
2021
import com.sap.cloud.sdk.cloudplatform.connectivity.ApacheHttpClient5Accessor;
2122
import com.sap.cloud.sdk.cloudplatform.connectivity.DefaultHttpDestination;
2223
import com.sap.cloud.sdk.cloudplatform.connectivity.Destination;
@@ -298,7 +299,7 @@ private <D extends StreamedDelta> Stream<D> streamRequest(
298299
final BasicClassicHttpRequest request, @Nonnull final Class<D> deltaType) {
299300
try {
300301
final var client = ApacheHttpClient5Accessor.getHttpClient(destination);
301-
return new OpenAiStreamingHandler<>(deltaType)
302+
return new ClientStreamingHandler<>(deltaType, OpenAiError.class, OpenAiClientException::new)
302303
.handleResponse(client.executeOpen(null, request, null));
303304
} catch (final IOException e) {
304305
throw new OpenAiClientException("Request to OpenAI model failed", e);

foundation-models/openai/src/main/java/com/sap/ai/sdk/foundationmodels/openai/OpenAiStreamingHandler.java

Lines changed: 0 additions & 59 deletions
This file was deleted.

foundation-models/openai/src/main/java/com/sap/ai/sdk/foundationmodels/openai/model/OpenAiChatCompletionDelta.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.fasterxml.jackson.annotation.JsonProperty;
44
import com.google.common.annotations.Beta;
5+
import com.sap.ai.sdk.core.commons.StreamedDelta;
56
import java.util.List;
67
import javax.annotation.Nonnull;
78
import javax.annotation.Nullable;

0 commit comments

Comments
 (0)