Skip to content
This repository was archived by the owner on Jul 22, 2025. It is now read-only.
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 51 additions & 39 deletions lib/sentiment/post_classification.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,57 +44,69 @@ def self.backfill_query(from_post_id: nil, max_age_days: nil)
Post.from(Arel.sql("(#{unioned_queries}) as posts"))
end

# Upper bound on worker threads used by bulk_classify!; it is passed as
# `max_threads` when that method builds its thread pool.
# NOTE(review): "CLASSFICATIONS" looks like a misspelling of
# "CLASSIFICATIONS" - confirm nothing else references the name before renaming.
CONCURRENT_CLASSFICATIONS = 40

# Classifies every record in +relation+ with each available sentiment
# classifier that has not already produced a classification for it.
#
# One job per (record, classifier) pair is posted to a bounded thread pool.
# Worker threads only perform the classification request; the calling thread
# drains a Queue of results and stores each successful classification through
# +store_classification+. Failed requests are collected and reported once,
# as a single warning, after every queued job has reported back.
#
# @param relation [#each] records to classify
# @return [nil] when no classifiers are available; otherwise the return value
#   is not meaningful
def bulk_classify!(relation)
  available_classifiers = classifiers
  # Bail out before allocating a thread pool when nothing can be classified.
  return if available_classifiers.blank?

  pool =
    Scheduler::ThreadPool.new(
      min_threads: 0,
      max_threads: CONCURRENT_CLASSFICATIONS,
      idle_time: 30,
    )

  results = Queue.new
  queued = 0

  relation.each do |record|
    text = prepare_text(record)
    next if text.blank?

    # `map`, not `pluck(&:model_used)`: ActiveRecord's pluck selects by column
    # name, so handing it only a block never yields the model names and every
    # classifier would be re-run for already classified records.
    already_classified = record.sentiment_classifications.map(&:model_used)
    missing_classifiers =
      available_classifiers.reject { |ac| already_classified.include?(ac[:model_name]) }

    missing_classifiers.each do |classifier|
      pool.post do
        result = { target: record, classifier: classifier, text: text }
        begin
          result[:classification] = request_with(classifier[:client], text)
        rescue StandardError => e
          # Hand the failure back to the caller instead of dying in the worker;
          # the consumer below expects exactly one result per posted job.
          result[:error] = e
        end
        results << result
      end
      queued += 1
    end
  end

  errors = []

  # Exactly one result is pushed per posted job, so pop once per job.
  queued.times do
    result = results.pop

    if result[:error]
      errors << result
    else
      store_classification(
        result[:target],
        [[result[:classifier][:model_name], result[:classification]]],
      )
    end
  end

  if errors.any?
    example_posts = errors.first(5).map { |failure| failure[:target].id }.join(", ")
    # The message is passed by keyword; a positional second argument would not
    # match a `warn_exception(e, message: ..., env: ...)` signature.
    Discourse.warn_exception(
      errors[0][:error],
      message:
        "Discourse AI: Errors during bulk classification: Failed to classify #{errors.count} posts (example ids: #{example_posts})",
    )
  end
ensure
  # `pool` is nil when we returned early or the pool could not be built.
  pool&.shutdown
  pool&.wait_for_termination(timeout: 30)
end

def classify!(target)
Expand Down
Loading