Skip to content

Commit d9639a2

Browse files
eileencodesbyroot
andcommitted
Allow async executor to be configurable
This is a followup/alternative to rails#41406. This change wouldn't work for GitHub because we intend to implement an executor for each database and use the database configuration to set the `min_threads` and `max_threads` for each one. The changes here borrow from rails#41406 by implementing an `Concurrent::ImmediateExecutor` by default. Otherwise applications have the option of having one global thread pool that is used by all connections or a thread pool for each connection. A global thread pool can set with `config.active_record.async_query_executor = :global_thread_pool`. This will create a single `Concurrent::ThreadPoolExecutor` for applications to utilize. By default the concurrency is 4, but it can be changed for the `global_thread_pool` by setting `global_executor_concurrency` to another number. If applications want to use a thread pool per database connection they can set `config.active_record.async_query_executor = :multi_thread_pool`. This will create a `Concurrent::ThreadPoolExecutor` for each database connection and set the `min_threads` and `max_threads` by their configuration values or the defaults. I've also moved the async tests out of the adapter test and into their own tests and added tests for all the new functionality. This change would allow us at GitHub to control threads per database and per writer/reader or other apps to use one global executor. The immediate executor allows apps to no-op by default. Took the immediate executor idea from Jean's PR. Co-authored-by: Jean Boussier <[email protected]>
1 parent 5b2fb8a commit d9639a2

File tree

9 files changed

+394
-122
lines changed

9 files changed

+394
-122
lines changed

activerecord/CHANGELOG.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,17 @@
1+
* Allow applications to configure the thread pool for async queries
2+
3+
Some applications may want one thread pool per database whereas others want to use
4+
a single global thread pool for all queries. By default Rails will set `async_query_executor`
5+
to `:immediate` and create a `Concurrent::ImmediateExecutor` object which is essentially a no-op.
6+
To create one thread pool for all database connections to use applications can set
7+
`config.active_record.async_query_executor` to `:global_thread_pool` and optionally define
8+
`config.active_record.global_executor_concurrency`. This defaults to 4. For applications that want
9+
to have a thread pool for each database connection, `config.active_record.async_query_executor` can
10+
be set to `:multi_thread_pool`. The configuration for each thread pool is set in the database
11+
configuration.
12+
13+
*Eileen M. Uchitelle*
14+
115
* Allow new syntax for `enum` to avoid leading `_` from reserved options.
216

317
Before:

activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -144,12 +144,7 @@ def initialize(pool_config)
144144

145145
@lock_thread = false
146146

147-
@async_executor = Concurrent::ThreadPoolExecutor.new(
148-
min_threads: 0,
149-
max_threads: @size,
150-
max_queue: @size * 4,
151-
fallback_policy: :caller_runs
152-
)
147+
@async_executor = build_async_executor
153148

154149
@reaper = Reaper.new(self, db_config.reaping_frequency)
155150
@reaper.run
@@ -463,6 +458,22 @@ def schedule_query(future_result) # :nodoc:
463458
end
464459

465460
private
461+
def build_async_executor
462+
case Base.async_query_executor
463+
when :multi_thread_pool
464+
Concurrent::ThreadPoolExecutor.new(
465+
min_threads: @db_config.min_threads,
466+
max_threads: @db_config.max_threads,
467+
max_queue: @db_config.max_queue,
468+
fallback_policy: :caller_runs
469+
)
470+
when :global_thread_pool
471+
Base.global_thread_pool_async_query_executor
472+
else
473+
Base.immediate_query_executor
474+
end
475+
end
476+
466477
#--
467478
# this is unfortunately not concurrent
468479
def bulk_make_new_connections(num_new_conns_needed)

activerecord/lib/active_record/core.rb

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,44 @@ def self.configurations
157157

158158
mattr_accessor :application_record_class, instance_accessor: false, default: nil
159159

160+
# Sets the async_query_executor for an application. By default the thread pool executor
161+
# set to `:immediate. Options are:
162+
#
163+
# * :immediate - Initializes a single +Concurrent::ImmediateExecutor+
164+
# * :global_thread_pool - Initializes a single +Concurrent::ThreadPoolExecutor+
165+
# that uses the +async_query_concurrency+ for the +max_threads+ value.
166+
# * :multi_thread_pool - Initializes a +Concurrent::ThreadPoolExecutor+ for each
167+
# database connection. The initializer values are defined in the configuration hash.
168+
mattr_accessor :async_query_executor, instance_accessor: false, default: :immediate
169+
170+
def self.immediate_query_executor # :nodoc:
171+
@@immediate_query_executor ||= Concurrent::ImmediateExecutor.new
172+
end
173+
174+
def self.global_thread_pool_async_query_executor # :nodoc:
175+
concurrency = global_executor_concurrency || 4
176+
@@global_thread_pool_async_query_executor ||= Concurrent::ThreadPoolExecutor.new(
177+
min_threads: 0,
178+
max_threads: concurrency,
179+
max_queue: concurrency * 4,
180+
fallback_policy: :caller_runs
181+
)
182+
end
183+
184+
# Set the +global_executor_concurrency+. This configuration value can only be used
185+
# with the global thread pool async query executor.
186+
def self.global_executor_concurrency=(global_executor_concurrency)
187+
if async_query_executor == :immediate || async_query_executor == :multi_thread_pool
188+
raise ArgumentError, "`global_executor_concurrency` cannot be set when using either immediate or multiple thread pools. For multiple thread pools, please set the concurrency in your database configuration. Immediate thread pools are essentially a no-op."
189+
end
190+
191+
@@global_executor_concurrency = global_executor_concurrency
192+
end
193+
194+
def self.global_executor_concurrency # :nodoc:
195+
@@global_executor_concurrency ||= nil
196+
end
197+
160198
def self.application_record_class? # :nodoc:
161199
if Base.application_record_class
162200
self == Base.application_record_class

