Skip to content
This repository was archived by the owner on Jul 22, 2025. It is now read-only.

Commit c315d94

Browse files
committed
DEV: re-implement bulk sentiment classifier
The new implementation uses the core concurrent job queue, which is more robust and predictable than the one shipped in Concurrent. Additionally, it:
- Trickles through updates during bulk classification
- Reports errors if we fail during a bulk classification
1 parent baaa3d1 commit c315d94

File tree

1 file changed

+51
-39
lines changed

1 file changed

+51
-39
lines changed

lib/sentiment/post_classification.rb

Lines changed: 51 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -44,57 +44,69 @@ def self.backfill_query(from_post_id: nil, max_age_days: nil)
4444
Post.from(Arel.sql("(#{unioned_queries}) as posts"))
4545
end
4646

47+
CONCURRENT_CLASSFICATIONS = 100
48+
4749
def bulk_classify!(relation)
48-
http_pool_size = 100
4950
pool =
50-
Concurrent::CachedThreadPool.new(
51+
Scheduler::ThreadPool.new(
5152
min_threads: 0,
52-
max_threads: http_pool_size,
53-
idletime: 30,
53+
max_threads: CONCURRENT_CLASSFICATIONS,
54+
idle_time: 30,
5455
)
5556

5657
available_classifiers = classifiers
5758
return if available_classifiers.blank?
5859

59-
promised_classifications =
60-
relation
61-
.map do |record|
62-
text = prepare_text(record)
63-
next if text.blank?
64-
65-
Concurrent::Promises
66-
.fulfilled_future({ target: record, text: text }, pool)
67-
.then_on(pool) do |w_text|
68-
results = Concurrent::Hash.new
69-
already_classified = w_text[:target].sentiment_classifications.map(&:model_used)
70-
71-
classifiers_for_target =
72-
available_classifiers.reject do |ac|
73-
already_classified.include?(ac[:model_name])
74-
end
75-
76-
promised_target_results =
77-
classifiers_for_target.map do |cft|
78-
Concurrent::Promises.future_on(pool) do
79-
results[cft[:model_name]] = request_with(cft[:client], w_text[:text])
80-
end
81-
end
82-
83-
Concurrent::Promises
84-
.zip(*promised_target_results)
85-
.then_on(pool) { |_| w_text.merge(classification: results) }
86-
end
87-
.flat(1)
60+
results = Queue.new
61+
queued = 0
62+
63+
relation.each do |record|
64+
text = prepare_text(record)
65+
next if text.blank?
66+
67+
already_classified = record.sentiment_classifications.pluck(&:model_used)
68+
missing_classifiers =
69+
available_classifiers.reject { |ac| already_classified.include?(ac[:model_name]) }
70+
71+
missing_classifiers.each do |classifier|
72+
pool.post do
73+
result = { target: record, classifier: classifier, text: text }
74+
begin
75+
result[:classification] = request_with(classifier[:client], text)
76+
rescue StandardError => e
77+
result[:error] = e
78+
end
79+
results << result
8880
end
89-
.compact
81+
queued += 1
82+
end
83+
end
84+
85+
errors = []
86+
87+
while queued > 0
88+
result = results.pop
89+
if result[:error]
90+
errors << result
91+
else
92+
store_classification(
93+
result[:target],
94+
[[result[:classifier][:model_name], result[:classification]]],
95+
)
96+
end
97+
queued -= 1
98+
end
9099

91-
Concurrent::Promises
92-
.zip(*promised_classifications)
93-
.value!
94-
.each { |r| store_classification(r[:target], r[:classification]) }
100+
if errors.any?
101+
example_posts = errors.map { |e| e[:target].id }.take(5).join(", ")
102+
Discourse.warn_exception(
103+
errors[0][:error],
104+
"Discourse AI: Errors during bulk classification: Failed to classify #{errors.count} posts (example ids: #{example_posts})",
105+
)
106+
end
95107
ensure
96108
pool.shutdown
97-
pool.wait_for_termination
109+
pool.wait_for_termination(timeout: 30)
98110
end
99111

100112
def classify!(target)

0 commit comments

Comments
 (0)