Skip to content
This repository was archived by the owner on Jul 22, 2025. It is now read-only.

Commit fa82a1a

Browse files
committed
FIX: cross talk when in ai helper
Previous to this change we reused channels for proofreading progress and ai helper progress The new changeset ensures each POST to stream progress gets a dedicated message bus channel This fixes a class of issues where the wrong information could be displayed to end users on subsequent proofreading or helper calls
1 parent 4ad64ed commit fa82a1a

File tree

9 files changed

+193
-85
lines changed

9 files changed

+193
-85
lines changed

app/controllers/discourse_ai/ai_helper/assistant_controller.rb

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,9 @@ def stream_suggestion
124124
# otherwise we may end up streaming the data to the wrong client
125125
raise Discourse::InvalidParameters.new(:client_id) if params[:client_id].blank?
126126

127+
channel_id = next_channel_id
128+
progress_channel = "discourse_ai_helper/stream_suggestions/#{channel_id}"
129+
127130
if location == "composer"
128131
Jobs.enqueue(
129132
:stream_composer_helper,
@@ -133,6 +136,7 @@ def stream_suggestion
133136
custom_prompt: params[:custom_prompt],
134137
force_default_locale: params[:force_default_locale] || false,
135138
client_id: params[:client_id],
139+
progress_channel:,
136140
)
137141
else
138142
post_id = get_post_param!
@@ -148,10 +152,11 @@ def stream_suggestion
148152
prompt: params[:mode],
149153
custom_prompt: params[:custom_prompt],
150154
client_id: params[:client_id],
155+
progress_channel:,
151156
)
152157
end
153158

154-
render json: { success: true }, status: 200
159+
render json: { success: true, progress_channel: }, status: 200
155160
rescue DiscourseAi::Completions::Endpoints::Base::CompletionFailed
156161
render_json_error I18n.t("discourse_ai.ai_helper.errors.completion_request_failed"),
157162
status: 502
@@ -192,6 +197,18 @@ def caption_image
192197

193198
private
194199

200+
CHANNEL_ID_KEY = "discourse_ai_helper_next_channel_id"
201+
202+
def next_channel_id
203+
Discourse
204+
.redis
205+
.pipelined do |pipeline|
206+
pipeline.incr(CHANNEL_ID_KEY)
207+
pipeline.expire(CHANNEL_ID_KEY, 1.day)
208+
end
209+
.first
210+
end
211+
195212
def get_text_param!
196213
params[:text].tap { |t| raise Discourse::InvalidParameters.new(:text) if t.blank? }
197214
end

app/jobs/regular/stream_composer_helper.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,15 @@ def execute(args)
99
return unless user = User.find_by(id: args[:user_id])
1010
return unless args[:text]
1111
return unless args[:client_id]
12+
return unless args[:progress_channel]
1213

1314
helper_mode = args[:prompt]
1415

1516
DiscourseAi::AiHelper::Assistant.new.stream_prompt(
1617
helper_mode,
1718
args[:text],
1819
user,
19-
"/discourse-ai/ai-helper/stream_composer_suggestion",
20+
args[:progress_channel],
2021
force_default_locale: args[:force_default_locale],
2122
client_id: args[:client_id],
2223
custom_prompt: args[:custom_prompt],

app/jobs/regular/stream_post_helper.rb

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ def execute(args)
88
return unless post = Post.includes(:topic).find_by(id: args[:post_id])
99
return unless user = User.find_by(id: args[:user_id])
1010
return unless args[:text]
11+
return unless args[:progress_channel]
12+
return unless args[:client_id]
1113

1214
topic = post.topic
1315
reply_to = post.reply_to_post
@@ -31,8 +33,9 @@ def execute(args)
3133
helper_mode,
3234
input,
3335
user,
34-
"/discourse-ai/ai-helper/stream_suggestion/#{post.id}",
36+
args[:progress_channel],
3537
custom_prompt: args[:custom_prompt],
38+
client_id: args[:client_id],
3639
)
3740
end
3841
end

