Skip to content
This repository was archived by the owner on Jul 22, 2025. It is now read-only.

Commit b67568b

Browse files
committed
Implement streaming and generation of thinking tokens
This is required for claude and introduces some new concepts into prompt
1 parent 2b25e49 commit b67568b

File tree

7 files changed

+156
-14
lines changed

7 files changed

+156
-14
lines changed

lib/completions/anthropic_message_processor.rb

Lines changed: 64 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,15 @@ def to_tool_call
4444
end
4545
end
4646

47-
attr_reader :tool_calls, :input_tokens, :output_tokens
47+
attr_reader :tool_calls, :input_tokens, :output_tokens, :output_thinking
4848

49-
def initialize(streaming_mode:, partial_tool_calls: false)
49+
def initialize(streaming_mode:, partial_tool_calls: false, output_thinking: false)
5050
@streaming_mode = streaming_mode
5151
@tool_calls = []
5252
@current_tool_call = nil
5353
@partial_tool_calls = partial_tool_calls
54+
@output_thinking = output_thinking
55+
@thinking = nil
5456
end
5557

5658
def to_tool_calls
@@ -69,13 +71,48 @@ def process_streamed_message(parsed)
6971
tool_id,
7072
partial_tool_calls: @partial_tool_calls,
7173
) if tool_name
74+
elsif parsed[:type] == "content_block_start" && parsed.dig(:content_block, :type) == "thinking"
75+
if @output_thinking
76+
@thinking =
77+
DiscourseAi::Completions::Thinking.new(
78+
message: +parsed.dig(:content_block, :thinking).to_s,
79+
signature: +"",
80+
partial: true,
81+
)
82+
result = @thinking.dup
83+
end
84+
elsif parsed[:type] == "content_block_delta" && parsed.dig(:delta, :type) == "thinking_delta"
85+
if @output_thinking
86+
delta = parsed.dig(:delta, :thinking)
87+
@thinking.message << delta if @thinking
88+
result = DiscourseAi::Completions::Thinking.new(message: delta, partial: true)
89+
end
90+
elsif parsed[:type] == "content_block_delta" && parsed.dig(:delta, :type) == "signature_delta"
91+
if @output_thinking
92+
@thinking.signature << parsed.dig(:delta, :signature) if @thinking
93+
end
94+
elsif parsed[:type] == "content_block_stop" && @thinking
95+
@thinking.partial = false
96+
result = @thinking
97+
@thinking = nil
7298
elsif parsed[:type] == "content_block_start" || parsed[:type] == "content_block_delta"
7399
if @current_tool_call
74100
tool_delta = parsed.dig(:delta, :partial_json).to_s
75101
@current_tool_call.append(tool_delta)
76102
result = @current_tool_call.partial_tool_call if @current_tool_call.has_partial?
103+
elsif parsed.dig(:content_block, :type) == "redacted_thinking"
104+
if @output_thinking
105+
result =
106+
DiscourseAi::Completions::Thinking.new(
107+
message: nil,
108+
signature: parsed.dig(:content_block, :data),
109+
redacted: true,
110+
)
111+
end
77112
else
78113
result = parsed.dig(:delta, :text).to_s
114+
# no need to return empty strings for streaming, no value
115+
result = nil if result == ""
79116
end
80117
elsif parsed[:type] == "content_block_stop"
81118
if @current_tool_call
@@ -105,15 +142,32 @@ def process_message(payload)
105142
content = parsed.dig(:content)
106143
if content.is_a?(Array)
107144
result =
108-
content.map do |data|
109-
if data[:type] == "tool_use"
110-
call = AnthropicToolCall.new(data[:name], data[:id])
111-
call.append(data[:input].to_json)
112-
call.to_tool_call
113-
else
114-
data[:text]
145+
content
146+
.map do |data|
147+
if data[:type] == "tool_use"
148+
call = AnthropicToolCall.new(data[:name], data[:id])
149+
call.append(data[:input].to_json)
150+
call.to_tool_call
151+
elsif data[:type] == "thinking"
152+
if @output_thinking
153+
DiscourseAi::Completions::Thinking.new(
154+
message: data[:thinking],
155+
signature: data[:signature],
156+
)
157+
end
158+
elsif data[:type] == "redacted_thinking"
159+
if @output_thinking
160+
DiscourseAi::Completions::Thinking.new(
161+
message: nil,
162+
signature: data[:data],
163+
redacted: true,
164+
)
165+
end
166+
else
167+
data[:text]
168+
end
115169
end
116-
end
170+
.compact
117171
end
118172

119173
@input_tokens = parsed.dig(:usage, :input_tokens)

lib/completions/dialects/claude.rb

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,30 @@ def tool_msg(msg)
8787
end
8888

8989
def model_msg(msg)
90-
{ role: "assistant", content: msg[:content] }
90+
if msg[:thinking] || msg[:redacted_thinking_signature]
91+
content_array = []
92+
93+
if msg[:thinking]
94+
content_array << {
95+
type: "thinking",
96+
thinking: msg[:thinking],
97+
signature: msg[:thinking_signature],
98+
}
99+
end
100+
101+
if msg[:redacted_thinking_signature]
102+
content_array << {
103+
type: "redacted_thinking",
104+
data: msg[:redacted_thinking_signature],
105+
}
106+
end
107+
108+
content_array << { type: "text", text: msg[:content] }
109+
110+
{ role: "assistant", content: content_array }
111+
else
112+
{ role: "assistant", content: msg[:content] }
113+
end
91114
end
92115