activerecord/lib/active_record/database_configurations/database_config.rb

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,18 @@ def pool
4848
raise NotImplementedError
4949
end
5050

51+
def min_threads
52+
raise NotImplementedError
53+
end
54+
55+
def max_threads
56+
raise NotImplementedError
57+
end
58+
59+
def max_queue
60+
raise NotImplementedError
61+
end
62+
5163
def checkout_timeout
5264
raise NotImplementedError
5365
end

activerecord/lib/active_record/database_configurations/hash_config.rb

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,18 @@ def pool
6666
(configuration_hash[:pool] || 5).to_i
6767
end
6868

69+
def min_threads
70+
(configuration_hash[:min_threads] || 0).to_i
71+
end
72+
73+
def max_threads
74+
(configuration_hash[:max_threads] || pool).to_i
75+
end
76+
77+
def max_queue
78+
max_threads * 4
79+
end
80+
6981
def checkout_timeout
7082
(configuration_hash[:checkout_timeout] || 5).to_f
7183
end

activerecord/test/cases/adapter_test.rb

Lines changed: 0 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -343,122 +343,6 @@ def test_in_clause_length_is_deprecated
343343
end
344344
end
345345

346-
module AsynchronousQueriesSharedTests
347-
def test_async_select_failure
348-
ActiveRecord::Base.asynchronous_queries_tracker.start_session
349-
350-
future_result = @connection.select_all "SELECT * FROM does_not_exists", async: true
351-
assert_kind_of ActiveRecord::FutureResult, future_result
352-
assert_raises ActiveRecord::StatementInvalid do
353-
future_result.result
354-
end
355-
ensure
356-
ActiveRecord::Base.asynchronous_queries_tracker.finalize_session
357-
end
358-
359-
def test_async_query_from_transaction
360-
ActiveRecord::Base.asynchronous_queries_tracker.start_session
361-
362-
assert_nothing_raised do
363-
@connection.select_all "SELECT * FROM posts", async: true
364-
end
365-
366-
@connection.transaction do
367-
assert_raises AsynchronousQueryInsideTransactionError do
368-
@connection.select_all "SELECT * FROM posts", async: true
369-
end
370-
end
371-
ensure
372-
ActiveRecord::Base.asynchronous_queries_tracker.finalize_session
373-
end
374-
375-
def test_async_query_cache
376-
ActiveRecord::Base.asynchronous_queries_tracker.start_session
377-
378-
@connection.enable_query_cache!
379-
380-
@connection.select_all "SELECT * FROM posts"
381-
result = @connection.select_all "SELECT * FROM posts", async: true
382-
assert_equal Result, result.class
383-
ensure
384-
ActiveRecord::Base.asynchronous_queries_tracker.finalize_session
385-
@connection.disable_query_cache!
386-
end
387-
388-
def test_async_query_foreground_fallback
389-
status = {}
390-
391-
subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event|
392-
if event.payload[:sql] == "SELECT * FROM does_not_exists"
393-
status[:executed] = true
394-
status[:async] = event.payload[:async]
395-
end
396-
end
397-
398-
@connection.pool.stub(:schedule_query, proc { }) do
399-
future_result = @connection.select_all "SELECT * FROM does_not_exists", async: true
400-
assert_kind_of ActiveRecord::FutureResult, future_result
401-
assert_raises ActiveRecord::StatementInvalid do
402-
future_result.result
403-
end
404-
end
405-
406-
assert_equal true, status[:executed]
407-
assert_equal false, status[:async]
408-
ensure
409-
ActiveSupport::Notifications.unsubscribe(subscriber) if subscriber
410-
end
411-
end
412-
413-
class AsynchronousQueriesTest < ActiveRecord::TestCase
414-
self.use_transactional_tests = false
415-
416-
include AsynchronousQueriesSharedTests
417-
418-
def setup
419-
@connection = ActiveRecord::Base.connection
420-
end
421-
422-
def test_async_select_all
423-
ActiveRecord::Base.asynchronous_queries_tracker.start_session
424-
status = {}
425-
426-
monitor = Monitor.new
427-
condition = monitor.new_cond
428-
429-
subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event|
430-
if event.payload[:sql] == "SELECT * FROM posts"
431-
status[:executed] = true
432-
status[:async] = event.payload[:async]
433-
monitor.synchronize { condition.signal }
434-
end
435-
end
436-
437-
future_result = @connection.select_all "SELECT * FROM posts", async: true
438-
assert_kind_of ActiveRecord::FutureResult, future_result
439-
440-
monitor.synchronize do
441-
condition.wait_until { status[:executed] }
442-
end
443-
assert_kind_of ActiveRecord::Result, future_result.result
444-
assert_equal @connection.supports_concurrent_connections?, status[:async]
445-
ensure
446-
ActiveRecord::Base.asynchronous_queries_tracker.finalize_session
447-
ActiveSupport::Notifications.unsubscribe(subscriber) if subscriber
448-
end
449-
end
450-
451-
class AsynchronousQueriesWithTransactionalTest < ActiveRecord::TestCase
452-
self.use_transactional_tests = true
453-
454-
include AsynchronousQueriesSharedTests
455-
456-
def setup
457-
@connection = ActiveRecord::Base.connection
458-
@connection.materialize_transactions
459-
end
460-
end
461-
462346
class AdapterForeignKeyTest < ActiveRecord::TestCase
463347
self.use_transactional_tests = false
464348

0 commit comments

Comments
 (0)