Skip to content

Commit 8fce768

Browse files
authored
PERF: Move per-post language detection job into batches using redis (#177)
* PERF: Move per-post language detection job into batches using redis Currently, upon post serialization, we spawn a new job to detect the language of the post. This could potentially flood sidekiq on larger sites. This PR uses redis to keep track of posts that need language detection, then schedules them in a 5-minute-ly job, with a maximum of 1000 posts at a time.
1 parent 4213639 commit 8fce768

File tree

4 files changed

+129
-10
lines changed

4 files changed

+129
-10
lines changed

plugin.rb

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ module ::DiscourseTranslator
1717
PLUGIN_NAME = "discourse_translator".freeze
1818
DETECTED_LANG_CUSTOM_FIELD = "post_detected_lang".freeze
1919
TRANSLATED_CUSTOM_FIELD = "translated_text".freeze
20+
LANG_DETECT_NEEDED = "lang_detect_needed".freeze
2021

2122
autoload :Microsoft,
2223
"#{Rails.root}/plugins/discourse-translator/services/discourse_translator/microsoft"
@@ -134,9 +135,44 @@ def execute(args)
134135
end
135136
end
136137
end
137-
end
138138

139-
on(:post_process) { |post| Jobs.enqueue(:detect_translation, post_id: post.id) }
139+
class DetectPostsTranslation < ::Jobs::Scheduled
140+
sidekiq_options retry: false
141+
every 5.minutes
142+
143+
BATCH_SIZE = 100
144+
MAX_QUEUE_SIZE = 1000
145+
146+
def execute(args)
147+
return unless SiteSetting.translator_enabled
148+
149+
post_ids = Discourse.redis.spop(DiscourseTranslator::LANG_DETECT_NEEDED, MAX_QUEUE_SIZE)
150+
return if post_ids.blank?
151+
152+
post_ids.each_slice(BATCH_SIZE) { |batch| process_batch(batch) }
153+
end
154+
155+
private
156+
157+
def process_batch(post_ids)
158+
posts = Post.where(id: post_ids).to_a
159+
posts.each do |post|
160+
DistributedMutex.synchronize("detect_translation_#{post.id}") do
161+
begin
162+
translator = "DiscourseTranslator::#{SiteSetting.translator}".constantize
163+
translator.detect(post)
164+
if !post.custom_fields_clean?
165+
post.save_custom_fields
166+
post.publish_change_to_clients!(:revised)
167+
end
168+
rescue ::DiscourseTranslator::ProblemCheckedTranslationError
169+
# problem-checked translation errors gracefully
170+
end
171+
end
172+
end
173+
end
174+
end
175+
end
140176

141177
topic_view_post_custom_fields_allowlister { [::DiscourseTranslator::DETECTED_LANG_CUSTOM_FIELD] }
142178

@@ -160,7 +196,7 @@ def execute(args)
160196
detected_lang = post_custom_fields[::DiscourseTranslator::DETECTED_LANG_CUSTOM_FIELD]
161197

162198
if !detected_lang
163-
Jobs.enqueue(:detect_translation, post_id: object.id)
199+
Discourse.redis.sadd?(DiscourseTranslator::LANG_DETECT_NEEDED, object.id)
164200
false
165201
else
166202
detected_lang !=
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
# frozen_string_literal: true
2+
3+
require "aws-sdk-translate"
4+
5+
describe Jobs::DetectPostsTranslation do
6+
fab!(:posts) { Fabricate.times(5, :post) }
7+
let(:redis_key) { DiscourseTranslator::LANG_DETECT_NEEDED }
8+
9+
before do
10+
SiteSetting.translator_enabled = true
11+
SiteSetting.translator = "Amazon"
12+
client = Aws::Translate::Client.new(stub_responses: true)
13+
client.stub_responses(
14+
:translate_text,
15+
{ translated_text: "大丈夫", source_language_code: "en", target_language_code: "jp" },
16+
)
17+
Aws::Translate::Client.stubs(:new).returns(client)
18+
posts.each { |post| Discourse.redis.sadd?(redis_key, post.id) }
19+
end
20+
21+
it "processes posts in batches and updates their translations" do
22+
described_class.new.execute({})
23+
24+
posts.each do |post|
25+
post.reload
26+
expect(post.custom_fields[DiscourseTranslator::DETECTED_LANG_CUSTOM_FIELD]).not_to be_nil
27+
end
28+
29+
expect(Discourse.redis.smembers(redis_key)).to be_empty
30+
end
31+
32+
it "does not process posts if the translator is disabled" do
33+
SiteSetting.translator_enabled = false
34+
described_class.new.execute({})
35+
36+
posts.each do |post|
37+
post.reload
38+
expect(post.custom_fields[DiscourseTranslator::DETECTED_LANG_CUSTOM_FIELD]).to be_nil
39+
end
40+
41+
expect(Discourse.redis.smembers(redis_key)).to match_array(posts.map(&:id).map(&:to_s))
42+
end
43+
44+
it "processes a maximum of MAX_QUEUE_SIZE posts per run" do
45+
large_number = 2000
46+
large_number.times { |i| Discourse.redis.sadd?(redis_key, i + 1) }
47+
described_class.new.execute({})
48+
49+
remaining = Discourse.redis.scard(redis_key)
50+
expect(remaining).to eq(large_number - Jobs::DetectPostsTranslation::MAX_QUEUE_SIZE)
51+
end
52+
53+
it "handles an empty Redis queue gracefully" do
54+
Discourse.redis.del(redis_key)
55+
expect { described_class.new.execute({}) }.not_to raise_error
56+
end
57+
58+
it "removes successfully processed posts from Redis" do
59+
described_class.new.execute({})
60+
61+
posts.each { |post| expect(Discourse.redis.sismember(redis_key, post.id)).to be_falsey }
62+
end
63+
64+
it "skips posts that no longer exist" do
65+
non_existent_post_id = -1
66+
Discourse.redis.sadd?(redis_key, non_existent_post_id)
67+
68+
expect { described_class.new.execute({}) }.not_to raise_error
69+
70+
expect(Discourse.redis.sismember(redis_key, non_existent_post_id)).to be_falsey
71+
end
72+
73+
it "ensures posts are processed within a distributed mutex" do
74+
allow(DistributedMutex).to receive(:synchronize).and_yield
75+
76+
described_class.new.execute({})
77+
78+
posts.each do |post|
79+
expect(DistributedMutex).to have_received(:synchronize).with("detect_translation_#{post.id}")
80+
end
81+
end
82+
end

spec/models/post_spec.rb

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
require "rails_helper"
44

55
RSpec.describe Post do
6+
before { SiteSetting.translator_enabled = true }
7+
68
describe "translator custom fields" do
79
let(:post) do
810
Fabricate(
@@ -17,10 +19,6 @@
1719
)
1820
end
1921

20-
before { SiteSetting.translator_enabled = true }
21-
22-
after { SiteSetting.translator_enabled = false }
23-
2422
it "should reset custom fields when post has been updated" do
2523
post.update!(raw: "this is an updated post")
2624

spec/serializers/post_serializer_spec.rb

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,10 +77,13 @@
7777
expect(serializer.can_translate).to eq(false)
7878
end
7979

80-
it "enqueues detect translation job" do
80+
it "adds post id to redis if detected_language is blank" do
81+
post.custom_fields["detected_language"] = nil
82+
post.save_custom_fields
83+
8184
expect { serializer.can_translate }.to change {
82-
Jobs::DetectTranslation.jobs.size
83-
}.by(1)
85+
Discourse.redis.sismember(DiscourseTranslator::LANG_DETECT_NEEDED, post.id)
86+
}.from(false).to(true)
8487
end
8588
end
8689

0 commit comments

Comments
 (0)