2222
2323import  java .io .IOException ;
2424import  java .util .ArrayDeque ;
25+ import  java .util .ArrayList ;
2526import  java .util .Collections ;
2627import  java .util .Deque ;
2728import  java .util .Iterator ;
29+ import  java .util .List ;
2830import  java .util .Objects ;
29- import  java .util .function .Predicate ;
3031
3132import  static  org .elasticsearch .common .xcontent .XContentParserUtils .ensureExpectedToken ;
3233import  static  org .elasticsearch .common .xcontent .XContentParserUtils .parseList ;
@@ -111,6 +112,8 @@ public class OpenAiStreamingProcessor extends DelegatingProcessor<Deque<ServerSe
111112    private  static  final  String  DELTA_FIELD  = "delta" ;
112113    private  static  final  String  CONTENT_FIELD  = "content" ;
113114    private  static  final  String  DONE_MESSAGE  = "[done]" ;
115+     private  static  final  String  REFUSAL_FIELD  = "refusal" ;
116+     private  static  final  String  TOOL_CALLS_FIELD  = "tool_calls" ;
114117
115118    @ Override 
116119    protected  void  next (Deque <ServerSentEvent > item ) throws  Exception  {
@@ -159,6 +162,10 @@ private Iterator<StreamingChatCompletionResults.Result> parse(XContentParserConf
159162
160163                ensureExpectedToken (XContentParser .Token .START_OBJECT , currentToken , parser );
161164
165+                 String  content  = null ;
166+                 String  refusal  = null ;
167+                 List <StreamingChatCompletionResults .ToolCall > toolCalls  = new  ArrayList <>();
168+ 
162169                currentToken  = parser .nextToken ();
163170
164171                // continue until the end of delta 
@@ -167,25 +174,84 @@ private Iterator<StreamingChatCompletionResults.Result> parse(XContentParserConf
167174                        parser .skipChildren ();
168175                    }
169176
170-                     if  (currentToken  == XContentParser .Token .FIELD_NAME  && parser .currentName ().equals (CONTENT_FIELD )) {
171-                         parser .nextToken ();
172-                         ensureExpectedToken (XContentParser .Token .VALUE_STRING , parser .currentToken (), parser );
173-                         var  content  = parser .text ();
174-                         consumeUntilObjectEnd (parser ); // end delta 
175-                         consumeUntilObjectEnd (parser ); // end choices 
176-                         return  content ;
177+                     if  (currentToken  == XContentParser .Token .FIELD_NAME ) {
178+                         switch  (parser .currentName ()) {
179+                             case  CONTENT_FIELD :
180+                                 parser .nextToken ();
181+                                 ensureExpectedToken (XContentParser .Token .VALUE_STRING , parser .currentToken (), parser );
182+                                 content  = parser .text ();
183+                                 break ;
184+                             case  REFUSAL_FIELD :
185+                                 parser .nextToken ();
186+                                 ensureExpectedToken (XContentParser .Token .VALUE_STRING , parser .currentToken (), parser );
187+                                 refusal  = parser .text ();
188+                                 break ;
189+                             case  TOOL_CALLS_FIELD :
190+                                 parser .nextToken ();
191+                                 ensureExpectedToken (XContentParser .Token .START_ARRAY , parser .currentToken (), parser );
192+                                 toolCalls  = parseToolCalls (parser );
193+                                 break ;
194+                         }
177195                    }
178196
179197                    currentToken  = parser .nextToken ();
180198                }
181199
200+                 consumeUntilObjectEnd (parser ); // end delta 
182201                consumeUntilObjectEnd (parser ); // end choices 
183-                 return  "" ; // stopped 
184-             }).stream ()
185-                 .filter (Objects ::nonNull )
186-                 .filter (Predicate .not (String ::isEmpty ))
187-                 .map (StreamingChatCompletionResults .Result ::new )
188-                 .iterator ();
202+ 
203+                 return  new  StreamingChatCompletionResults .Result (content , refusal , toolCalls );
204+             }).stream ().filter (Objects ::nonNull ).iterator ();
205+         }
206+     }
207+ 
208+     private  List <StreamingChatCompletionResults .ToolCall > parseToolCalls (XContentParser  parser ) throws  IOException  {
209+         List <StreamingChatCompletionResults .ToolCall > toolCalls  = new  ArrayList <>();
210+         while  (parser .nextToken () != XContentParser .Token .END_ARRAY ) {
211+             ensureExpectedToken (XContentParser .Token .START_OBJECT , parser .currentToken (), parser );
212+             int  index  = -1 ;
213+             String  id  = null ;
214+             String  functionName  = null ;
215+             String  functionArguments  = null ;
216+ 
217+             while  (parser .nextToken () != XContentParser .Token .END_OBJECT ) {
218+                 if  (parser .currentToken () == XContentParser .Token .FIELD_NAME ) {
219+                     switch  (parser .currentName ()) {
220+                         case  "index" :
221+                             parser .nextToken ();
222+                             ensureExpectedToken (XContentParser .Token .VALUE_NUMBER , parser .currentToken (), parser );
223+                             index  = parser .intValue ();
224+                             break ;
225+                         case  "id" :
226+                             parser .nextToken ();
227+                             ensureExpectedToken (XContentParser .Token .VALUE_STRING , parser .currentToken (), parser );
228+                             id  = parser .text ();
229+                             break ;
230+                         case  "function" :
231+                             parser .nextToken ();
232+                             ensureExpectedToken (XContentParser .Token .START_OBJECT , parser .currentToken (), parser );
233+                             while  (parser .nextToken () != XContentParser .Token .END_OBJECT ) {
234+                                 if  (parser .currentToken () == XContentParser .Token .FIELD_NAME ) {
235+                                     switch  (parser .currentName ()) {
236+                                         case  "name" :
237+                                             parser .nextToken ();
238+                                             ensureExpectedToken (XContentParser .Token .VALUE_STRING , parser .currentToken (), parser );
239+                                             functionName  = parser .text ();
240+                                             break ;
241+                                         case  "arguments" :
242+                                             parser .nextToken ();
243+                                             ensureExpectedToken (XContentParser .Token .VALUE_STRING , parser .currentToken (), parser );
244+                                             functionArguments  = parser .text ();
245+                                             break ;
246+                                     }
247+                                 }
248+                             }
249+                             break ;
250+                     }
251+                 }
252+             }
253+             toolCalls .add (new  StreamingChatCompletionResults .ToolCall (index , id , functionName , functionArguments ));
189254        }
255+         return  toolCalls ;
190256    }
191257}
0 commit comments