@@ -4,28 +4,92 @@ class DiscourseAi::Completions::AnthropicMessageProcessor
44 class AnthropicToolCall
55 attr_reader :name , :raw_json , :id
66
7- def initialize ( name , id )
7+ def initialize ( name , id , partial_tool_calls : false )
88 @name = name
99 @id = id
1010 @raw_json = +""
11+ @tool_call = DiscourseAi ::Completions ::ToolCall . new ( id : id , name : name , parameters : { } )
12+ @streaming_parser = ToolCallProgressTracker . new ( self ) if partial_tool_calls
1113 end
1214
1315 def append ( json )
1416 @raw_json << json
17+ @streaming_parser << json if @streaming_parser
18+ end
19+
20+ def notify_progress ( key , value )
21+ @tool_call . partial = true
22+ @tool_call . parameters [ key . to_sym ] = value
23+ @has_new_data = true
24+ end
25+
26+ def has_partial?
27+ @has_new_data
28+ end
29+
30+ def partial_tool_call
31+ @has_new_data = false
32+ @tool_call
1533 end
1634
1735 def to_tool_call
1836 parameters = JSON . parse ( raw_json , symbolize_names : true )
19- DiscourseAi ::Completions ::ToolCall . new ( id : id , name : name , parameters : parameters )
37+ @tool_call . partial = false
38+ @tool_call . parameters = parameters
39+ @tool_call
40+ end
41+ end
42+
43+ class ToolCallProgressTracker
44+ attr_reader :current_key , :current_value , :tool_call
45+
46+ def initialize ( tool_call )
47+ @tool_call = tool_call
48+ @current_key = nil
49+ @current_value = nil
50+ @parser = DiscourseAi ::Completions ::JsonStreamingParser . new
51+
52+ @parser . key do |k |
53+ @current_key = k
54+ @current_value = nil
55+ end
56+
57+ @parser . value do |v |
58+ tool_call . notify_progress ( @current_key , v ) if @current_key
59+ end
60+ end
61+
62+ def <<( json )
63+ # llm could send broken json
64+ # in that case just deal with it later
65+ # don't stream
66+ return if @broken
67+
68+ begin
69+ @parser << json
70+ rescue DiscourseAi ::Utils ::ParserError
71+ @broken = true
72+ return
73+ end
74+
75+ if @parser . state == :start_string && @current_key
76+ # this is is worth notifying
77+ tool_call . notify_progress ( @current_key , @parser . buf )
78+ end
79+
80+ if @parser . state == :end_value
81+ @current_key = nil
82+ end
2083 end
2184 end
2285
2386 attr_reader :tool_calls , :input_tokens , :output_tokens
2487
25- def initialize ( streaming_mode :)
88+ def initialize ( streaming_mode :, partial_tool_calls : false )
2689 @streaming_mode = streaming_mode
2790 @tool_calls = [ ]
2891 @current_tool_call = nil
92+ @partial_tool_calls = partial_tool_calls
2993 end
3094
3195 def to_tool_calls
@@ -38,11 +102,19 @@ def process_streamed_message(parsed)
38102 tool_name = parsed . dig ( :content_block , :name )
39103 tool_id = parsed . dig ( :content_block , :id )
40104 result = @current_tool_call . to_tool_call if @current_tool_call
41- @current_tool_call = AnthropicToolCall . new ( tool_name , tool_id ) if tool_name
105+ @current_tool_call =
106+ AnthropicToolCall . new (
107+ tool_name ,
108+ tool_id ,
109+ partial_tool_calls : @partial_tool_calls ,
110+ ) if tool_name
42111 elsif parsed [ :type ] == "content_block_start" || parsed [ :type ] == "content_block_delta"
43112 if @current_tool_call
44113 tool_delta = parsed . dig ( :delta , :partial_json ) . to_s
45114 @current_tool_call . append ( tool_delta )
115+ if @current_tool_call . has_partial?
116+ result = @current_tool_call . partial_tool_call
117+ end
46118 else
47119 result = parsed . dig ( :delta , :text ) . to_s
48120 end
0 commit comments