93116
def system_msg(msg)

lib/completions/endpoints/anthropic.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ def processor
123123
DiscourseAi::Completions::AnthropicMessageProcessor.new(
124124
streaming_mode: @streaming_mode,
125125
partial_tool_calls: partial_tool_calls,
126+
output_thinking: output_thinking,
126127
)
127128
end
128129

lib/completions/endpoints/base.rb

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ module DiscourseAi
44
module Completions
55
module Endpoints
66
class Base
7-
attr_reader :partial_tool_calls
7+
attr_reader :partial_tool_calls, :output_thinking
88

99
CompletionFailed = Class.new(StandardError)
1010
# 6 minutes
@@ -67,12 +67,15 @@ def perform_completion!(
6767
feature_name: nil,
6868
feature_context: nil,
6969
partial_tool_calls: false,
70+
output_thinking: false,
7071
&blk
7172
)
7273
LlmQuota.check_quotas!(@llm_model, user)
7374
start_time = Time.now
7475

7576
@partial_tool_calls = partial_tool_calls
77+
@output_thinking = output_thinking
78+
7679
model_params = normalize_model_params(model_params)
7780
orig_blk = blk
7881

@@ -85,6 +88,7 @@ def perform_completion!(
8588
feature_name: feature_name,
8689
feature_context: feature_context,
8790
partial_tool_calls: partial_tool_calls,
91+
output_thinking: output_thinking,
8892
)
8993

9094
wrapped = result

lib/completions/llm.rb

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,7 @@ def initialize(dialect_klass, gateway_klass, llm_model, gateway: nil)
234234
# @param feature_name { String - Optional } - The feature name to use for the completion.
235235
# @param feature_context { Hash - Optional } - The feature context to use for the completion.
236236
# @param partial_tool_calls { Boolean - Optional } - If true, the completion will return partial tool calls.
237+
# @param output_thinking { Boolean - Optional } - If true, the completion will return the thinking output for thinking models.
237238
#
238239
# @param &on_partial_blk { Block - Optional } - The passed block will get called with the LLM partial response alongside a cancel function.
239240
#
@@ -250,6 +251,7 @@ def generate(
250251
feature_name: nil,
251252
feature_context: nil,
252253
partial_tool_calls: false,
254+
output_thinking: false,
253255
&partial_read_blk
254256
)
255257
self.class.record_prompt(prompt)
@@ -285,6 +287,7 @@ def generate(
285287
feature_name: feature_name,
286288
feature_context: feature_context,
287289
partial_tool_calls: partial_tool_calls,
290+
output_thinking: output_thinking,
288291
&partial_read_blk
289292
)
290293
end

lib/completions/prompt.rb

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,26 @@ def initialize(
4141
@tool_choice = tool_choice
4242
end
4343

44-
def push(type:, content:, id: nil, name: nil, upload_ids: nil)
44+
def push(
45+
type:,
46+
content:,
47+
id: nil,
48+
name: nil,
49+
upload_ids: nil,
50+
thinking: nil,
51+
thinking_signature: nil,
52+
redacted_thinking_signature: nil
53+
)
4554
return if type == :system
4655
new_message = { type: type, content: content }
4756
new_message[:name] = name.to_s if name
4857
new_message[:id] = id.to_s if id
4958
new_message[:upload_ids] = upload_ids if upload_ids
59+
new_message[:thinking] = thinking if thinking
60+
new_message[:thinking_signature] = thinking_signature if thinking_signature
61+
new_message[
62+
:redacted_thinking_signature
63+
] = redacted_thinking_signature if redacted_thinking_signature
5064

5165
validate_message(new_message)
5266
validate_turn(messages.last, new_message)
@@ -73,7 +87,16 @@ def validate_message(message)
7387
raise ArgumentError, "message type must be one of #{valid_types}"
7488
end
7589

76-
valid_keys = %i[type content id name upload_ids]
90+
valid_keys = %i[
91+
type
92+
content
93+
id
94+
name
95+
upload_ids
96+
thinking
97+
thinking_signature
98+
redacted_thinking_signature
99+
]
77100
if (invalid_keys = message.keys - valid_keys).any?
78101
raise ArgumentError, "message contains invalid keys: #{invalid_keys}"
79102
end

lib/completions/thinking.rb

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
# frozen_string_literal: true
2+
3+
module DiscourseAi
4+
module Completions
5+
class Thinking
6+
attr_accessor :message, :signature, :redacted, :partial
7+
8+
def initialize(message:, signature: nil, redacted: false, partial: false)
9+
@message = message
10+
@signature = signature
11+
@redacted = redacted
12+
@partial = partial
13+
end
14+
15+
def ==(other)
16+
message == other.message && signature == other.signature && redacted == other.redacted &&
17+
partial == other.partial
18+
end
19+
20+
def dup
21+
Thinking.new(
22+
message: message.dup,
23+
signature: signature.dup,
24+
redacted: redacted,
25+
partial: partial,
26+
)
27+
end
28+
29+
def to_s
30+
"#{message} - #{signature} - #{redacted} - #{partial}"
31+
end
32+
end
33+
end
34+
end

0 commit comments

Comments
 (0)