Skip to content

Commit b57dcec

Browse files
committed
Refactor Active Record connection adapters further
A `raw_execute` implementation is now provided, instead adapters have to implement `perform_query`. It's a much simpler method that no longer need to concern itself with Active Support notifications nor calling `with_raw_connection`.
1 parent f9f7deb commit b57dcec

File tree

6 files changed

+112
-109
lines changed

6 files changed

+112
-109
lines changed

activerecord/lib/active_record/connection_adapters/abstract/database_statements.rb

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -545,6 +545,28 @@ def internal_exec_query(...) # :nodoc:
545545
private
546546
# Lowest level way to execute a query. Doesn't check for illegal writes, doesn't annotate queries, yields a native result object.
547547
def raw_execute(sql, name = nil, binds = [], prepare: false, async: false, allow_retry: false, materialize_transactions: true)
548+
type_casted_binds = type_casted_binds(binds)
549+
notification_payload = {
550+
sql: sql,
551+
name: name,
552+
binds: binds,
553+
type_casted_binds: type_casted_binds,
554+
async: async,
555+
connection: self,
556+
transaction: current_transaction.user_transaction.presence,
557+
statement_name: nil,
558+
row_count: 0,
559+
}
560+
@instrumenter.instrument("sql.active_record", notification_payload) do
561+
with_raw_connection(allow_retry: allow_retry, materialize_transactions: materialize_transactions) do |conn|
562+
perform_query(conn, sql, binds, type_casted_binds, prepare: prepare, notification_payload: notification_payload)
563+
end
564+
rescue ActiveRecord::StatementInvalid => ex
565+
raise ex.set_query(sql, binds)
566+
end
567+
end
568+
569+
def perform_query(raw_connection, sql, binds, type_casted_binds, prepare:, notification_payload:)
548570
raise NotImplementedError
549571
end
550572

activerecord/lib/active_record/connection_adapters/abstract_mysql_adapter.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -773,6 +773,7 @@ def warning_ignored?(warning)
773773
# Make sure we carry over any changes to ActiveRecord.default_timezone that have been
774774
# made since we established the connection
775775
def sync_timezone_changes(raw_connection)
776+
raise NotImplementedError
776777
end
777778

778779
# See https://dev.mysql.com/doc/mysql-errors/en/server-error-reference.html

activerecord/lib/active_record/connection_adapters/mysql2/database_statements.rb

Lines changed: 24 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -58,37 +58,34 @@ def with_multi_statements
5858
end
5959
end
6060

61-
def raw_execute(sql, name, binds = nil, prepare: false, async: false, allow_retry: false, materialize_transactions: true)
62-
log(sql, name, async: async) do |notification_payload|
63-
with_raw_connection(allow_retry: allow_retry, materialize_transactions: materialize_transactions) do |conn|
64-
sync_timezone_changes(conn)
65-
66-
result = if prepare
67-
stmt = @statements[sql] ||= conn.prepare(sql)
68-
69-
begin
70-
ActiveSupport::Dependencies.interlock.permit_concurrent_loads do
71-
stmt.execute(*type_casted_binds)
72-
end
73-
rescue ::Mysql2::Error
74-
@statements.delete(sql)
75-
stmt.close
76-
raise
77-
end
78-
verified!
79-
else
80-
conn.query(sql)
81-
end
61+
def perform_query(raw_connection, sql, binds, type_casted_binds, prepare:, notification_payload:)
62+
sync_timezone_changes(raw_connection)
8263

83-
@affected_rows_before_warnings = conn.affected_rows
84-
conn.abandon_results!
64+
result = if prepare
65+
stmt = @statements[sql] ||= raw_connection.prepare(sql)
8566

86-
verified!
87-
handle_warnings(sql)
88-
notification_payload[:row_count] = result&.size || 0
89-
result
67+
begin
68+
ActiveSupport::Dependencies.interlock.permit_concurrent_loads do
69+
stmt.execute(*type_casted_binds)
70+
end
71+
rescue ::Mysql2::Error
72+
@statements.delete(sql)
73+
stmt.close
74+
raise
9075
end
76+
verified!
77+
else
78+
raw_connection.query(sql)
9179
end
80+
81+
notification_payload[:row_count] = result&.size || 0
82+
83+
@affected_rows_before_warnings = raw_connection.affected_rows
84+
raw_connection.abandon_results!
85+
86+
verified!
87+
handle_warnings(sql)
88+
result
9289
end
9390

9491
def cast_result(result)

activerecord/lib/active_record/connection_adapters/postgresql/database_statements.rb

Lines changed: 27 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -132,47 +132,40 @@ def cancel_any_running_query
132132
rescue PG::Error
133133
end
134134

