Skip to content

Commit 9a0b263

Browse files
Reduce duplication of StreamingResponseHandlers (#258)
* Reduce duplication of StreamingResponseHandlers * Removed IterableStreamConverter.java * Javadoc * Review comments
1 parent 8f68a19 commit 9a0b263

File tree

23 files changed

+168
-347
lines changed

23 files changed

+168
-347
lines changed

core/pom.xml

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,12 @@
3030
</developers>
3131
<properties>
3232
<project.rootdir>${project.basedir}/../</project.rootdir>
33-
<coverage.complexity>60%</coverage.complexity>
34-
<coverage.line>69%</coverage.line>
35-
<coverage.instruction>68%</coverage.instruction>
36-
<coverage.branch>52%</coverage.branch>
37-
<coverage.method>73%</coverage.method>
38-
<coverage.class>87%</coverage.class>
33+
<coverage.complexity>52%</coverage.complexity>
34+
<coverage.line>64%</coverage.line>
35+
<coverage.instruction>62%</coverage.instruction>
36+
<coverage.branch>47%</coverage.branch>
37+
<coverage.method>65%</coverage.method>
38+
<coverage.class>80%</coverage.class>
3939
</properties>
4040

4141
<dependencies>

core/src/main/java/com/sap/ai/sdk/core/commons/ClientError.java renamed to core/src/main/java/com/sap/ai/sdk/core/common/ClientError.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.sap.ai.sdk.core.commons;
1+
package com.sap.ai.sdk.core.common;
22

33
import com.google.common.annotations.Beta;
44
import javax.annotation.Nullable;

core/src/main/java/com/sap/ai/sdk/core/commons/ClientException.java renamed to core/src/main/java/com/sap/ai/sdk/core/common/ClientException.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.sap.ai.sdk.core.commons;
1+
package com.sap.ai.sdk.core.common;
22

33
import com.google.common.annotations.Beta;
44
import lombok.experimental.StandardException;

core/src/main/java/com/sap/ai/sdk/core/commons/ClientResponseHandler.java renamed to core/src/main/java/com/sap/ai/sdk/core/common/ClientResponseHandler.java

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.sap.ai.sdk.core.commons;
1+
package com.sap.ai.sdk.core.common;
22

33
import static com.sap.ai.sdk.core.JacksonConfiguration.getDefaultObjectMapper;
44

@@ -33,12 +33,24 @@
3333
@RequiredArgsConstructor
3434
public class ClientResponseHandler<T, E extends ClientException>
3535
implements HttpClientResponseHandler<T> {
36-
@Nonnull private final Class<T> responseType;
36+
@Nonnull final Class<T> responseType;
3737
@Nonnull private final Class<? extends ClientError> errorType;
38-
@Nonnull private final BiFunction<String, Throwable, E> exceptionType;
38+
@Nonnull final BiFunction<String, Throwable, E> exceptionConstructor;
3939

4040
/** The parses for JSON responses, will be private once we can remove mixins */
41-
@Nonnull public ObjectMapper JACKSON = getDefaultObjectMapper();
41+
@Nonnull ObjectMapper objectMapper = getDefaultObjectMapper();
42+
43+
/**
44+
* Set the {@link ObjectMapper} to use for parsing JSON responses.
45+
*
46+
* @param jackson The {@link ObjectMapper} to use
47+
*/
48+
@Beta
49+
@Nonnull
50+
public ClientResponseHandler<T, E> objectMapper(@Nonnull final ObjectMapper jackson) {
51+
objectMapper = jackson;
52+
return this;
53+
}
4254

4355
/**
4456
* Processes a {@link ClassicHttpResponse} and returns some value corresponding to that response.
@@ -62,15 +74,15 @@ public T handleResponse(@Nonnull final ClassicHttpResponse response) throws E {
6274
private T parseResponse(@Nonnull final ClassicHttpResponse response) throws E {
6375
final HttpEntity responseEntity = response.getEntity();
6476
if (responseEntity == null) {
65-
throw exceptionType.apply("Response was empty.", null);
77+
throw exceptionConstructor.apply("Response was empty.", null);
6678
}
6779
val content = getContent(responseEntity);
6880
log.debug("Parsing response from JSON response: {}", content);
6981
try {
70-
return JACKSON.readValue(content, responseType);
82+
return objectMapper.readValue(content, responseType);
7183
} catch (final JsonProcessingException e) {
7284
log.error("Failed to parse the following response: {}", content);
73-
throw exceptionType.apply("Failed to parse response", e);
85+
throw exceptionConstructor.apply("Failed to parse response", e);
7486
}
7587
}
7688

@@ -79,7 +91,7 @@ private String getContent(@Nonnull final HttpEntity entity) {
7991
try {
8092
return EntityUtils.toString(entity, StandardCharsets.UTF_8);
8193
} catch (IOException | ParseException e) {
82-
throw exceptionType.apply("Failed to read response content.", e);
94+
throw exceptionConstructor.apply("Failed to read response content.", e);
8395
}
8496
}
8597

@@ -91,7 +103,7 @@ private String getContent(@Nonnull final HttpEntity entity) {
91103
@SuppressWarnings("PMD.CloseResource")
92104
public void buildExceptionAndThrow(@Nonnull final ClassicHttpResponse response) throws E {
93105
val exception =
94-
exceptionType.apply(
106+
exceptionConstructor.apply(
95107
"Request failed with status %s %s"
96108
.formatted(response.getCode(), response.getReasonPhrase()),
97109
null);
@@ -126,14 +138,14 @@ public void buildExceptionAndThrow(@Nonnull final ClassicHttpResponse response)
126138
*/
127139
public void parseErrorAndThrow(
128140
@Nonnull final String errorResponse, @Nonnull final E baseException) throws E {
129-
val maybeError = Try.of(() -> JACKSON.readValue(errorResponse, errorType));
141+
val maybeError = Try.of(() -> objectMapper.readValue(errorResponse, errorType));
130142
if (maybeError.isFailure()) {
131143
baseException.addSuppressed(maybeError.getCause());
132144
throw baseException;
133145
}
134146

135147
val error = Objects.requireNonNullElse(maybeError.get().getMessage(), "");
136148
val message = "%s and error message: '%s'".formatted(baseException.getMessage(), error);
137-
throw exceptionType.apply(message, baseException);
149+
throw exceptionConstructor.apply(message, baseException);
138150
}
139151
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package com.sap.ai.sdk.core.common;
2+
3+
import com.fasterxml.jackson.databind.ObjectMapper;
4+
import com.google.common.annotations.Beta;
5+
import java.io.IOException;
6+
import java.util.function.BiFunction;
7+
import java.util.stream.Stream;
8+
import javax.annotation.Nonnull;
9+
import lombok.extern.slf4j.Slf4j;
10+
import org.apache.hc.core5.http.ClassicHttpResponse;
11+
12+
/**
13+
* Parse incoming JSON responses and handles any errors. For internal use only.
14+
*
15+
* @param <D> The type of the response.
16+
* @param <E> The type of the exception to throw.
17+
* @since 1.2.0
18+
*/
19+
@Beta
20+
@Slf4j
21+
public class ClientStreamingHandler<D extends StreamedDelta, E extends ClientException>
22+
extends ClientResponseHandler<D, E> {
23+
24+
/**
25+
* Set the {@link ObjectMapper} to use for parsing JSON responses.
26+
*
27+
* @param jackson The {@link ObjectMapper} to use
28+
*/
29+
@Nonnull
30+
public ClientStreamingHandler<D, E> objectMapper(@Nonnull final ObjectMapper jackson) {
31+
super.objectMapper(jackson);
32+
return this;
33+
}
34+
35+
/**
36+
* Creates a new instance of the {@link ClientStreamingHandler}.
37+
*
38+
* @param deltaType The type of the response.
39+
* @param errorType The type of the error.
40+
* @param exceptionType The type of the exception to throw.
41+
*/
42+
public ClientStreamingHandler(
43+
@Nonnull final Class<D> deltaType,
44+
@Nonnull final Class<? extends ClientError> errorType,
45+
@Nonnull final BiFunction<String, Throwable, E> exceptionType) {
46+
super(deltaType, errorType, exceptionType);
47+
}
48+
49+
/**
50+
* Processes a {@link ClassicHttpResponse} and returns a {@link Stream} of deltas corresponding to
51+
* that response.
52+
*
53+
* @param response The response to process
54+
* @return A {@link Stream} of a model class instantiated from the response
55+
* @throws E in case of a problem or the connection was aborted
56+
*/
57+
@SuppressWarnings("PMD.CloseResource") // Stream is closed automatically when consumed
58+
@Nonnull
59+
public Stream<D> handleStreamingResponse(@Nonnull final ClassicHttpResponse response) throws E {
60+
if (response.getCode() >= 300) {
61+
super.buildExceptionAndThrow(response);
62+
}
63+
return IterableStreamConverter.lines(response.getEntity(), exceptionConstructor)
64+
// half of the lines are empty newlines, the last line is "data: [DONE]"
65+
.filter(line -> !line.isEmpty() && !"data: [DONE]".equals(line.trim()))
66+
.peek(
67+
line -> {
68+
if (!line.startsWith("data: ")) {
69+
final String msg = "Failed to parse response";
70+
super.parseErrorAndThrow(line, exceptionConstructor.apply(msg, null));
71+
}
72+
})
73+
.map(
74+
line -> {
75+
final String data = line.substring(5); // remove "data: "
76+
try {
77+
return objectMapper.readValue(data, responseType);
78+
} catch (final IOException e) { // exception message e gets lost
79+
log.error("Failed to parse the following response: {}", line);
80+
throw exceptionConstructor.apply("Failed to parse delta message: " + line, e);
81+
}
82+
});
83+
}
84+
}

foundation-models/openai/src/main/java/com/sap/ai/sdk/foundationmodels/openai/IterableStreamConverter.java renamed to core/src/main/java/com/sap/ai/sdk/core/common/IterableStreamConverter.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.sap.ai.sdk.foundationmodels.openai;
1+
package com.sap.ai.sdk.core.common;
22

33
import static java.nio.charset.StandardCharsets.UTF_8;
44
import static java.util.Spliterator.NONNULL;
@@ -13,6 +13,7 @@
1313
import java.util.NoSuchElementException;
1414
import java.util.Spliterators;
1515
import java.util.concurrent.Callable;
16+
import java.util.function.BiFunction;
1617
import java.util.function.Function;
1718
import java.util.stream.Stream;
1819
import java.util.stream.StreamSupport;
@@ -90,28 +91,32 @@ public T next() {
9091
* when an exception occurred.
9192
*
9293
* @param entity The HTTP entity object.
94+
* @param exceptionType The type of the client exception to throw in case of an error.
9395
* @return A sequential Stream object.
94-
* @throws OpenAiClientException if the provided HTTP entity object is {@code null} or empty.
96+
* @throws ClientException if the provided HTTP entity object is {@code null} or empty.
9597
*/
9698
@SuppressWarnings("PMD.CloseResource") // Stream is closed automatically when consumed
9799
@Nonnull
98-
static Stream<String> lines(@Nullable final HttpEntity entity) throws OpenAiClientException {
100+
static Stream<String> lines(
101+
@Nullable final HttpEntity entity,
102+
@Nonnull final BiFunction<String, Throwable, ? extends ClientException> exceptionType)
103+
throws ClientException {
99104
if (entity == null) {
100-
throw new OpenAiClientException("OpenAI response was empty.");
105+
throw exceptionType.apply("Orchestration service response was empty.", null);
101106
}
102107

103108
final InputStream inputStream;
104109
try {
105110
inputStream = entity.getContent();
106111
} catch (final IOException e) {
107-
throw new OpenAiClientException("Failed to read response content.", e);
112+
throw exceptionType.apply("Failed to read response content.", e);
108113
}
109114

110115
final var reader = new BufferedReader(new InputStreamReader(inputStream, UTF_8), BUFFER_SIZE);
111116
final Runnable closeHandler =
112117
() -> Try.run(reader::close).onFailure(e -> log.error("Could not close input stream", e));
113118
final Function<Exception, RuntimeException> errHandler =
114-
e -> new OpenAiClientException("Parsing response content was interrupted.", e);
119+
e -> exceptionType.apply("Parsing response content was interrupted.", e);
115120

116121
final var iterator = new IterableStreamConverter<>(reader::readLine, closeHandler, errHandler);
117122
final var spliterator = Spliterators.spliteratorUnknownSize(iterator, ORDERED | NONNULL);

orchestration/src/main/java/com/sap/ai/sdk/orchestration/StreamedDelta.java renamed to core/src/main/java/com/sap/ai/sdk/core/common/StreamedDelta.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.sap.ai.sdk.orchestration;
1+
package com.sap.ai.sdk.core.common;
22

33
import javax.annotation.Nonnull;
44
import javax.annotation.Nullable;

foundation-models/openai/src/test/java/com/sap/ai/sdk/foundationmodels/openai/IterableStreamConverterTest.java renamed to core/src/test/java/com/sap/ai/sdk/core/common/IterableStreamConverterTest.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.sap.ai.sdk.foundationmodels.openai;
1+
package com.sap.ai.sdk.core.common;
22

33
import static org.assertj.core.api.Assertions.assertThat;
44
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -18,6 +18,7 @@
1818
import java.nio.charset.StandardCharsets;
1919
import java.util.concurrent.atomic.AtomicInteger;
2020
import lombok.SneakyThrows;
21+
import lombok.experimental.StandardException;
2122
import org.apache.hc.core5.http.ContentType;
2223
import org.apache.hc.core5.http.io.entity.InputStreamEntity;
2324
import org.junit.jupiter.api.DisplayName;
@@ -33,7 +34,7 @@ void testLines() {
3334
final var inputStream = spy(new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8)));
3435
final var entity = new InputStreamEntity(inputStream, ContentType.TEXT_PLAIN);
3536

36-
final var sut = IterableStreamConverter.lines(entity);
37+
final var sut = IterableStreamConverter.lines(entity, TestClientException::new);
3738
verify(inputStream, never()).read();
3839
verify(inputStream, never()).read(any());
3940
verify(inputStream, never()).read(any(), anyInt(), anyInt());
@@ -69,7 +70,7 @@ void testLinesFindFirst() {
6970

7071
final var entity = new InputStreamEntity(inputStream, ContentType.TEXT_PLAIN);
7172

72-
final var sut = IterableStreamConverter.lines(entity);
73+
final var sut = IterableStreamConverter.lines(entity, TestClientException::new);
7374
assertThat(sut.findFirst()).contains("Foo Bar");
7475
verify(inputStream, times(1)).read(any(), anyInt(), anyInt());
7576
verify(inputStream, never()).close();
@@ -93,14 +94,17 @@ void testLinesThrows() {
9394

9495
final var entity = new InputStreamEntity(inputStream, ContentType.TEXT_PLAIN);
9596

96-
final var sut = IterableStreamConverter.lines(entity);
97+
final var sut = IterableStreamConverter.lines(entity, TestClientException::new);
9798
assertThatThrownBy(sut::count)
98-
.isInstanceOf(OpenAiClientException.class)
99+
.isInstanceOf(TestClientException.class)
99100
.hasMessage("Parsing response content was interrupted.")
100101
.cause()
101102
.isInstanceOf(IOException.class)
102103
.hasMessage("Ups!");
103104
verify(inputStream, times(2)).read(any(), anyInt(), anyInt());
104105
verify(inputStream, times(1)).close();
105106
}
107+
108+
@StandardException
109+
public static class TestClientException extends ClientException {}
106110
}

foundation-models/openai/pom.xml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,12 @@
3333
</developers>
3434
<properties>
3535
<project.rootdir>${project.basedir}/../../</project.rootdir>
36-
<coverage.complexity>70%</coverage.complexity>
37-
<coverage.line>76%</coverage.line>
38-
<coverage.instruction>75%</coverage.instruction>
39-
<coverage.branch>66%</coverage.branch>
36+
<coverage.complexity>71%</coverage.complexity>
37+
<coverage.line>80%</coverage.line>
38+
<coverage.instruction>76%</coverage.instruction>
39+
<coverage.branch>69%</coverage.branch>
4040
<coverage.method>83%</coverage.method>
41-
<coverage.class>85%</coverage.class>
41+
<coverage.class>84%</coverage.class>
4242
</properties>
4343
<dependencies>
4444
<dependency>

foundation-models/openai/src/main/java/com/sap/ai/sdk/foundationmodels/openai/OpenAiClient.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@
77
import com.google.common.annotations.Beta;
88
import com.sap.ai.sdk.core.AiCoreService;
99
import com.sap.ai.sdk.core.DeploymentResolutionException;
10-
import com.sap.ai.sdk.core.commons.ClientResponseHandler;
10+
import com.sap.ai.sdk.core.common.ClientResponseHandler;
11+
import com.sap.ai.sdk.core.common.ClientStreamingHandler;
12+
import com.sap.ai.sdk.core.common.StreamedDelta;
1113
import com.sap.ai.sdk.foundationmodels.openai.model.OpenAiChatCompletionDelta;
1214
import com.sap.ai.sdk.foundationmodels.openai.model.OpenAiChatCompletionOutput;
1315
import com.sap.ai.sdk.foundationmodels.openai.model.OpenAiChatCompletionParameters;
@@ -16,7 +18,6 @@
1618
import com.sap.ai.sdk.foundationmodels.openai.model.OpenAiEmbeddingOutput;
1719
import com.sap.ai.sdk.foundationmodels.openai.model.OpenAiEmbeddingParameters;
1820
import com.sap.ai.sdk.foundationmodels.openai.model.OpenAiError;
19-
import com.sap.ai.sdk.foundationmodels.openai.model.StreamedDelta;
2021
import com.sap.cloud.sdk.cloudplatform.connectivity.ApacheHttpClient5Accessor;
2122
import com.sap.cloud.sdk.cloudplatform.connectivity.DefaultHttpDestination;
2223
import com.sap.cloud.sdk.cloudplatform.connectivity.Destination;
@@ -298,8 +299,8 @@ private <D extends StreamedDelta> Stream<D> streamRequest(
298299
final BasicClassicHttpRequest request, @Nonnull final Class<D> deltaType) {
299300
try {
300301
final var client = ApacheHttpClient5Accessor.getHttpClient(destination);
301-
return new OpenAiStreamingHandler<>(deltaType)
302-
.handleResponse(client.executeOpen(null, request, null));
302+
return new ClientStreamingHandler<>(deltaType, OpenAiError.class, OpenAiClientException::new)
303+
.handleStreamingResponse(client.executeOpen(null, request, null));
303304
} catch (final IOException e) {
304305
throw new OpenAiClientException("Request to OpenAI model failed", e);
305306
}

0 commit comments

Comments
 (0)