Skip to content

Commit 7b00016

Browse files
committed
Add a basic batching mechanism
Inspired by AWS Event Source Mapping, it support batch on size or time_window
1 parent 78614f6 commit 7b00016

14 files changed

+388
-8
lines changed

Procfile.dev

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
web: bin/rails server
22
css: bin/rails tailwindcss:watch
3+
job: bin/jobs

app/jobs/classifier_trainer_job.rb

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@ class ClassifierTrainerJob < ApplicationJob
33
queue_as :training
44

55
def perform(trained_messages)
6-
Rails.logger.info "Retrain all the classifiers for public"
6+
Rails.logger.info "Retraining classifiers."
7+
trained_message_ids = trained_messages.map { |data| data["id"] }
8+
trained_messages = TrainedMessage.where(id: trained_message_ids)
9+
710
# Separate messages by their training target
811
user_name_messages = trained_messages.select(&:user_name?)
912
message_content_messages = trained_messages.select(&:message_content?)
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
class ProcessPendingBatchesJob < ApplicationJob
2+
queue_as :batching
3+
4+
def perform
5+
BatchProcessor.process_pending_batches
6+
end
7+
end

app/models/batch_processor.rb

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
class BatchProcessor < ApplicationRecord
2+
validates :batch_key, presence: true, uniqueness: true
3+
validates :job_class, presence: true
4+
validates :pending_count, presence: true, numericality: { greater_than_or_euqal_to: 0 }
5+
6+
# Generic batching configuration
7+
DEFAULT_BATCH_SIZE = 100
8+
DEFAULT_BATCH_WINDOW_IN_SECONDS = 30.seconds
9+
10+
# JSON serialization for SQLite compatibility
11+
def shared_args
12+
JSON.parse(shared_args_json || "{}")
13+
end
14+
15+
def shared_args=(value)
16+
self.shared_args_json = value.to_json
17+
end
18+
19+
def pending_items
20+
JSON.parse(pending_items_json || "[]")
21+
end
22+
23+
def pending_items=(value)
24+
self.pending_items_json = value.to_json
25+
end
26+
27+
def self.add_to_batch(batch_key, job_class, item_data, shared_args = {}, batch_size: DEFAULT_BATCH_SIZE, batch_window: DEFAULT_BATCH_WINDOW_IN_SECONDS)
28+
batch = find_or_create_by(batch_key: batch_key) do |b|
29+
b.job_class = job_class
30+
b.shared_args = shared_args
31+
b.pending_items = []
32+
b.pending_count = 0
33+
b.batch_size = batch_size
34+
b.batch_window_in_seconds = batch_window.to_i
35+
b.last_processed_at = Time.current
36+
end
37+
38+
current_items = batch.pending_items
39+
current_items << item_data
40+
batch.pending_items = current_items
41+
batch.pending_count = current_items.size
42+
batch.save!
43+
44+
# Trigger processing ONLY if the batch is full
45+
if batch.pending_count >= batch.batch_size
46+
process_batch(batch)
47+
end
48+
end
49+
50+
def self.process_pending_batches
51+
where("pending_count > 0")
52+
.find_each do |batch|
53+
if batch.last_processed_at < batch.batch_window_in_seconds.seconds.ago
54+
process_batch(batch)
55+
end
56+
end
57+
end
58+
59+
private
60+
def self.should_process_batch?(batch)
61+
batch.pending_count >= batch.batch_size || batch.last_processed_at < batch.batch_window_in_seconds.seconds.ago
62+
end
63+
64+
def self.process_batch(batch)
65+
return if batch.pending_items.blank?
66+
67+
job_class = batch.job_class.constantize
68+
job_class.perform_later(batch.pending_items, **batch.shared_args.symbolize_keys)
69+
70+
batch.update!(
71+
pending_items: [],
72+
pending_count: 0,
73+
last_processed_at: Time.current
74+
)
75+
end
76+
end

