This repository was archived by the owner on Jul 22, 2025. It is now read-only.
Merged
197 changes: 197 additions & 0 deletions app/controllers/discourse_ai/admin/ai_personas_controller.rb
@@ -74,8 +74,205 @@ def destroy
end
end

class << self
Contributor: Feels like stuff that should be generic/in core/an ActiveSupport::Concern that we could use elsewhere too?

Member Author: I will move it out to a service at some point. I definitely do not want a concern here; this is a very sharp knife, and I only want people who absolutely understand the repercussions to use it.

POOL_SIZE = 10
def thread_pool
@thread_pool ||=
Concurrent::CachedThreadPool.new(min_threads: 0, max_threads: POOL_SIZE, idletime: 30)
end

def schedule_block(&block)
# think about a better way to handle cross thread connections
if Rails.env.test?
block.call
return
end

db = RailsMultisite::ConnectionManagement.current_db
thread_pool.post do
begin
RailsMultisite::ConnectionManagement.with_connection(db) { block.call }
rescue StandardError => e
Discourse.warn_exception(e, message: "Discourse AI: Unable to stream reply")
end
end
end
end
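
For illustration, this is roughly how work gets handed to the pool (a hypothetical sketch of what queue_streamed_reply below effectively does, not part of the diff):

# Hypothetical sketch: schedule_block queues the block on the shared
# CachedThreadPool and re-establishes the scheduling request's multisite
# DB connection around it (in the test env it simply runs inline).
schedule_block do
  Rails.logger.info("running off the request thread")
end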

CRLF = "\r\n"

def stream_reply
Contributor: I see Keegan mentioned a similar thing, but this is an extremely fat controller action; I really don't feel we should be doing this very often, if ever. It doesn't feel very maintainable or easy to read, and it could easily expand even further in size. Much of this is a perfect case for using a service.

Contributor: The reasons why it's perfect for a service:

  • It has model lookups
  • It has several parameter validations and error messages
  • It has a clear action and result from that action, which could also have more errors

Not saying the reply streaming stuff should be in one, but everything before it easily could be.

Member Author: I have been waiting a bit on service docs and service stability; stuff has been changing recently.

persona =
AiPersona.find_by(name: params[:persona_name]) ||
AiPersona.find_by(id: params[:persona_id])
return render_json_error(I18n.t("discourse_ai.errors.persona_not_found")) if persona.nil?

return render_json_error(I18n.t("discourse_ai.errors.persona_disabled")) if !persona.enabled

if persona.default_llm.blank?
return render_json_error(I18n.t("discourse_ai.errors.no_default_llm"))
end

if params[:query].blank?
return render_json_error(I18n.t("discourse_ai.errors.no_query_specified"))
end

if !persona.user_id
return render_json_error(I18n.t("discourse_ai.errors.no_user_for_persona"))
end

if !params[:username] && !params[:user_unique_id]
return render_json_error(I18n.t("discourse_ai.errors.no_user_specified"))
end

user = nil

if params[:username]
user = User.find_by_username(params[:username])
return render_json_error(I18n.t("discourse_ai.errors.user_not_found")) if user.nil?
elsif params[:user_unique_id]
user = stage_user
end

raise Discourse::NotFound if user.nil?

topic_id = params[:topic_id].to_i
topic = nil
post = nil

if topic_id > 0
topic = Topic.find(topic_id)

raise Discourse::NotFound if topic.nil?

if topic.topic_allowed_users.where(user_id: user.id).empty?
return render_json_error(I18n.t("discourse_ai.errors.user_not_allowed"))
end

post =
PostCreator.create!(
user,
topic_id: topic_id,
raw: params[:query],
skip_validations: true,
)
else
post =
PostCreator.create!(
user,
title: I18n.t("discourse_ai.ai_bot.default_pm_prefix"),
raw: params[:query],
archetype: Archetype.private_message,
target_usernames: "#{user.username},#{persona.user.username}",
skip_validations: true,
)

topic = post.topic
end

hijack = request.env["rack.hijack"]
io = hijack.call

user = current_user

self.class.queue_streamed_reply(io, persona, user, topic, post)
end

private

AI_STREAM_CONVERSATION_UNIQUE_ID = "ai-stream-conversation-unique-id"

# keeping this in a static method so we don't capture ENV and other bits
# this allows us to release memory earlier
def self.queue_streamed_reply(io, persona, user, topic, post)
schedule_block do
begin
io.write "HTTP/1.1 200 OK"
Contributor: I guess it's for a good reason (performance?), but is there any way not to do this writing of headers and data completely manually?

Member Author: Rails has streaming, but I don't want to stream from the foreground thread. I can make wrapper classes for this, but in the end the same code will run.

io.write CRLF
io.write "Content-Type: text/plain; charset=utf-8"
io.write CRLF
io.write "Transfer-Encoding: chunked"
io.write CRLF
io.write "Cache-Control: no-cache, no-store, must-revalidate"
io.write CRLF
io.write "Connection: close"
io.write CRLF
io.write "X-Accel-Buffering: no"
io.write CRLF
io.write "X-Content-Type-Options: nosniff"
io.write CRLF
io.write CRLF
io.flush

persona_class =
DiscourseAi::AiBot::Personas::Persona.find_by(id: persona.id, user: user)
bot = DiscourseAi::AiBot::Bot.as(persona.user, persona: persona_class.new)

data =
{ topic_id: topic.id, bot_user_id: persona.user.id, persona_id: persona.id }.to_json +
"\n\n"

io.write data.bytesize.to_s(16)
io.write CRLF
io.write data
io.write CRLF

DiscourseAi::AiBot::Playground
.new(bot)
.reply_to(post) do |partial|
next if partial.length == 0

data = { partial: partial }.to_json + "\n\n"

data.force_encoding("UTF-8")

io.write data.bytesize.to_s(16)
io.write CRLF
io.write data
io.write CRLF
io.flush
end

io.write "0"
io.write CRLF
io.write CRLF

io.flush
io.done
rescue StandardError => e
# make it a tiny bit easier to debug in dev, this is tricky
# multi-threaded code that exhibits various limitations in rails
p e if Rails.env.development?
Discourse.warn_exception(e, message: "Discourse AI: Unable to stream reply")
ensure
io.close
end
end
end
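
Each write above follows HTTP/1.1 chunked transfer encoding: the chunk size in hex, a CRLF, the payload, and a trailing CRLF, with a zero-length chunk terminating the stream. A minimal sketch of that framing (a hypothetical helper, not part of the PR):

# Hypothetical helper equivalent to the manual writes above.
def write_chunk(io, payload)
  io.write payload.bytesize.to_s(16) # chunk size in hexadecimal
  io.write CRLF
  io.write payload
  io.write CRLF
end
# The response ends with a zero-length chunk: "0" CRLF CRLF.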

def stage_user
Contributor: Another thing that could be in a service, or at least a reusable class in core. Nothing except the custom field is AI-specific here.

unique_id = params[:user_unique_id].to_s
field = UserCustomField.find_by(name: AI_STREAM_CONVERSATION_UNIQUE_ID, value: unique_id)

if field
field.user
else
preferred_username = params[:preferred_username]
username = UserNameSuggester.suggest(preferred_username || unique_id)

user =
User.new(
username: username,
email: "#{SecureRandom.hex}@invalid.com",
staged: true,
active: false,
)
user.custom_fields[AI_STREAM_CONVERSATION_UNIQUE_ID] = unique_id
user.save!
user
end
end

def find_ai_persona
@ai_persona = AiPersona.find(params[:id])
end
2 changes: 1 addition & 1 deletion app/models/completion_prompt.rb
@@ -56,7 +56,7 @@ def each_message_length
messages.each_with_index do |msg, idx|
next if msg["content"].length <= 1000

- errors.add(:messages, I18n.t("errors.prompt_message_length", idx: idx + 1))
+ errors.add(:messages, I18n.t("discourse_ai.errors.prompt_message_length", idx: idx + 1))
end
end
end
3 changes: 2 additions & 1 deletion config/locales/client.en.yml
@@ -5,7 +5,8 @@ en:
scopes:
descriptions:
discourse_ai:
- search: "Allows semantic search via the /discourse-ai/embeddings/semantic-search endpoint."
+ search: "Allows semantic search"
+ stream_completion: "Allows streaming ai persona completions"

site_settings:
categories:
16 changes: 12 additions & 4 deletions config/locales/server.en.yml
@@ -106,10 +106,6 @@ en:
flagged_by_toxicity: The AI plugin flagged this after classifying it as toxic.
flagged_by_nsfw: The AI plugin flagged this after classifying at least one of the attached images as NSFW.

- errors:
-   prompt_message_length: The message %{idx} is over the 1000 character limit.
-   invalid_prompt_role: The message %{idx} has an invalid role.

reports:
overall_sentiment:
title: "Overall sentiment"
@@ -169,6 +165,7 @@ en:
failed_to_share: "Failed to share the conversation"
conversation_deleted: "Conversation share deleted successfully"
ai_bot:
default_pm_prefix: "[Untitled AI bot PM]"
personas:
default_llm_required: "Default LLM model is required prior to enabling Chat"
cannot_delete_system_persona: "System personas cannot be deleted, please disable it instead"
@@ -347,3 +344,14 @@ en:
llm_models:
missing_provider_param: "%{param} can't be blank"
bedrock_invalid_url: "Please complete all the fields to contact this model."

errors:
no_query_specified: The query parameter is required, please specify it.
no_user_for_persona: The persona specified does not have a user associated with it.
persona_not_found: The persona specified does not exist. Check the persona_name or persona_id params.
no_user_specified: The username or the user_unique_id parameter is required, please specify it.
user_not_found: The user specified does not exist. Check the username param.
persona_disabled: The persona specified is disabled. Check the persona_name or persona_id params.
no_default_llm: The persona must have a default_llm defined.
user_not_allowed: The user is not allowed to participate in the topic.
prompt_message_length: The message %{idx} is over the 1000 character limit.
2 changes: 2 additions & 0 deletions config/routes.rb
@@ -50,6 +50,8 @@
path: "ai-personas",
controller: "discourse_ai/admin/ai_personas"

post "/ai-personas/stream-reply" => "discourse_ai/admin/ai_personas#stream_reply"

resources(
:ai_tools,
only: %i[index create show update destroy],
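
For reference, a client might consume the new endpoint roughly like this (a sketch only: the host, the exact admin path prefix, and the API key are assumptions; the key needs the new stream_completion scope):

require "net/http"
require "json"

# Hypothetical client for the stream-reply endpoint.
uri = URI("https://forum.example.com/admin/plugins/discourse-ai/ai-personas/stream-reply")
request = Net::HTTP::Post.new(uri)
request["Content-Type"] = "application/json"
request["Api-Key"] = "your-api-key" # assumed key with the stream_completion scope
request["Api-Username"] = "system"
request.body = { persona_name: "Helper", query: "Hello there", username: "sam" }.to_json

Net::HTTP.start(uri.host, uri.port, use_ssl: true) do |http|
  http.request(request) do |response|
    # Net::HTTP strips the chunked framing; each payload is a JSON line
    # followed by a blank line. A robust client would buffer split lines.
    response.read_body do |chunk|
      chunk.each_line do |line|
        next if line.strip.empty?
        data = JSON.parse(line)
        print data["partial"] if data["partial"]
      end
    end
  end
end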
@@ -0,0 +1,9 @@
# frozen_string_literal: true
class AddUniqueAiStreamConversationUserIdIndex < ActiveRecord::Migration[7.1]
def change
add_index :user_custom_fields,
[:value],
unique: true,
where: "name = 'ai-stream-conversation-unique-id'"
end
end
8 changes: 4 additions & 4 deletions lib/ai_bot/bot.rb
@@ -113,7 +113,7 @@ def reply(context, &update_blk)
tool_found = true
# a bit hacky, but extra newlines do no harm
if needs_newlines
- update_blk.call("\n\n", cancel, nil)
+ update_blk.call("\n\n", cancel)
needs_newlines = false
end

@@ -123,7 +123,7 @@ def reply(context, &update_blk)
end
else
needs_newlines = true
- update_blk.call(partial, cancel, nil)
+ update_blk.call(partial, cancel)
end
end

@@ -191,9 +191,9 @@ def invoke_tool(tool, llm, cancel, context, &update_blk)
tool_details = build_placeholder(tool.summary, tool.details, custom_raw: tool.custom_raw)

if context[:skip_tool_details] && tool.custom_raw.present?
- update_blk.call(tool.custom_raw, cancel, nil)
+ update_blk.call(tool.custom_raw, cancel, nil, :custom_raw)
elsif !context[:skip_tool_details]
- update_blk.call(tool_details, cancel, nil)
+ update_blk.call(tool_details, cancel, nil, :tool_details)
end

result
5 changes: 5 additions & 0 deletions lib/ai_bot/entry_point.rb
@@ -189,6 +189,11 @@ def inject_into(plugin)
plugin.register_editable_topic_custom_field(:ai_persona_id)
end

plugin.add_api_key_scope(
:discourse_ai,
{ stream_completion: { actions: %w[discourse_ai/admin/ai_personas#stream_reply] } },
)

plugin.on(:site_setting_changed) do |name, old_value, new_value|
if name == :ai_embeddings_model && SiteSetting.ai_embeddings_enabled? &&
new_value != old_value
11 changes: 9 additions & 2 deletions lib/ai_bot/playground.rb
@@ -390,7 +390,12 @@ def get_context(participants:, conversation_context:, user:, skip_tool_details:
result
end

- def reply_to(post)
+ def reply_to(post, &blk)
# this is a multithreading issue
# post custom prompt is needed and it may not
# be properly loaded, ensure it is loaded
PostCustomPrompt.none

reply = +""
start = Time.now

@@ -441,11 +446,13 @@ def reply_to(post)
context[:skip_tool_details] ||= !bot.persona.class.tool_details

new_custom_prompts =
- bot.reply(context) do |partial, cancel, placeholder|
+ bot.reply(context) do |partial, cancel, placeholder, type|
reply << partial
raw = reply.dup
raw << "\n\n" << placeholder if placeholder.present? && !context[:skip_tool_details]

blk.call(partial) if blk && type != :tool_details

if stream_reply && !Discourse.redis.get(redis_stream_key)
cancel&.call
reply_post.update!(raw: reply, cooked: PrettyText.cook(reply))
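
With the new &blk parameter, a caller receives only user-visible text partials; chunks typed :tool_details are filtered out before the block runs. A sketch mirroring the controller usage earlier in this diff:

# Sketch: stream each text fragment as it arrives (tool detail
# placeholders are suppressed by the type check above).
DiscourseAi::AiBot::Playground
  .new(bot)
  .reply_to(post) do |partial|
    io.write partial
  end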
13 changes: 13 additions & 0 deletions lib/completions/endpoints/fake.rb
@@ -72,6 +72,10 @@ def self.with_fake_content(content)
@fake_content = nil
end

def self.fake_content=(content)
@fake_content = content
end

def self.fake_content
@fake_content || STOCK_CONTENT
end
@@ -100,6 +104,13 @@ def self.last_call=(params)
@last_call = params
end

def self.reset!
@last_call = nil
@fake_content = nil
@delays = nil
@chunk_count = nil
end

def perform_completion!(
dialect,
user,
@@ -111,6 +122,8 @@

content = self.class.fake_content

content = content.shift if content.is_a?(Array)

if block_given?
split_indices = (1...content.length).to_a.sample(self.class.chunk_count - 1).sort
indexes = [0, *split_indices, content.length]
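
Taken together, these hooks let a spec queue several canned responses and clean up between examples. A hypothetical usage (the constant path is assumed from lib/completions/endpoints/fake.rb):

# Hypothetical spec usage of the new test hooks.
fake = DiscourseAi::Completions::Endpoints::Fake
fake.fake_content = ["first canned reply", "second canned reply"]
# Each perform_completion! shifts one entry off the Array.
# ...trigger two completions here...
fake.reset! # clears fake content, delays, chunk count and last_call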