Skip to content

Commit add68d4

Browse files
committed
PERF: Move per-post language detection job into batches using redis
Currently, upon post serialization, we spawn a new job to detect the language of the post. This could potentially flood sidekiq on larger sites. This PR uses redis to keep track of posts that need language detection, then schedules them in a 5-minute-ly job, with a maximum of 1000 posts at a time.
1 parent e82061a commit add68d4

File tree

3 files changed

+138
-5
lines changed

3 files changed

+138
-5
lines changed

plugin.rb

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ module ::DiscourseTranslator
1717
PLUGIN_NAME = "discourse_translator".freeze
1818
DETECTED_LANG_CUSTOM_FIELD = "post_detected_lang".freeze
1919
TRANSLATED_CUSTOM_FIELD = "translated_text".freeze
20+
LANG_DETECT_NEEDED = "lang_detect_needed".freeze
2021

2122
autoload :Microsoft,
2223
"#{Rails.root}/plugins/discourse-translator/services/discourse_translator/microsoft"
@@ -134,6 +135,43 @@ def execute(args)
134135
end
135136
end
136137
end
138+
139+
class DetectPostsTranslation < ::Jobs::Scheduled
140+
sidekiq_options retry: false
141+
every 5.minutes
142+
143+
BATCH_SIZE = 100
144+
MAX_QUEUE_SIZE = 1000
145+
146+
def execute(args)
147+
return unless SiteSetting.translator_enabled
148+
149+
post_ids = Discourse.redis.spop(DiscourseTranslator::LANG_DETECT_NEEDED, MAX_QUEUE_SIZE)
150+
return if post_ids.blank?
151+
152+
post_ids.each_slice(BATCH_SIZE) { |batch| process_batch(batch) }
153+
end
154+
155+
private
156+
157+
def process_batch(post_ids)
158+
posts = Post.where(id: post_ids).to_a
159+
posts.each do |post|
160+
DistributedMutex.synchronize("detect_translation_#{post.id}") do
161+
begin
162+
translator = "DiscourseTranslator::#{SiteSetting.translator}".constantize
163+
translator.detect(post)
164+
if !post.custom_fields_clean?
165+
post.save_custom_fields
166+
post.publish_change_to_clients!(:revised)
167+
end
168+
rescue ::DiscourseTranslator::ProblemCheckedTranslationError
169+
# problem-checked translation errors gracefully
170+
end
171+
end
172+
end
173+
end
174+
end
137175
end
138176

139177
on(:post_process) { |post| Jobs.enqueue(:detect_translation, post_id: post.id) }
@@ -160,7 +198,7 @@ def execute(args)
160198
detected_lang = post_custom_fields[::DiscourseTranslator::DETECTED_LANG_CUSTOM_FIELD]
161199

162200
if !detected_lang
163-
Jobs.enqueue(:detect_translation, post_id: object.id)
201+
Discourse.redis.sadd?(DiscourseTranslator::LANG_DETECT_NEEDED, object.id)
164202
false
165203
else
166204
detected_lang !=
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
# frozen_string_literal: true
2+
3+
require "aws-sdk-translate"
4+
5+
describe Jobs::DetectPostsTranslation do
6+
fab!(:posts) { Fabricate.times(5, :post) }
7+
8+
before do
9+
SiteSetting.translator_enabled = true
10+
SiteSetting.translator = "Amazon"
11+
client = Aws::Translate::Client.new(stub_responses: true)
12+
client.stub_responses(
13+
:translate_text,
14+
{ translated_text: "大丈夫", source_language_code: "en", target_language_code: "jp" },
15+
)
16+
Aws::Translate::Client.stubs(:new).returns(client)
17+
posts.each { |post| Discourse.redis.sadd?(DiscourseTranslator::LANG_DETECT_NEEDED, post.id) }
18+
end
19+
20+
it "processes posts in batches and updates their translations" do
21+
described_class.new.execute({})
22+
23+
posts.each do |post|
24+
post.reload
25+
expect(post.custom_fields[DiscourseTranslator::DETECTED_LANG_CUSTOM_FIELD]).not_to be_nil
26+
end
27+
28+
expect(Discourse.redis.smembers(DiscourseTranslator::LANG_DETECT_NEEDED)).to be_empty
29+
end
30+
31+
it "does not process posts if the translator is disabled" do
32+
SiteSetting.translator_enabled = false
33+
described_class.new.execute({})
34+
35+
posts.each do |post|
36+
post.reload
37+
expect(post.custom_fields[DiscourseTranslator::DETECTED_LANG_CUSTOM_FIELD]).to be_nil
38+
end
39+
40+
expect(Discourse.redis.smembers(DiscourseTranslator::LANG_DETECT_NEEDED)).to match_array(
41+
posts.map(&:id).map(&:to_s),
42+
)
43+
end
44+
45+
it "processes a maximum of MAX_QUEUE_SIZE posts per run" do
46+
large_number = 2000
47+
large_number.times { |i| Discourse.redis.sadd?(DiscourseTranslator::LANG_DETECT_NEEDED, i + 1) }
48+
described_class.new.execute({})
49+
50+
remaining = Discourse.redis.scard(DiscourseTranslator::LANG_DETECT_NEEDED)
51+
expect(remaining).to eq(large_number - Jobs::DetectPostsTranslation::MAX_QUEUE_SIZE)
52+
end
53+
54+
it "handles an empty Redis queue gracefully" do
55+
Discourse.redis.del(DiscourseTranslator::LANG_DETECT_NEEDED)
56+
expect { described_class.new.execute({}) }.not_to raise_error
57+
end
58+
59+
it "removes successfully processed posts from Redis" do
60+
described_class.new.execute({})
61+
62+
posts.each do |post|
63+
expect(
64+
Discourse.redis.sismember(DiscourseTranslator::LANG_DETECT_NEEDED, post.id),
65+
).to be_falsey
66+
end
67+
end
68+
69+
it "skips posts that no longer exist" do
70+
non_existent_post_id = -1
71+
Discourse.redis.sadd?(DiscourseTranslator::LANG_DETECT_NEEDED, non_existent_post_id)
72+
73+
expect { described_class.new.execute({}) }.not_to raise_error
74+
75+
expect(
76+
Discourse.redis.sismember(DiscourseTranslator::LANG_DETECT_NEEDED, non_existent_post_id),
77+
).to be_falsey
78+
end
79+
80+
it "ensures posts are processed within a distributed mutex" do
81+
mutex_spy = instance_spy(DistributedMutex)
82+
allow(DistributedMutex).to receive(:synchronize).and_yield
83+
84+
described_class.new.execute({})
85+
86+
posts.each do |post|
87+
expect(DistributedMutex).to have_received(:synchronize).with("detect_translation_#{post.id}")
88+
end
89+
end
90+
end

spec/serializers/post_serializer_spec.rb

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,10 +77,15 @@
7777
expect(serializer.can_translate).to eq(false)
7878
end
7979

80-
it "enqueues detect translation job" do
81-
expect { serializer.can_translate }.to change {
82-
Jobs::DetectTranslation.jobs.size
83-
}.by(1)
80+
it "adds post id to redis if detected_language is blank" do
81+
post.custom_fields["detected_language"] = nil
82+
post.save_custom_fields
83+
84+
serializer.can_translate
85+
86+
expect(
87+
Discourse.redis.sismember(DiscourseTranslator::LANG_DETECT_NEEDED, post.id),
88+
).to eq(true)
8489
end
8590
end
8691

0 commit comments

Comments
 (0)