Skip to content
This repository was archived by the owner on Jul 22, 2025. It is now read-only.

Commit 3fdd031

Browse files
committed
new cancel manager makes cancelling request much easier
1 parent 22d4624 commit 3fdd031

File tree

11 files changed

+253
-10
lines changed

11 files changed

+253
-10
lines changed

lib/ai_bot/playground.rb

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -471,6 +471,13 @@ def reply_to(
471471

472472
redis_stream_key = "gpt_cancel:#{reply_post.id}"
473473
Discourse.redis.setex(redis_stream_key, MAX_STREAM_DELAY_SECONDS, 1)
474+
475+
context.cancel_manager = DiscourseAi::Completions::CancelManager.new
476+
context
477+
.cancel_manager
478+
.start_monitor(delay: 0.2) do
479+
context.cancel_manager.cancel! if !Discourse.redis.get(redis_stream_key)
480+
end
474481
end
475482

476483
context.skip_tool_details ||= !bot.persona.class.tool_details
@@ -568,6 +575,8 @@ def reply_to(
568575
end
569576
raise e
570577
ensure
578+
context.cancel_manager.stop_monitor if context&.cancel_manager
579+
571580
# since we are skipping validations and jobs we
572581
# may need to fix participant count
573582
if reply_post && reply_post.topic && reply_post.topic.private_message? &&
@@ -649,7 +658,7 @@ def publish_update(bot_reply_post, payload)
649658
payload,
650659
user_ids: bot_reply_post.topic.allowed_user_ids,
651660
max_backlog_size: 2,
652-
max_backlog_age: 60,
661+
max_backlog_age: MAX_STREAM_DELAY_SECONDS,
653662
)
654663
end
655664
end

lib/completions/cancel_manager.rb

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
# frozen_string_literal: true
2+
3+
# special object that can be used to cancel completions and http requests
4+
module DiscourseAi
5+
module Completions
6+
class CancelManager
7+
attr_reader :cancelled
8+
attr_reader :callbacks
9+
10+
def initialize
11+
@cancelled = false
12+
@callbacks = Concurrent::Array.new
13+
@mutex = Mutex.new
14+
@monitor_thread = nil
15+
end
16+
17+
def monitor_thread
18+
@mutex.synchronize { @monitor_thread }
19+
end
20+
21+
def start_monitor(delay: 0.5, &block)
22+
@mutex.synchronize do
23+
raise "Already monitoring" if @monitor_thread
24+
raise "Expected a block" if !block
25+
26+
db = RailsMultisite::ConnectionManagement.current_db
27+
@stop_monitor = false
28+
29+
@monitor_thread =
30+
Thread.new do
31+
begin
32+
loop do
33+
@mutex.synchronize { break if @stop_monitor }
34+
sleep delay
35+
@mutex.synchronize { break if @stop_monitor }
36+
@mutex.synchronize { break if cancelled? }
37+
38+
should_cancel = false
39+
RailsMultisite::ConnectionManagement.with_connection(db) do
40+
should_cancel = block.call
41+
end
42+
43+
@mutex.synchronize { cancel! if should_cancel }
44+
45+
break if cancelled?
46+
end
47+
ensure
48+
@mutex.synchronize { @monitor_thread = nil }
49+
end
50+
end
51+
end
52+
end
53+
54+
def stop_monitor
55+
monitor_thread = nil
56+
57+
@mutex.synchronize { monitor_thread = @monitor_thread }
58+
59+
if monitor_thread
60+
@mutex.synchronize { @stop_monitor = true }
61+
# so we do not deadlock
62+
monitor_thread.wakeup
63+
monitor_thread.join(2)
64+
# should not happen
65+
if monitor_thread.alive?
66+
Rails.logger.warn("DiscourseAI: CancelManager monitor thread did not stop in time")
67+
monitor_thread.kill if monitor_thread.alive?
68+
end
69+
end
70+
end
71+
72+
def cancelled?
73+
@cancelled
74+
end
75+
76+
def add_callback(cb)
77+
@callbacks << cb
78+
end
79+
80+
def remove_callback(cb)
81+
@callbacks.delete(cb)
82+
end
83+
84+
def cancel!
85+
@cancelled = true
86+
@callbacks.each do |cb|
87+
begin
88+
cb.call
89+
rescue StandardError
90+
# ignore cause this may have already been cancelled
91+
end
92+
end
93+
end
94+
end
95+
end
96+
end

lib/completions/endpoints/base.rb

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,11 +68,17 @@ def perform_completion!(
6868
feature_context: nil,
6969
partial_tool_calls: false,
7070
output_thinking: false,
71+
cancel_manager: nil,
7172
&blk
7273
)
7374
LlmQuota.check_quotas!(@llm_model, user)
7475
start_time = Time.now
7576

77+
if cancel_manager && cancel_manager.cancelled?
78+
# nothing to do
79+
return
80+
end
81+
7682
@forced_json_through_prefill = false
7783
@partial_tool_calls = partial_tool_calls
7884
@output_thinking = output_thinking
@@ -90,6 +96,7 @@ def perform_completion!(
9096
feature_context: feature_context,
9197
partial_tool_calls: partial_tool_calls,
9298
output_thinking: output_thinking,
99+
cancel_manager: cancel_manager,
93100
)
94101

95102
wrapped = result
@@ -118,6 +125,9 @@ def perform_completion!(
118125
end
119126
end
120127

128+
cancel_manager_callback = nil
129+
cancelled = false
130+
121131
FinalDestination::HTTP.start(
122132
model_uri.host,
123133
model_uri.port,
@@ -126,6 +136,14 @@ def perform_completion!(
126136
open_timeout: TIMEOUT,
127137
write_timeout: TIMEOUT,
128138
) do |http|
139+
if cancel_manager
140+
cancel_manager_callback =
141+
lambda do
142+
cancelled = true
143+
http.finish
144+
end
145+
cancel_manager.add_callback(cancel_manager_callback)
146+
end
129147
response_data = +""
130148
response_raw = +""
131149

@@ -196,7 +214,6 @@ def perform_completion!(
196214
end
197215

198216
begin
199-
cancelled = false
200217
cancel = -> do
201218
cancelled = true
202219
http.finish
@@ -224,8 +241,6 @@ def perform_completion!(
224241
partials.each { |inner_partial| blk.call(inner_partial, cancel) }
225242
end
226243
end
227-
rescue IOError, StandardError
228-
raise if !cancelled
229244
end
230245
if xml_stripper
231246
stripped = xml_stripper.finish
@@ -293,6 +308,12 @@ def perform_completion!(
293308
end
294309
end
295310
end
311+
rescue IOError, StandardError
312+
raise if !cancelled
313+
ensure
314+
if cancel_manager && cancel_manager_callback
315+
cancel_manager.remove_callback(cancel_manager_callback)
316+
end
296317
end
297318

298319
def final_log_update(log)

lib/completions/endpoints/canned_response.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ def perform_completion!(
3030
feature_name: nil,
3131
feature_context: nil,
3232
partial_tool_calls: false,
33-
output_thinking: false
33+
output_thinking: false,
34+
cancel_manager: nil
3435
)
3536
@dialect = dialect
3637
@model_params = model_params

lib/completions/endpoints/fake.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,8 @@ def perform_completion!(
122122
feature_name: nil,
123123
feature_context: nil,
124124
partial_tool_calls: false,
125-
output_thinking: false
125+
output_thinking: false,
126+
cancel_manager: nil
126127
)
127128
last_call = { dialect: dialect, user: user, model_params: model_params }
128129
self.class.last_call = last_call

lib/completions/endpoints/open_ai.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ def perform_completion!(
4646
feature_context: nil,
4747
partial_tool_calls: false,
4848
output_thinking: false,
49+
cancel_manager: nil,
4950
&blk
5051
)
5152
@disable_native_tools = dialect.disable_native_tools?

lib/completions/llm.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,7 @@ def generate(
325325
output_thinking: false,
326326
response_format: nil,
327327
extra_model_params: nil,
328+
cancel_manager: nil,
328329
&partial_read_blk
329330
)
330331
self.class.record_prompt(
@@ -378,6 +379,7 @@ def generate(
378379
feature_context: feature_context,
379380
partial_tool_calls: partial_tool_calls,
380381
output_thinking: output_thinking,
382+
cancel_manager: cancel_manager,
381383
&partial_read_blk
382384
)
383385
end

lib/personas/bot.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ def reply(context, llm_args: {}, &update_blk)
5555
unless context.is_a?(BotContext)
5656
raise ArgumentError, "context must be an instance of BotContext"
5757
end
58+
context.cancel_manager ||= DiscourseAi::Completions::CancelManager.new
5859
current_llm = llm
5960
prompt = persona.craft_prompt(context, llm: current_llm)
6061

@@ -91,6 +92,7 @@ def reply(context, llm_args: {}, &update_blk)
9192
feature_name: context.feature_name,
9293
partial_tool_calls: allow_partial_tool_calls,
9394
output_thinking: true,
95+
cancel_manager: context.cancel_manager,
9496
**llm_kwargs,
9597
) do |partial, cancel|
9698
tool =

lib/personas/bot_context.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ class BotContext
1616
:channel_id,
1717
:context_post_ids,
1818
:feature_name,
19-
:resource_url
19+
:resource_url,
20+
:cancel_manager
2021

2122
def initialize(
2223
post: nil,

lib/personas/tools/researcher.rb

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -101,9 +101,12 @@ def run_inference(chunk_text, goal, post, &blk)
101101
)
102102

103103
results = []
104-
llm.generate(prompt, user: post.user, feature_name: context.feature_name) do |partial|
105-
results << partial
106-
end
104+
llm.generate(
105+
prompt,
106+
user: post.user,
107+
feature_name: context.feature_name,
108+
cancel_manager: context.cancel_manager,
109+
) { |partial| results << partial }
107110

108111
blk.call(".")
109112
results.join

0 commit comments

Comments
 (0)