@@ -74,8 +74,205 @@ def destroy
7474 end
7575 end
7676
77+ class << self
78+ POOL_SIZE = 10
79+ def thread_pool
80+ @thread_pool ||=
81+ Concurrent ::CachedThreadPool . new ( min_threads : 0 , max_threads : POOL_SIZE , idletime : 30 )
82+ end
83+
84+ def schedule_block ( &block )
85+ # think about a better way to handle cross thread connections
86+ if Rails . env . test?
87+ block . call
88+ return
89+ end
90+
91+ db = RailsMultisite ::ConnectionManagement . current_db
92+ thread_pool . post do
93+ begin
94+ RailsMultisite ::ConnectionManagement . with_connection ( db ) { block . call }
95+ rescue StandardError => e
96+ Discourse . warn_exception ( e , message : "Discourse AI: Unable to stream reply" )
97+ end
98+ end
99+ end
100+ end
101+
102+ CRLF = "\r \n "
103+
104+ def stream_reply
105+ persona =
106+ AiPersona . find_by ( name : params [ :persona_name ] ) ||
107+ AiPersona . find_by ( id : params [ :persona_id ] )
108+ return render_json_error ( I18n . t ( "discourse_ai.errors.persona_not_found" ) ) if persona . nil?
109+
110+ return render_json_error ( I18n . t ( "discourse_ai.errors.persona_disabled" ) ) if !persona . enabled
111+
112+ if persona . default_llm . blank?
113+ return render_json_error ( I18n . t ( "discourse_ai.errors.no_default_llm" ) )
114+ end
115+
116+ if params [ :query ] . blank?
117+ return render_json_error ( I18n . t ( "discourse_ai.errors.no_query_specified" ) )
118+ end
119+
120+ if !persona . user_id
121+ return render_json_error ( I18n . t ( "discourse_ai.errors.no_user_for_persona" ) )
122+ end
123+
124+ if !params [ :username ] && !params [ :user_unique_id ]
125+ return render_json_error ( I18n . t ( "discourse_ai.errors.no_user_specified" ) )
126+ end
127+
128+ user = nil
129+
130+ if params [ :username ]
131+ user = User . find_by_username ( params [ :username ] )
132+ return render_json_error ( I18n . t ( "discourse_ai.errors.user_not_found" ) ) if user . nil?
133+ elsif params [ :user_unique_id ]
134+ user = stage_user
135+ end
136+
137+ raise Discourse ::NotFound if user . nil?
138+
139+ topic_id = params [ :topic_id ] . to_i
140+ topic = nil
141+ post = nil
142+
143+ if topic_id > 0
144+ topic = Topic . find ( topic_id )
145+
146+ raise Discourse ::NotFound if topic . nil?
147+
148+ if topic . topic_allowed_users . where ( user_id : user . id ) . empty?
149+ return render_json_error ( I18n . t ( "discourse_ai.errors.user_not_allowed" ) )
150+ end
151+
152+ post =
153+ PostCreator . create! (
154+ user ,
155+ topic_id : topic_id ,
156+ raw : params [ :query ] ,
157+ skip_validations : true ,
158+ )
159+ else
160+ post =
161+ PostCreator . create! (
162+ user ,
163+ title : I18n . t ( "discourse_ai.ai_bot.default_pm_prefix" ) ,
164+ raw : params [ :query ] ,
165+ archetype : Archetype . private_message ,
166+ target_usernames : "#{ user . username } ,#{ persona . user . username } " ,
167+ skip_validations : true ,
168+ )
169+
170+ topic = post . topic
171+ end
172+
173+ hijack = request . env [ "rack.hijack" ]
174+ io = hijack . call
175+
176+ user = current_user
177+
178+ self . class . queue_streamed_reply ( io , persona , user , topic , post )
179+ end
180+
77181 private
78182
183+ AI_STREAM_CONVERSATION_UNIQUE_ID = "ai-stream-conversation-unique-id"
184+
185+ # keeping this in a static method so we don't capture ENV and other bits
186+ # this allows us to release memory earlier
187+ def self . queue_streamed_reply ( io , persona , user , topic , post )
188+ schedule_block do
189+ begin
190+ io . write "HTTP/1.1 200 OK"
191+ io . write CRLF
192+ io . write "Content-Type: text/plain; charset=utf-8"
193+ io . write CRLF
194+ io . write "Transfer-Encoding: chunked"
195+ io . write CRLF
196+ io . write "Cache-Control: no-cache, no-store, must-revalidate"
197+ io . write CRLF
198+ io . write "Connection: close"
199+ io . write CRLF
200+ io . write "X-Accel-Buffering: no"
201+ io . write CRLF
202+ io . write "X-Content-Type-Options: nosniff"
203+ io . write CRLF
204+ io . write CRLF
205+ io . flush
206+
207+ persona_class =
208+ DiscourseAi ::AiBot ::Personas ::Persona . find_by ( id : persona . id , user : user )
209+ bot = DiscourseAi ::AiBot ::Bot . as ( persona . user , persona : persona_class . new )
210+
211+ data =
212+ { topic_id : topic . id , bot_user_id : persona . user . id , persona_id : persona . id } . to_json +
213+ "\n \n "
214+
215+ io . write data . bytesize . to_s ( 16 )
216+ io . write CRLF
217+ io . write data
218+ io . write CRLF
219+
220+ DiscourseAi ::AiBot ::Playground
221+ . new ( bot )
222+ . reply_to ( post ) do |partial |
223+ next if partial . length == 0
224+
225+ data = { partial : partial } . to_json + "\n \n "
226+
227+ data . force_encoding ( "UTF-8" )
228+
229+ io . write data . bytesize . to_s ( 16 )
230+ io . write CRLF
231+ io . write data
232+ io . write CRLF
233+ io . flush
234+ end
235+
236+ io . write "0"
237+ io . write CRLF
238+ io . write CRLF
239+
240+ io . flush
241+ io . done
242+ rescue StandardError => e
243+ # make it a tiny bit easier to debug in dev, this is tricky
244+ # multi-threaded code that exhibits various limitations in rails
245+ p e if Rails . env . development?
246+ Discourse . warn_exception ( e , message : "Discourse AI: Unable to stream reply" )
247+ ensure
248+ io . close
249+ end
250+ end
251+ end
252+
253+ def stage_user
254+ unique_id = params [ :user_unique_id ] . to_s
255+ field = UserCustomField . find_by ( name : AI_STREAM_CONVERSATION_UNIQUE_ID , value : unique_id )
256+
257+ if field
258+ field . user
259+ else
260+ preferred_username = params [ :preferred_username ]
261+ username = UserNameSuggester . suggest ( preferred_username || unique_id )
262+
263+ user =
264+ User . new (
265+ username : username ,
266+ email : "#{ SecureRandom . hex } @invalid.com" ,
267+ staged : true ,
268+ active : false ,
269+ )
270+ user . custom_fields [ AI_STREAM_CONVERSATION_UNIQUE_ID ] = unique_id
271+ user . save!
272+ user
273+ end
274+ end
275+
79276 def find_ai_persona
80277 @ai_persona = AiPersona . find ( params [ :id ] )
81278 end
0 commit comments