app/models/trained_message.rb

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,18 @@ def should_ban_user
5454
)
5555
end
5656
end
57+
5758
def retrain_classifier
5859
return if untrained?
5960

60-
ClassifierTrainerJob.perform_later([ self ])
61+
BatchProcessor.add_to_batch(
62+
"classifier_training_batch_key",
63+
"ClassifierTrainerJob",
64+
self, # pass trainedMessage as item_data
65+
{}, # no shared_args needed
66+
batch_size: 100,
67+
batch_window: 5.minutes
68+
)
6169
end
6270

6371
private

config/database.yml

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,41 @@ default: &default
1010
timeout: 5000
1111

1212
development:
13-
<<: *default
14-
database: storage/development.sqlite3
13+
primary:
14+
<<: *default
15+
database: storage/development.sqlite3
16+
cache:
17+
<<: *default
18+
database: storage/development_cache.sqlite3
19+
migrations_paths: db/cache_migrate
20+
queue:
21+
<<: *default
22+
database: storage/development_queue.sqlite3
23+
migrations_paths: db/queue_migrate
24+
cable:
25+
<<: *default
26+
database: storage/development_cable.sqlite3
27+
migrations_paths: db/cable_migrate
1528

1629
# Warning: The database defined as "test" will be erased and
1730
# re-generated from your development database when you run "rake".
1831
# Do not set this db to the same as development or production.
1932
test:
20-
<<: *default
21-
database: storage/test.sqlite3
22-
33+
primary:
34+
<<: *default
35+
database: storage/test.sqlite3
36+
cache:
37+
<<: *default
38+
database: storage/test_cache.sqlite3
39+
migrations_paths: db/cache_migrate
40+
queue:
41+
<<: *default
42+
database: storage/test_queue.sqlite3
43+
migrations_paths: db/queue_migrate
44+
cable:
45+
<<: *default
46+
database: storage/test_cable.sqlite3
47+
migrations_paths: db/cable_migrate
2348

2449
# Store production database in the storage/ directory, which by default
2550
# is mounted as a persistent Docker volume in config/deploy.yml.

config/environments/development.rb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,12 @@
5555
# Highlight code that enqueued background job in logs.
5656
config.active_job.verbose_enqueue_logs = true
5757

58+
# Configure Active Job to use Solid Queue
59+
config.active_job.queue_adapter = :solid_queue
60+
61+
# Configure Solid Queue to use the queue database
62+
config.solid_queue.connects_to = { database: { writing: :queue } }
63+
5864
# Raises error for missing translations.
5965
# config.i18n.raise_on_missing_translations = true
6066

config/recurring.yml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,12 @@ production:
1313
clear_solid_queue_finished_jobs:
1414
command: "SolidQueue::Job.clear_finished_in_batches(sleep_between_batches: 0.3)"
1515
schedule: every hour at minute 12
16+
17+
process_pending_batches:
18+
class: ProcessPendingBatchesJob
19+
schedule: every 3 seconds
20+
21+
development:
22+
process_pending_batches:
23+
class: ProcessPendingBatchesJob
24+
schedule: every 3 seconds
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
class CreateBatchProcessors < ActiveRecord::Migration[8.0]
2+
def change
3+
create_table :batch_processors do |t|
4+
t.string :batch_key, null: false
5+
t.string :job_class, null: false
6+
t.text :shared_args_json, default: "{}"
7+
t.text :pending_items_json, default: "[]"
8+
t.integer :pending_count, default: 0
9+
t.integer :batch_size, default: 100
10+
t.integer :batch_window_in_seconds, default: 30
11+
12+
t.timestamps
13+
end
14+
add_index :batch_processors, :batch_key, unique: true
15+
add_index :batch_processors, :updated_at
16+
end
17+
end
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
class AddLastProcessedAtToBatchProcessors < ActiveRecord::Migration[8.0]
2+
def change
3+
add_column :batch_processors, :last_processed_at, :datetime
4+
end
5+
end

0 commit comments

Comments
 (0)