Skip to content

Commit d35ea01

Browse files
committed
PERF: Move per-post language detection job into batches using redis
Currently, upon post serialization, we spawn a new job to detect the language of the post. This could potentially flood sidekiq on larger sites. This PR uses redis to keep track of posts that need language detection, then schedules them in a 5-minute-ly job, with a maximum of 1000 posts at a time.
1 parent e82061a commit d35ea01

File tree

3 files changed

+134
-5
lines changed

3 files changed

+134
-5
lines changed

plugin.rb

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ module ::DiscourseTranslator
1717
PLUGIN_NAME = "discourse_translator".freeze
1818
DETECTED_LANG_CUSTOM_FIELD = "post_detected_lang".freeze
1919
TRANSLATED_CUSTOM_FIELD = "translated_text".freeze
20+
LANG_DETECT_NEEDED = "lang_detect_needed".freeze
2021

2122
autoload :Microsoft,
2223
"#{Rails.root}/plugins/discourse-translator/services/discourse_translator/microsoft"
@@ -134,6 +135,45 @@ def execute(args)
134135
end
135136
end
136137
end
138+
139+
class DetectPostsTranslation < ::Jobs::Scheduled
140+
sidekiq_options retry: false
141+
every 5.minutes
142+
143+
BATCH_SIZE = 100
144+
MAX_QUEUE_SIZE = 1000
145+
146+
def execute(args)
147+
return unless SiteSetting.translator_enabled
148+
149+
post_ids = Discourse.redis.spop(DiscourseTranslator::LANG_DETECT_NEEDED, MAX_QUEUE_SIZE)
150+
return if post_ids.blank?
151+
152+
post_ids.each_slice(BATCH_SIZE) do |batch|
153+
process_batch(batch)
154+
end
155+
end
156+
157+
private
158+
159+
def process_batch(post_ids)
160+
posts = Post.where(id: post_ids).to_a
161+
posts.each do |post|
162+
DistributedMutex.synchronize("detect_translation_#{post.id}") do
163+
begin
164+
translator = "DiscourseTranslator::#{SiteSetting.translator}".constantize
165+
translator.detect(post)
166+
if !post.custom_fields_clean?
167+
post.save_custom_fields
168+
post.publish_change_to_clients!(:revised)
169+
end
170+
rescue ::DiscourseTranslator::ProblemCheckedTranslationError
171+
# problem-checked translation errors gracefully
172+
end
173+
end
174+
end
175+
end
176+
end
137177
end
138178

139179
on(:post_process) { |post| Jobs.enqueue(:detect_translation, post_id: post.id) }
@@ -160,7 +200,7 @@ def execute(args)
160200
detected_lang = post_custom_fields[::DiscourseTranslator::DETECTED_LANG_CUSTOM_FIELD]
161201

162202
if !detected_lang
163-
Jobs.enqueue(:detect_translation, post_id: object.id)
203+
Discourse.redis.sadd?(DiscourseTranslator::LANG_DETECT_NEEDED, object.id)
164204
false
165205
else
166206
detected_lang !=
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
# frozen_string_literal: true
2+
3+
require "aws-sdk-translate"
4+
5+
describe Jobs::DetectPostsTranslation do
6+
fab!(:posts) { Fabricate.times(5, :post) }
7+
8+
before do
9+
SiteSetting.translator_enabled = true
10+
SiteSetting.translator = "Amazon"
11+
client = Aws::Translate::Client.new(stub_responses: true)
12+
client.stub_responses(
13+
:translate_text,
14+
{ translated_text: "大丈夫", source_language_code: "en", target_language_code: "jp" },
15+
)
16+
Aws::Translate::Client.stubs(:new).returns(client)
17+
posts.each do |post|
18+
Discourse.redis.sadd?(DiscourseTranslator::LANG_DETECT_NEEDED, post.id)
19+
end
20+
end
21+
22+
it "processes posts in batches and updates their translations" do
23+
described_class.new.execute({})
24+
25+
posts.each do |post|
26+
post.reload
27+
expect(post.custom_fields[DiscourseTranslator::DETECTED_LANG_CUSTOM_FIELD]).not_to be_nil
28+
end
29+
30+
expect(Discourse.redis.smembers(DiscourseTranslator::LANG_DETECT_NEEDED)).to be_empty
31+
end
32+
33+
it "does not process posts if the translator is disabled" do
34+
SiteSetting.translator_enabled = false
35+
described_class.new.execute({})
36+
37+
posts.each do |post|
38+
post.reload
39+
expect(post.custom_fields[DiscourseTranslator::DETECTED_LANG_CUSTOM_FIELD]).to be_nil
40+
end
41+
42+
expect(Discourse.redis.smembers(DiscourseTranslator::LANG_DETECT_NEEDED)).to match_array(posts.map(&:id).map(&:to_s))
43+
end
44+
45+
it "processes a maximum of MAX_QUEUE_SIZE posts per run" do
46+
large_number = 2000
47+
large_number.times { |i| Discourse.redis.sadd?(DiscourseTranslator::LANG_DETECT_NEEDED, i + 1) }
48+
described_class.new.execute({})
49+
50+
remaining = Discourse.redis.scard(DiscourseTranslator::LANG_DETECT_NEEDED)
51+
expect(remaining).to eq(large_number - Jobs::DetectPostsTranslation::MAX_QUEUE_SIZE)
52+
end
53+
54+
it "handles an empty Redis queue gracefully" do
55+
Discourse.redis.del(DiscourseTranslator::LANG_DETECT_NEEDED)
56+
expect { described_class.new.execute({}) }.not_to raise_error
57+
end
58+
59+
it "removes successfully processed posts from Redis" do
60+
described_class.new.execute({})
61+
62+
posts.each do |post|
63+
expect(Discourse.redis.sismember(DiscourseTranslator::LANG_DETECT_NEEDED, post.id)).to be_falsey
64+
end
65+
end
66+
67+
it "skips posts that no longer exist" do
68+
non_existent_post_id = -1
69+
Discourse.redis.sadd?(DiscourseTranslator::LANG_DETECT_NEEDED, non_existent_post_id)
70+
71+
expect { described_class.new.execute({}) }.not_to raise_error
72+
73+
expect(Discourse.redis.sismember(DiscourseTranslator::LANG_DETECT_NEEDED, non_existent_post_id)).to be_falsey
74+
end
75+
76+
it "ensures posts are processed within a distributed mutex" do
77+
mutex_spy = instance_spy(DistributedMutex)
78+
allow(DistributedMutex).to receive(:synchronize).and_yield
79+
80+
described_class.new.execute({})
81+
82+
posts.each do |post|
83+
expect(DistributedMutex).to have_received(:synchronize).with("detect_translation_#{post.id}")
84+
end
85+
end
86+
end

spec/serializers/post_serializer_spec.rb

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,10 +77,13 @@
7777
expect(serializer.can_translate).to eq(false)
7878
end
7979

80-
it "enqueues detect translation job" do
81-
expect { serializer.can_translate }.to change {
82-
Jobs::DetectTranslation.jobs.size
83-
}.by(1)
80+
it "adds post id to redis if detected_language is blank" do
81+
post.custom_fields["detected_language"] = nil
82+
post.save_custom_fields
83+
84+
serializer.can_translate
85+
86+
expect(Discourse.redis.sismember(DiscourseTranslator::LANG_DETECT_NEEDED, post.id)).to eq(true)
8487
end
8588
end
8689

0 commit comments

Comments
 (0)