7070 */
7171public class OpenAiApi {
7272
73+ private static final org .slf4j .Logger logger = org .slf4j .LoggerFactory .getLogger (OpenAiApi .class );
74+
7375 public static final String HTTP_USER_AGENT_HEADER = "User-Agent" ;
7476
7577 public static final String SPRING_AI_USER_AGENT = "spring-ai" ;
@@ -116,6 +118,8 @@ public static Builder builder() {
116118
117119 private OpenAiStreamFunctionCallingHelper chunkMerger = new OpenAiStreamFunctionCallingHelper ();
118120
121+ private StreamErrorHandlingStrategy streamErrorHandlingStrategy = StreamErrorHandlingStrategy .SKIP ;
122+
119123 /**
120124 * Create a new chat completion api.
121125 * @param baseUrl api base URL.
@@ -245,16 +249,25 @@ public Flux<ChatCompletionChunk> chatCompletionStream(ChatCompletionRequest chat
245249 .headers (headers -> {
246250 headers .addAll (additionalHttpHeader );
247251 addDefaultHeadersIfMissing (headers );
248- }) // @formatter:on
252+ })
249253 .body (Mono .just (chatRequest ), ChatCompletionRequest .class )
250254 .retrieve ()
251255 .bodyToFlux (String .class )
252256 // cancels the flux stream after the "[DONE]" is received.
253257 .takeUntil (SSE_DONE_PREDICATE )
254258 // filters out the "[DONE]" message.
255259 .filter (SSE_DONE_PREDICATE .negate ())
256- .map (content -> ModelOptionsUtils .jsonToObject (content , ChatCompletionChunk .class ))
257- // Detect is the chunk is part of a streaming function call.
260+ // Parse JSON string to ChatCompletionChunk with error handling
261+ .flatMap (content -> {
262+ try {
263+ ChatCompletionChunk chunk = ModelOptionsUtils .jsonToObject (content , ChatCompletionChunk .class );
264+ return Mono .just (chunk );
265+ }
266+ catch (Exception e ) {
267+ return handleParseError (content , e );
268+ }
269+ })
270+ // Detect if the chunk is part of a streaming function call.
258271 .map (chunk -> {
259272 if (this .chunkMerger .isStreamingToolFunctionCall (chunk )) {
260273 isInsideTool .set (true );
@@ -276,12 +289,52 @@ public Flux<ChatCompletionChunk> chatCompletionStream(ChatCompletionRequest chat
276289 // Flux<Flux<ChatCompletionChunk>> -> Flux<Mono<ChatCompletionChunk>>
277290 .concatMapIterable (window -> {
278291 Mono <ChatCompletionChunk > monoChunk = window .reduce (
279- new ChatCompletionChunk (null , null , null , null , null , null , null , null ),
280292 (previous , current ) -> this .chunkMerger .merge (previous , current ));
281293 return List .of (monoChunk );
282294 })
283295 // Flux<Mono<ChatCompletionChunk>> -> Flux<ChatCompletionChunk>
284296 .flatMap (mono -> mono );
297+ // @formatter:on
298+ }
299+
300+ /**
301+ * Handles parsing errors when processing streaming chat completion chunks. The
302+ * behavior depends on the configured {@link StreamErrorHandlingStrategy}.
303+ * @param content the raw content that failed to parse
304+ * @param e the exception that occurred during parsing
305+ * @return a Mono that either emits nothing (skip), emits an error, or logs and
306+ * continues
307+ */
308+ private Mono <ChatCompletionChunk > handleParseError (String content , Exception e ) {
309+ String errorMessage = String .format (
310+ "Failed to parse ChatCompletionChunk from streaming response. "
311+ + "Raw content: [%s]. This may indicate malformed JSON from the LLM. Error: %s" ,
312+ content , e .getMessage ());
313+
314+ switch (this .streamErrorHandlingStrategy ) {
315+ case FAIL_FAST :
316+ logger .error (errorMessage , e );
317+ return Mono .error (new ChatCompletionParseException ("Invalid JSON chunk received from LLM" , content , e ));
318+
319+ case LOG_AND_CONTINUE :
320+ logger .warn (errorMessage );
321+ logger .debug ("Full stack trace for JSON parsing error:" , e );
322+ return Mono .empty ();
323+
324+ case SKIP :
325+ default :
326+ logger .warn ("Skipping invalid chunk in streaming response. Raw content: [{}]. Error: {}" , content ,
327+ e .getMessage ());
328+ return Mono .empty ();
329+ }
330+ }
331+
332+ /**
333+ * Sets the error handling strategy for streaming chat completion parsing errors.
334+ * @param strategy the strategy to use when encountering JSON parsing errors
335+ */
336+ public void setStreamErrorHandlingStrategy (StreamErrorHandlingStrategy strategy ) {
337+ this .streamErrorHandlingStrategy = strategy != null ? strategy : StreamErrorHandlingStrategy .SKIP ;
285338 }
286339
287340 /**
@@ -2006,6 +2059,7 @@ public Builder(OpenAiApi api) {
20062059 this .restClientBuilder = api .restClient != null ? api .restClient .mutate () : RestClient .builder ();
20072060 this .webClientBuilder = api .webClient != null ? api .webClient .mutate () : WebClient .builder ();
20082061 this .responseErrorHandler = api .getResponseErrorHandler ();
2062+ this .streamErrorHandlingStrategy = api .streamErrorHandlingStrategy ;
20092063 }
20102064
20112065 private String baseUrl = OpenAiApiConstants .DEFAULT_BASE_URL ;
@@ -2024,6 +2078,8 @@ public Builder(OpenAiApi api) {
20242078
20252079 private ResponseErrorHandler responseErrorHandler = RetryUtils .DEFAULT_RESPONSE_ERROR_HANDLER ;
20262080
2081+ private StreamErrorHandlingStrategy streamErrorHandlingStrategy = StreamErrorHandlingStrategy .SKIP ;
2082+
20272083 public Builder baseUrl (String baseUrl ) {
20282084 Assert .hasText (baseUrl , "baseUrl cannot be null or empty" );
20292085 this .baseUrl = baseUrl ;
@@ -2077,10 +2133,18 @@ public Builder responseErrorHandler(ResponseErrorHandler responseErrorHandler) {
20772133 return this ;
20782134 }
20792135
2136+ public Builder streamErrorHandlingStrategy (StreamErrorHandlingStrategy streamErrorHandlingStrategy ) {
2137+ Assert .notNull (streamErrorHandlingStrategy , "streamErrorHandlingStrategy cannot be null" );
2138+ this .streamErrorHandlingStrategy = streamErrorHandlingStrategy ;
2139+ return this ;
2140+ }
2141+
20802142 public OpenAiApi build () {
20812143 Assert .notNull (this .apiKey , "apiKey must be set" );
2082- return new OpenAiApi (this .baseUrl , this .apiKey , this .headers , this .completionsPath , this .embeddingsPath ,
2083- this .restClientBuilder , this .webClientBuilder , this .responseErrorHandler );
2144+ OpenAiApi api = new OpenAiApi (this .baseUrl , this .apiKey , this .headers , this .completionsPath ,
2145+ this .embeddingsPath , this .restClientBuilder , this .webClientBuilder , this .responseErrorHandler );
2146+ api .setStreamErrorHandlingStrategy (this .streamErrorHandlingStrategy );
2147+ return api ;
20842148 }
20852149
20862150 }
0 commit comments