Skip to content

Commit fd52a75

Browse files
committed
Support batching using custom columns
1 parent 7c1db0d commit fd52a75

File tree

6 files changed

+238
-84
lines changed

6 files changed

+238
-84
lines changed

activerecord/CHANGELOG.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,13 @@
1+
* Support batching using custom columns.
2+
3+
```ruby
4+
Product.in_batches(cursor: [:shop_id, :id]) do |relation|
5+
# do something with relation
6+
end
7+
```
8+
9+
*fatkodima*
10+
111
* Use SQLite `IMMEDIATE` transactions when possible.
212

313
Transactions run against the SQLite3 adapter default to IMMEDIATE mode to improve concurrency support and avoid busy exceptions.

activerecord/lib/active_record/relation/batches.rb

Lines changed: 124 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
module ActiveRecord
66
# = Active Record \Batches
77
module Batches
8-
ORDER_IGNORE_MESSAGE = "Scoped order is ignored, it's forced to be batch order."
8+
ORDER_IGNORE_MESSAGE = "Scoped order is ignored, use :cursor with :order to configure custom order."
99
DEFAULT_ORDER = :asc
1010

1111
# Looping through a collection of records from the database
@@ -35,11 +35,13 @@ module Batches
3535
#
3636
# ==== Options
3737
# * <tt>:batch_size</tt> - Specifies the size of the batch. Defaults to 1000.
38-
# * <tt>:start</tt> - Specifies the primary key value to start from, inclusive of the value.
39-
# * <tt>:finish</tt> - Specifies the primary key value to end at, inclusive of the value.
38+
# * <tt>:start</tt> - Specifies the cursor column value to start from, inclusive of the value.
39+
# * <tt>:finish</tt> - Specifies the cursor column value to end at, inclusive of the value.
4040
# * <tt>:error_on_ignore</tt> - Overrides the application config to specify if an error should be raised when
4141
# an order is present in the relation.
42-
# * <tt>:order</tt> - Specifies the primary key order (can be +:asc+ or +:desc+ or an array consisting
42+
# * <tt>:cursor</tt> - Specifies the column to use for batching (can be a column name or an array
43+
# of column names). Defaults to primary key.
44+
# * <tt>:order</tt> - Specifies the cursor column order (can be +:asc+ or +:desc+ or an array consisting
4345
# of :asc or :desc). Defaults to +:asc+.
4446
#
4547
# class Order < ActiveRecord::Base
@@ -71,20 +73,25 @@ module Batches
7173
#
7274
# NOTE: Order can be ascending (:asc) or descending (:desc). It is automatically set to
7375
# ascending on the cursor column, which defaults to the primary key ("id ASC").
74-
# This also means that this method only works when the primary key is
76+
# This also means that this method only works when the cursor column is
7577
# orderable (e.g. an integer or string).
7678
#
79+
# NOTE: When using custom columns for batching, they should include at least one unique column
80+
# (e.g. primary key) as a tiebreaker. Also, to reduce the likelihood of race conditions,
81+
# all columns should be static (unchangeable after they are set).
82+
#
7783
# NOTE: By its nature, batch processing is subject to race conditions if
7884
# other processes are modifying the database.
79-
def find_each(start: nil, finish: nil, batch_size: 1000, error_on_ignore: nil, order: DEFAULT_ORDER, &block)
85+
def find_each(start: nil, finish: nil, batch_size: 1000, error_on_ignore: nil, cursor: primary_key, order: DEFAULT_ORDER, &block)
8086
if block_given?
81-
find_in_batches(start: start, finish: finish, batch_size: batch_size, error_on_ignore: error_on_ignore, order: order) do |records|
87+
find_in_batches(start: start, finish: finish, batch_size: batch_size, error_on_ignore: error_on_ignore, cursor: cursor, order: order) do |records|
8288
records.each(&block)
8389
end
8490
else
85-
enum_for(:find_each, start: start, finish: finish, batch_size: batch_size, error_on_ignore: error_on_ignore, order: order) do
91+
enum_for(:find_each, start: start, finish: finish, batch_size: batch_size, error_on_ignore: error_on_ignore, cursor: cursor, order: order) do
8692
relation = self
87-
apply_limits(relation, start, finish, build_batch_orders(order)).size
93+
cursor = Array(cursor)
94+
apply_limits(relation, cursor, start, finish, build_batch_orders(cursor, order)).size
8895
end
8996
end
9097
end
@@ -109,11 +116,13 @@ def find_each(start: nil, finish: nil, batch_size: 1000, error_on_ignore: nil, o
109116
#
110117
# ==== Options
111118
# * <tt>:batch_size</tt> - Specifies the size of the batch. Defaults to 1000.
112-
# * <tt>:start</tt> - Specifies the primary key value to start from, inclusive of the value.
113-
# * <tt>:finish</tt> - Specifies the primary key value to end at, inclusive of the value.
119+
# * <tt>:start</tt> - Specifies the cursor column value to start from, inclusive of the value.
120+
# * <tt>:finish</tt> - Specifies the cursor column value to end at, inclusive of the value.
114121
# * <tt>:error_on_ignore</tt> - Overrides the application config to specify if an error should be raised when
115122
# an order is present in the relation.
116-
# * <tt>:order</tt> - Specifies the primary key order (can be +:asc+ or +:desc+ or an array consisting
123+
# * <tt>:cursor</tt> - Specifies the column to use for batching (can be a column name or an array
124+
# of column names). Defaults to primary key.
125+
# * <tt>:order</tt> - Specifies the cursor column order (can be +:asc+ or +:desc+ or an array consisting
117126
# of :asc or :desc). Defaults to +:asc+.
118127
#
119128
# class Order < ActiveRecord::Base
@@ -140,21 +149,26 @@ def find_each(start: nil, finish: nil, batch_size: 1000, error_on_ignore: nil, o
140149
#
141150
# NOTE: Order can be ascending (:asc) or descending (:desc). It is automatically set to
142151
# ascending on the cursor column, which defaults to the primary key ("id ASC").
143-
# This also means that this method only works when the primary key is
152+
# This also means that this method only works when the cursor column is
144153
# orderable (e.g. an integer or string).
145154
#
155+
# NOTE: When using custom columns for batching, they should include at least one unique column
156+
# (e.g. primary key) as a tiebreaker. Also, to reduce the likelihood of race conditions,
157+
# all columns should be static (unchangeable after they are set).
158+
#
146159
# NOTE: By its nature, batch processing is subject to race conditions if
147160
# other processes are modifying the database.
148-
def find_in_batches(start: nil, finish: nil, batch_size: 1000, error_on_ignore: nil, order: DEFAULT_ORDER)
161+
def find_in_batches(start: nil, finish: nil, batch_size: 1000, error_on_ignore: nil, cursor: primary_key, order: DEFAULT_ORDER)
149162
relation = self
150163
unless block_given?
151-
return to_enum(:find_in_batches, start: start, finish: finish, batch_size: batch_size, error_on_ignore: error_on_ignore, order: order) do
152-
total = apply_limits(relation, start, finish, build_batch_orders(order)).size
164+
return to_enum(:find_in_batches, start: start, finish: finish, batch_size: batch_size, error_on_ignore: error_on_ignore, cursor: cursor, order: order) do
165+
cursor = Array(cursor)
166+
total = apply_limits(relation, cursor, start, finish, build_batch_orders(cursor, order)).size
153167
(total - 1).div(batch_size) + 1
154168
end
155169
end
156170

157-
in_batches(of: batch_size, start: start, finish: finish, load: true, error_on_ignore: error_on_ignore, order: order) do |batch|
171+
in_batches(of: batch_size, start: start, finish: finish, load: true, error_on_ignore: error_on_ignore, cursor: cursor, order: order) do |batch|
158172
yield batch.to_a
159173
end
160174
end
@@ -183,11 +197,13 @@ def find_in_batches(start: nil, finish: nil, batch_size: 1000, error_on_ignore:
183197
# ==== Options
184198
# * <tt>:of</tt> - Specifies the size of the batch. Defaults to 1000.
185199
# * <tt>:load</tt> - Specifies if the relation should be loaded. Defaults to false.
186-
# * <tt>:start</tt> - Specifies the primary key value to start from, inclusive of the value.
187-
# * <tt>:finish</tt> - Specifies the primary key value to end at, inclusive of the value.
200+
# * <tt>:start</tt> - Specifies the cursor column value to start from, inclusive of the value.
201+
# * <tt>:finish</tt> - Specifies the cursor column value to end at, inclusive of the value.
188202
# * <tt>:error_on_ignore</tt> - Overrides the application config to specify if an error should be raised when
189203
# an order is present in the relation.
190-
# * <tt>:order</tt> - Specifies the primary key order (can be +:asc+ or +:desc+ or an array consisting
204+
# * <tt>:cursor</tt> - Specifies the column to use for batching (can be a column name or an array
205+
# of column names). Defaults to primary key.
206+
# * <tt>:order</tt> - Specifies the cursor column order (can be +:asc+ or +:desc+ or an array consisting
191207
# of :asc or :desc). Defaults to +:asc+.
192208
#
193209
# class Order < ActiveRecord::Base
@@ -231,18 +247,21 @@ def find_in_batches(start: nil, finish: nil, batch_size: 1000, error_on_ignore:
231247
#
232248
# NOTE: Order can be ascending (:asc) or descending (:desc). It is automatically set to
233249
# ascending on the cursor column, which defaults to the primary key ("id ASC").
234-
# This also means that this method only works when the primary key is
250+
# This also means that this method only works when the cursor column is
235251
# orderable (e.g. an integer or string).
236252
#
253+
# NOTE: When using custom columns for batching, they should include at least one unique column
254+
# (e.g. primary key) as a tiebreaker. Also, to reduce the likelihood of race conditions,
255+
# all columns should be static (unchangeable after they are set).
256+
#
237257
# NOTE: By its nature, batch processing is subject to race conditions if
238258
# other processes are modifying the database.
239-
def in_batches(of: 1000, start: nil, finish: nil, load: false, error_on_ignore: nil, order: DEFAULT_ORDER, use_ranges: nil, &block)
240-
unless Array(order).all? { |ord| [:asc, :desc].include?(ord) }
241-
raise ArgumentError, ":order must be :asc or :desc or an array consisting of :asc or :desc, got #{order.inspect}"
242-
end
259+
def in_batches(of: 1000, start: nil, finish: nil, load: false, error_on_ignore: nil, cursor: primary_key, order: DEFAULT_ORDER, use_ranges: nil, &block)
260+
cursor = Array(cursor).map(&:to_s)
261+
ensure_valid_options_for_batching!(cursor, start, finish, order)
243262

244263
unless block
245-
return BatchEnumerator.new(of: of, start: start, finish: finish, relation: self, order: order, use_ranges: use_ranges)
264+
return BatchEnumerator.new(of: of, start: start, finish: finish, relation: self, cursor: cursor, order: order, use_ranges: use_ranges)
246265
end
247266

248267
if arel.orders.present?
@@ -261,6 +280,7 @@ def in_batches(of: 1000, start: nil, finish: nil, load: false, error_on_ignore:
261280
relation: self,
262281
start: start,
263282
finish: finish,
283+
cursor: cursor,
264284
order: order,
265285
batch_limit: batch_limit,
266286
&block
@@ -271,6 +291,7 @@ def in_batches(of: 1000, start: nil, finish: nil, load: false, error_on_ignore:
271291
start: start,
272292
finish: finish,
273293
load: load,
294+
cursor: cursor,
274295
order: order,
275296
use_ranges: use_ranges,
276297
remaining: remaining,
@@ -281,28 +302,51 @@ def in_batches(of: 1000, start: nil, finish: nil, load: false, error_on_ignore:
281302
end
282303

283304
private
284-
def apply_limits(relation, start, finish, batch_orders)
285-
relation = apply_start_limit(relation, start, batch_orders) if start
286-
relation = apply_finish_limit(relation, finish, batch_orders) if finish
305+
def ensure_valid_options_for_batching!(cursor, start, finish, order)
306+
if start && Array(start).size != cursor.size
307+
raise ArgumentError, ":start must contain one value per cursor column"
308+
end
309+
310+
if finish && Array(finish).size != cursor.size
311+
raise ArgumentError, ":finish must contain one value per cursor column"
312+
end
313+
314+
if (Array(primary_key) - cursor).any?
315+
indexes = model.schema_cache.indexes(table_name)
316+
unique_index = indexes.find { |index| index.unique && index.where.nil? && (Array(index.columns) - cursor).empty? }
317+
318+
unless unique_index
319+
raise ArgumentError, ":cursor must include a primary key or other unique column(s)"
320+
end
321+
end
322+
323+
if (Array(order) - [:asc, :desc]).any?
324+
raise ArgumentError, ":order must be :asc or :desc or an array consisting of :asc or :desc, got #{order.inspect}"
325+
end
326+
end
327+
328+
def apply_limits(relation, cursor, start, finish, batch_orders)
329+
relation = apply_start_limit(relation, cursor, start, batch_orders) if start
330+
relation = apply_finish_limit(relation, cursor, finish, batch_orders) if finish
287331
relation
288332
end
289333

290-
def apply_start_limit(relation, start, batch_orders)
334+
def apply_start_limit(relation, cursor, start, batch_orders)
291335
operators = batch_orders.map do |_column, order|
292336
order == :desc ? :lteq : :gteq
293337
end
294-
batch_condition(relation, primary_key, start, operators)
338+
batch_condition(relation, cursor, start, operators)
295339
end
296340

297-
def apply_finish_limit(relation, finish, batch_orders)
341+
def apply_finish_limit(relation, cursor, finish, batch_orders)
298342
operators = batch_orders.map do |_column, order|
299343
order == :desc ? :gteq : :lteq
300344
end
301-
batch_condition(relation, primary_key, finish, operators)
345+
batch_condition(relation, cursor, finish, operators)
302346
end
303347

304-
def batch_condition(relation, columns, values, operators)
305-
cursor_positions = Array(columns).zip(Array(values), operators)
348+
def batch_condition(relation, cursor, values, operators)
349+
cursor_positions = cursor.zip(Array(values), operators)
306350

307351
first_clause_column, first_clause_value, operator = cursor_positions.pop
308352
where_clause = predicate_builder[first_clause_column, first_clause_value, operator]
@@ -316,9 +360,9 @@ def batch_condition(relation, columns, values, operators)
316360
relation.where(where_clause)
317361
end
318362

319-
def build_batch_orders(order)
320-
get_the_order_of_primary_key(order).map do |column, ord|
321-
[column, ord || DEFAULT_ORDER]
363+
def build_batch_orders(cursor, order)
364+
cursor.zip(Array(order)).map do |column, order_|
365+
[column, order_ || DEFAULT_ORDER]
322366
end
323367
end
324368

@@ -332,23 +376,23 @@ def act_on_ignored_order(error_on_ignore)
332376
end
333377
end
334378

335-
def get_the_order_of_primary_key(order)
336-
Array(primary_key).zip(Array(order))
337-
end
338-
339-
def batch_on_loaded_relation(relation:, start:, finish:, order:, batch_limit:)
379+
def batch_on_loaded_relation(relation:, start:, finish:, cursor:, order:, batch_limit:)
340380
records = relation.to_a
341-
order = build_batch_orders(order).map(&:second)
381+
order = build_batch_orders(cursor, order).map(&:second)
342382

343383
if start || finish
344384
records = records.filter do |record|
345-
(start.nil? || compare_values_for_order(record.id, start, order) >= 0) &&
346-
(finish.nil? || compare_values_for_order(record.id, finish, order) <= 0)
385+
values = record_cursor_values(record, cursor)
386+
387+
(start.nil? || compare_values_for_order(values, Array(start), order) >= 0) &&
388+
(finish.nil? || compare_values_for_order(values, Array(finish), order) <= 0)
347389
end
348390
end
349391

350392
records.sort! do |record1, record2|
351-
compare_values_for_order(record1.id, record2.id, order)
393+
values1 = record_cursor_values(record1, cursor)
394+
values2 = record_cursor_values(record2, cursor)
395+
compare_values_for_order(values1, values2, order)
352396
end
353397

354398
records.each_slice(batch_limit) do |subrecords|
@@ -361,66 +405,65 @@ def batch_on_loaded_relation(relation:, start:, finish:, order:, batch_limit:)
361405
nil
362406
end
363407

408+
def record_cursor_values(record, cursor)
409+
record.attributes.slice(*cursor).values
410+
end
411+
364412
# This is a custom implementation of `<=>` operator,
365413
# which also takes into account how the collection will be ordered.
366-
def compare_values_for_order(value1, value2, order)
367-
# Multiple column values.
368-
if value1.is_a?(Array)
369-
value1.each_with_index do |element1, index|
370-
element2 = value2[index]
371-
direction = order[index]
372-
comparison = element1 <=> element2
373-
comparison = -comparison if direction == :desc
374-
return comparison if comparison != 0
375-
end
376-
377-
0
378-
# Single column values.
379-
elsif order.first == :asc
380-
value1 <=> value2
381-
else
382-
value2 <=> value1
414+
def compare_values_for_order(values1, values2, order)
415+
values1.each_with_index do |element1, index|
416+
element2 = values2[index]
417+
direction = order[index]
418+
comparison = element1 <=> element2
419+
comparison = -comparison if direction == :desc
420+
return comparison if comparison != 0
383421
end
422+
423+
0
384424
end
385425

386-
def batch_on_unloaded_relation(relation:, start:, finish:, load:, order:, use_ranges:, remaining:, batch_limit:)
387-
batch_orders = build_batch_orders(order)
426+
def batch_on_unloaded_relation(relation:, start:, finish:, load:, cursor:, order:, use_ranges:, remaining:, batch_limit:)
427+
batch_orders = build_batch_orders(cursor, order)
388428
relation = relation.reorder(batch_orders.to_h).limit(batch_limit)
389-
relation = apply_limits(relation, start, finish, batch_orders)
429+
relation = apply_limits(relation, cursor, start, finish, batch_orders)
390430
relation.skip_query_cache! # Retaining the results in the query cache would undermine the point of batching
391431
batch_relation = relation
392432
empty_scope = to_sql == model.unscoped.all.to_sql
393433

394434
loop do
395435
if load
396436
records = batch_relation.records
397-
ids = records.map(&:id)
398-
yielded_relation = where(primary_key => ids)
437+
values = records.pluck(*cursor)
438+
yielded_relation = where(cursor => values)
399439
yielded_relation.load_records(records)
400440
elsif (empty_scope && use_ranges != false) || use_ranges
401-
ids = batch_relation.ids
402-
finish = ids.last
441+
values = batch_relation.pluck(*cursor)
442+
443+
finish = values.last
403444
if finish
404-
yielded_relation = apply_finish_limit(batch_relation, finish, batch_orders)
445+
yielded_relation = apply_finish_limit(batch_relation, cursor, finish, batch_orders)
405446
yielded_relation = yielded_relation.except(:limit, :order)
406447
yielded_relation.skip_query_cache!(false)
407448
end
408449
else
409-
ids = batch_relation.ids
410-
yielded_relation = where(primary_key => ids)
450+
values = batch_relation.pluck(*cursor)
451+
yielded_relation = where(cursor => values)
411452
end
412453

413-
break if ids.empty?
454+
break if values.empty?
414455

415-
primary_key_offset = ids.last
416-
raise ArgumentError.new("Primary key not included in the custom select clause") unless primary_key_offset
456+
if values.flatten.any?(nil)
457+
raise ArgumentError, "Not all of the batch cursor columns were included in the custom select clause "\
458+
"or some columns contain nil."
459+
end
417460

418461
yield yielded_relation
419462

420-
break if ids.length < batch_limit
463+
break if values.length < batch_limit
421464

422465
if limit_value
423-
remaining -= ids.length
466+
remaining -= values.length
424467

425468
if remaining == 0
426469
# Saves a useless iteration when the limit is a multiple of the
@@ -438,7 +481,8 @@ def batch_on_unloaded_relation(relation:, start:, finish:, load:, order:, use_ra
438481
end
439482
operators << (last_order == :desc ? :lt : :gt)
440483

441-
batch_relation = batch_condition(relation, primary_key, primary_key_offset, operators)
484+
cursor_value = values.last
485+
batch_relation = batch_condition(relation, cursor, cursor_value, operators)
442486
end
443487

444488
nil

0 commit comments

Comments
 (0)