assets/javascripts/discourse/components/ai-post-helper-menu.gjs

Lines changed: 44 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import Component from "@glimmer/component";
22
import { tracked } from "@glimmer/tracking";
33
import { action } from "@ember/object";
4-
import didInsert from "@ember/render-modifiers/modifiers/did-insert";
54
import willDestroy from "@ember/render-modifiers/modifiers/will-destroy";
65
import { service } from "@ember/service";
76
import { modifier } from "ember-modifier";
@@ -43,9 +42,6 @@ export default class AiPostHelperMenu extends Component {
4342
@tracked lastSelectedOption = null;
4443
@tracked isSavingFootnote = false;
4544
@tracked supportsAddFootnote = this.args.data.supportsFastEdit;
46-
@tracked
47-
channel =
48-
`/discourse-ai/ai-helper/stream_suggestion/${this.args.data.quoteState.postId}`;
4945

5046
@tracked
5147
smoothStreamer = new SmoothStreamer(
@@ -150,19 +146,25 @@ export default class AiPostHelperMenu extends Component {
150146
return sanitize(text);
151147
}
152148

153-
@bind
149+
set progressChannel(value) {
150+
if (this._progressChannel) {
151+
this.unsubscribe();
152+
}
153+
this._progressChannel = value;
154+
this.subscribe();
155+
}
156+
154157
subscribe() {
155-
this.messageBus.subscribe(
156-
this.channel,
157-
(data) => this._updateResult(data),
158-
this.args.data.post
159-
.discourse_ai_helper_stream_suggestion_last_message_bus_id
160-
);
158+
this.messageBus.subscribe(this._progressChannel, this._updateResult);
161159
}
162160

163161
@bind
164162
unsubscribe() {
165-
this.messageBus.unsubscribe(this.channel, this._updateResult);
163+
if (!this._progressChannel) {
164+
return;
165+
}
166+
this.messageBus.unsubscribe(this._progressChannel, this._updateResult);
167+
this._progressChannel = null;
166168
}
167169

168170
@bind
@@ -182,32 +184,37 @@ export default class AiPostHelperMenu extends Component {
182184
this.lastSelectedOption = option;
183185
const streamableOptions = ["explain", "translate", "custom_prompt"];
184186

185-
if (streamableOptions.includes(option.name)) {
186-
return this._handleStreamedResult(option);
187-
} else {
188-
this._activeAiRequest = ajax("/discourse-ai/ai-helper/suggest", {
189-
method: "POST",
190-
data: {
191-
mode: option.name,
192-
text: this.args.data.quoteState.buffer,
193-
custom_prompt: this.customPromptValue,
194-
},
195-
});
196-
}
187+
try {
188+
if (streamableOptions.includes(option.name)) {
189+
const streamedResult = await this._handleStreamedResult(option);
190+
this.progressChannel = streamedResult.progress_channel;
191+
return;
192+
} else {
193+
this._activeAiRequest = ajax("/discourse-ai/ai-helper/suggest", {
194+
method: "POST",
195+
data: {
196+
mode: option.name,
197+
text: this.args.data.quoteState.buffer,
198+
custom_prompt: this.customPromptValue,
199+
},
200+
});
201+
}
197202

198-
this._activeAiRequest
199-
.then(({ suggestions }) => {
200-
this.suggestion = suggestions[0].trim();
201-
202-
if (option.name === "proofread") {
203-
return this._handleProofreadOption();
204-
}
205-
})
206-
.catch(popupAjaxError)
207-
.finally(() => {
208-
this.loading = false;
209-
this.menuState = this.MENU_STATES.result;
210-
});
203+
this._activeAiRequest
204+
.then(({ suggestions }) => {
205+
this.suggestion = suggestions[0].trim();
206+
207+
if (option.name === "proofread") {
208+
return this._handleProofreadOption();
209+
}
210+
})
211+
.finally(() => {
212+
this.loading = false;
213+
this.menuState = this.MENU_STATES.result;
214+
});
215+
} catch (error) {
216+
popupAjaxError(error);
217+
}
211218

212219
return this._activeAiRequest;
213220
}
@@ -340,7 +347,6 @@ export default class AiPostHelperMenu extends Component {
340347
{{else if (eq this.menuState this.MENU_STATES.result)}}
341348
<div
342349
class="ai-post-helper__suggestion"
343-
{{didInsert this.subscribe}}
344350
{{willDestroy this.unsubscribe}}
345351
>
346352
{{#if this.suggestion}}

assets/javascripts/discourse/components/modal/diff-modal.gjs

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import Component from "@glimmer/component";
22
import { tracked } from "@glimmer/tracking";
33
import { action } from "@ember/object";
4-
import didInsert from "@ember/render-modifiers/modifiers/did-insert";
54
import willDestroy from "@ember/render-modifiers/modifiers/will-destroy";
65
import { service } from "@ember/service";
76
import { htmlSafe } from "@ember/template";
@@ -19,8 +18,6 @@ import DiffStreamer from "../../lib/diff-streamer";
1918
import SmoothStreamer from "../../lib/smooth-streamer";
2019
import AiIndicatorWave from "../ai-indicator-wave";
2120

22-
const CHANNEL = "/discourse-ai/ai-helper/stream_composer_suggestion";
23-
2421
export default class ModalDiffModal extends Component {
2522
@service currentUser;
2623
@service messageBus;
@@ -83,21 +80,26 @@ export default class ModalDiffModal extends Component {
8380
return this.loading || this.isStreaming;
8481
}
8582

86-
@bind
83+
set progressChannel(value) {
84+
if (this._progressChannel) {
85+
this.messageBus.unsubscribe(this._progressChannel, this.updateResult);
86+
}
87+
this._progressChannel = value;
88+
this.subscribe();
89+
}
90+
8791
subscribe() {
88-
this.messageBus.subscribe(
89-
CHANNEL,
90-
this.updateResult,
91-
this.currentUser
92-
?.discourse_ai_helper_stream_composer_suggestion_last_message_bus_id
93-
);
92+
// we have 1 channel per operation so we can safely subscribe at head
93+
this.messageBus.subscribe(this._progressChannel, this.updateResult, 0);
9494
}
9595

9696
@bind
9797
cleanup() {
9898
// stop all callbacks so it does not end up streaming pointlessly
9999
this.#resetState();
100-
this.messageBus.unsubscribe(CHANNEL, this.updateResult);
100+
if (this._progressChannel) {
101+
this.messageBus.unsubscribe(this._progressChannel, this.updateResult);
102+
}
101103
}
102104

103105
@action
@@ -122,7 +124,7 @@ export default class ModalDiffModal extends Component {
122124

123125
try {
124126
this.loading = true;
125-
return await ajax("/discourse-ai/ai-helper/stream_suggestion", {
127+
const result = await ajax("/discourse-ai/ai-helper/stream_suggestion", {
126128
method: "POST",
127129
data: {
128130
location: "composer",
@@ -133,6 +135,8 @@ export default class ModalDiffModal extends Component {
133135
client_id: this.messageBus.clientId,
134136
},
135137
});
138+
139+
this.progressChannel = result.progress_channel;
136140
} catch (e) {
137141
popupAjaxError(e);
138142
}
@@ -183,11 +187,7 @@ export default class ModalDiffModal extends Component {
183187
@closeModal={{this.cleanupAndClose}}
184188
>
185189
<:body>
186-
<div
187-
{{didInsert this.subscribe}}
188-
{{willDestroy this.cleanup}}
189-
class="text-preview"
190-
>
190+
<div {{willDestroy this.cleanup}} class="text-preview">
191191
<div
192192
class={{concatClass
193193
"composer-ai-helper-modal__suggestion"

lib/ai_helper/entry_point.rb

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -73,18 +73,6 @@ def inject_into(plugin)
7373
scope.user.in_any_groups?(SiteSetting.ai_auto_image_caption_allowed_groups_map)
7474
end,
7575
) { object.auto_image_caption }
76-
77-
plugin.add_to_serializer(
78-
:post,
79-
:discourse_ai_helper_stream_suggestion_last_message_bus_id,
80-
include_condition: -> { SiteSetting.ai_helper_enabled && scope.authenticated? },
81-
) { MessageBus.last_id("/discourse-ai/ai-helper/stream_suggestion/#{object.id}") }
82-
83-
plugin.add_to_serializer(
84-
:current_user,
85-
:discourse_ai_helper_stream_composer_suggestion_last_message_bus_id,
86-
include_condition: -> { SiteSetting.ai_helper_enabled && scope.authenticated? },
87-
) { MessageBus.last_id("/discourse-ai/ai-helper/stream_composer_suggestion") }
8876
end
8977
end
9078
end

spec/jobs/regular/stream_composer_helper_spec.rb

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,23 +18,33 @@
1818
let(:mode) { DiscourseAi::AiHelper::Assistant::PROOFREAD }
1919

2020
it "does nothing if there is no user" do
21+
channel = "/some/channel"
2122
messages =
22-
MessageBus.track_publish("/discourse-ai/ai-helper/stream_suggestion") do
23-
job.execute(user_id: nil, text: input, prompt: mode, force_default_locale: false)
23+
MessageBus.track_publish(channel) do
24+
job.execute(
25+
user_id: nil,
26+
text: input,
27+
prompt: mode,
28+
force_default_locale: false,
29+
client_id: "123",
30+
progress_channel: channel,
31+
)
2432
end
2533

2634
expect(messages).to be_empty
2735
end
2836

2937
it "does nothing if there is no text" do
38+
channel = "/some/channel"
3039
messages =
31-
MessageBus.track_publish("/discourse-ai/ai-helper/stream_suggestion") do
40+
MessageBus.track_publish(channel) do
3241
job.execute(
3342
user_id: user.id,
3443
text: nil,
3544
prompt: mode,
3645
force_default_locale: false,
3746
client_id: "123",
47+
progress_channel: channel,
3848
)
3949
end
4050

@@ -47,16 +57,18 @@
4757

4858
it "publishes updates with a partial result" do
4959
proofread_result = "I like to eat pie for breakfast because it is delicious."
60+
channel = "/channel/123"
5061

5162
DiscourseAi::Completions::Llm.with_prepared_responses([proofread_result]) do
5263
messages =
53-
MessageBus.track_publish("/discourse-ai/ai-helper/stream_composer_suggestion") do
64+
MessageBus.track_publish(channel) do
5465
job.execute(
5566
user_id: user.id,
5667
text: input,
5768
prompt: mode,
5869
force_default_locale: true,
5970
client_id: "123",
71+
progress_channel: channel,
6072
)
6173
end
6274

@@ -68,16 +80,18 @@
6880

6981
it "publishes a final update to signal we're done" do
7082
proofread_result = "I like to eat pie for breakfast because it is delicious."
83+
channel = "/channel/123"
7184

7285
DiscourseAi::Completions::Llm.with_prepared_responses([proofread_result]) do
7386
messages =
74-
MessageBus.track_publish("/discourse-ai/ai-helper/stream_composer_suggestion") do
87+
MessageBus.track_publish(channel) do
7588
job.execute(
7689
user_id: user.id,
7790
text: input,
7891
prompt: mode,
7992
force_default_locale: true,
8093
client_id: "123",
94+
progress_channel: channel,
8195
)
8296
end
8397

0 commit comments

Comments
 (0)