135-
def raw_execute(sql, name, binds = nil, prepare: false, async: false, allow_retry: false, materialize_transactions: true)
135+
def perform_query(raw_connection, sql, binds, type_casted_binds, prepare:, notification_payload:)
136136
update_typemap_for_default_timezone
137-
138-
type_casted_binds = type_casted_binds(binds)
139-
140-
log(sql, name, binds, type_casted_binds, async: async) do |notification_payload|
141-
with_raw_connection(allow_retry: allow_retry, materialize_transactions: materialize_transactions) do |conn|
142-
result = if prepare
143-
begin
144-
stmt_key = prepare_statement(sql, binds, conn)
145-
notification_payload[:statement_name] = stmt_key
146-
conn.exec_prepared(stmt_key, type_casted_binds)
147-
rescue PG::FeatureNotSupported => error
148-
if is_cached_plan_failure?(error)
149-
# Nothing we can do if we are in a transaction because all commands
150-
# will raise InFailedSQLTransaction
151-
if in_transaction?
152-
raise PreparedStatementCacheExpired.new(error.message, connection_pool: @pool)
153-
else
154-
@lock.synchronize do
155-
# outside of transactions we can simply flush this query and retry
156-
@statements.delete sql_key(sql)
157-
end
158-
retry
159-
end
137+
result = if prepare
138+
begin
139+
stmt_key = prepare_statement(sql, binds, raw_connection)
140+
notification_payload[:statement_name] = stmt_key
141+
raw_connection.exec_prepared(stmt_key, type_casted_binds)
142+
rescue PG::FeatureNotSupported => error
143+
if is_cached_plan_failure?(error)
144+
# Nothing we can do if we are in a transaction because all commands
145+
# will raise InFailedSQLTransaction
146+
if in_transaction?
147+
raise PreparedStatementCacheExpired.new(error.message, connection_pool: @pool)
148+
else
149+
@lock.synchronize do
150+
# outside of transactions we can simply flush this query and retry
151+
@statements.delete sql_key(sql)
160152
end
161-
162-
raise
153+
retry
163154
end
164-
elsif without_prepared_statement?(binds)
165-
conn.async_exec(sql)
166-
else
167-
conn.exec_params(sql, type_casted_binds)
168155
end
169156

170-
verified!
171-
handle_warnings(result)
172-
notification_payload[:row_count] = result.count
173-
result
157+
raise
174158
end
159+
elsif without_prepared_statement?(binds)
160+
raw_connection.async_exec(sql)
161+
else
162+
raw_connection.exec_params(sql, type_casted_binds)
175163
end
164+
165+
verified!
166+
handle_warnings(result)
167+
notification_payload[:row_count] = result.count
168+
result
176169
end
177170

178171
def cast_result(result)

activerecord/lib/active_record/connection_adapters/sqlite3/database_statements.rb

Lines changed: 29 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -86,46 +86,40 @@ def internal_begin_transaction(mode, isolation)
8686
end
8787
end
8888

89-
def raw_execute(sql, name = nil, binds = [], prepare: false, async: false, allow_retry: false, materialize_transactions: true) # :nodoc:
90-
type_casted_binds = type_casted_binds(binds)
91-
92-
result = log(sql, name, binds, type_casted_binds, async: async) do |notification_payload|
93-
with_raw_connection(materialize_transactions: materialize_transactions) do |conn|
94-
if prepare
95-
stmt = @statements[sql] ||= conn.prepare(sql)
96-
stmt.reset!
89+
def perform_query(raw_connection, sql, binds, type_casted_binds, prepare:, notification_payload:)
90+
if prepare
91+
stmt = @statements[sql] ||= raw_connection.prepare(sql)
92+
stmt.reset!
93+
stmt.bind_params(type_casted_binds)
94+
95+
result = if stmt.column_count.zero? # No return
96+
stmt.step
97+
ActiveRecord::Result.empty
98+
else
99+
ActiveRecord::Result.new(stmt.columns, stmt.to_a)
100+
end
101+
else
102+
# Don't cache statements if they are not prepared.
103+
stmt = raw_connection.prepare(sql)
104+
begin
105+
unless without_prepared_statement?(binds)
97106
stmt.bind_params(type_casted_binds)
98-
99-
result = if stmt.column_count.zero? # No return
100-
stmt.step
101-
ActiveRecord::Result.empty
102-
else
103-
ActiveRecord::Result.new(stmt.columns, stmt.to_a)
104-
end
107+
end
108+
result = if stmt.column_count.zero? # No return
109+
stmt.step
110+
ActiveRecord::Result.empty
105111
else
106-
# Don't cache statements if they are not prepared.
107-
stmt = conn.prepare(sql)
108-
begin
109-
unless without_prepared_statement?(binds)
110-
stmt.bind_params(type_casted_binds)
111-
end
112-
result = if stmt.column_count.zero? # No return
113-
stmt.step
114-
ActiveRecord::Result.empty
115-
else
116-
ActiveRecord::Result.new(stmt.columns, stmt.to_a)
117-
end
118-
ensure
119-
stmt.close
120-
end
112+
ActiveRecord::Result.new(stmt.columns, stmt.to_a)
121113
end
122-
@last_affected_rows = @raw_connection.changes
123-
verified!
124-
125-
notification_payload[:row_count] = result.length
126-
result
114+
ensure
115+
stmt.close
127116
end
128117
end
118+
@last_affected_rows = raw_connection.changes
119+
verified!
120+
121+
notification_payload[:row_count] = result.length
122+
result
129123
end
130124

131125
def cast_result(result)

activerecord/lib/active_record/connection_adapters/trilogy/database_statements.rb

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,20 +10,16 @@ def exec_insert(sql, name, binds, pk = nil, sequence_name = nil, returning: nil)
1010
end
1111

1212
private
13-
def raw_execute(sql, name, binds = nil, prepare: false, async: false, allow_retry: false, materialize_transactions: true)
14-
log(sql, name, async: async) do |notification_payload|
15-
with_raw_connection(allow_retry: allow_retry, materialize_transactions: materialize_transactions) do |conn|
16-
sync_timezone_changes(conn)
17-
result = conn.query(sql)
18-
while conn.more_results_exist?
19-
conn.next_result
20-
end
21-
verified!
22-
handle_warnings(sql)
23-
notification_payload[:row_count] = result.count
24-
result
25-
end
13+
def perform_query(raw_connection, sql, binds, type_casted_binds, prepare:, notification_payload:)
14+
sync_timezone_changes(raw_connection)
15+
result = raw_connection.query(sql)
16+
while raw_connection.more_results_exist?
17+
raw_connection.next_result
2618
end
19+
verified!
20+
handle_warnings(sql)
21+
notification_payload[:row_count] = result.count
22+
result
2723
end
2824

2925
def cast_result(result)

0 commit comments

Comments
 (0)