Skip to content

Commit 311b88d

Browse files
Merge pull request #510 from zilverline/do-not-default-to-nested-transactions
Do not default to using nested transactions
2 parents 08e0bd6 + bd8c976 commit 311b88d

17 files changed

+171
-108
lines changed
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# frozen_string_literal: true
2+
3+
class SequentAvoidExceptionHandlingInUpdateUniqueKeys < ActiveRecord::Migration[7.2]
4+
def up
5+
Sequent::Support::Database.with_search_path(Sequent.configuration.event_store_schema_name) do
6+
execute_sql_file 'update_unique_keys', version: 4
7+
end
8+
end
9+
10+
def down
11+
Sequent::Support::Database.with_search_path(Sequent.configuration.event_store_schema_name) do
12+
execute_sql_file 'update_unique_keys', version: 3
13+
end
14+
end
15+
16+
private
17+
18+
def execute_sql_file(filename, version:)
19+
say "Applying '#{filename}' version #{version}", true
20+
suppress_messages do
21+
execute File.read(
22+
File.join(
23+
File.dirname(__FILE__),
24+
format('sequent/%s_v%02d.sql', filename, version),
25+
),
26+
)
27+
end
28+
end
29+
end
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
CREATE OR REPLACE PROCEDURE update_unique_keys(_stream_records jsonb)
2+
LANGUAGE plpgsql SET search_path FROM CURRENT AS $$
3+
DECLARE
4+
_aggregate jsonb;
5+
_aggregate_id aggregates.aggregate_id%TYPE;
6+
_aggregate_version events.sequence_number%TYPE;
7+
_unique_keys jsonb;
8+
BEGIN
9+
FOR _aggregate IN SELECT aggregate FROM jsonb_array_elements(_stream_records) AS aggregate ORDER BY aggregate->>'aggregate_id' LOOP
10+
_aggregate_id = _aggregate->>'aggregate_id';
11+
_aggregate_version = _aggregate->>'aggregate_version';
12+
IF _aggregate_version <> (
13+
SELECT e.sequence_number AS aggregate_version
14+
FROM aggregates a LEFT JOIN events e ON (a.aggregate_id, a.events_partition_key) = (e.aggregate_id, e.partition_key)
15+
WHERE a.aggregate_id = _aggregate_id
16+
ORDER BY e.sequence_number DESC
17+
LIMIT 1
18+
FOR NO KEY UPDATE OF a
19+
) THEN
20+
RAISE EXCEPTION 'update_unique_keys: aggregate version is not equal to latest event sequence number for aggregate %', _aggregate_id;
21+
END IF;
22+
END LOOP;
23+
24+
FOR _aggregate IN SELECT aggregate FROM jsonb_array_elements(_stream_records) AS aggregate ORDER BY aggregate->>'aggregate_id' LOOP
25+
_aggregate_id = _aggregate->>'aggregate_id';
26+
_unique_keys = COALESCE(_aggregate->'unique_keys', '{}'::jsonb);
27+
28+
DELETE FROM aggregate_unique_keys AS target
29+
WHERE target.aggregate_id = _aggregate_id
30+
AND NOT (_unique_keys ? target.scope);
31+
END LOOP;
32+
33+
FOR _aggregate IN SELECT aggregate FROM jsonb_array_elements(_stream_records) AS aggregate ORDER BY aggregate->>'aggregate_id' LOOP
34+
_aggregate_id = _aggregate->>'aggregate_id';
35+
_unique_keys = COALESCE(_aggregate->'unique_keys', '{}'::jsonb);
36+
37+
INSERT INTO aggregate_unique_keys AS target (aggregate_id, scope, key)
38+
SELECT _aggregate_id, key, value
39+
FROM jsonb_each(_unique_keys) AS x
40+
ORDER BY 1, 2
41+
ON CONFLICT (aggregate_id, scope) DO UPDATE
42+
SET key = EXCLUDED.key
43+
WHERE target.key <> EXCLUDED.key;
44+
END LOOP;
45+
END;
46+
$$;

db/structure.sql

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -734,10 +734,6 @@ BEGIN
734734
SET key = EXCLUDED.key
735735
WHERE target.key <> EXCLUDED.key;
736736
END LOOP;
737-
EXCEPTION
738-
WHEN unique_violation THEN
739-
RAISE unique_violation
740-
USING MESSAGE = 'duplicate unique key value for aggregate ' || (_aggregate->>'aggregate_type') || ' ' || _aggregate_id || ' (' || SQLERRM || ')';
741737
END;
742738
$$;
743739

@@ -1588,6 +1584,7 @@ ALTER TABLE ONLY sequent_schema.snapshot_records
15881584
SET search_path TO public,view_schema,sequent_schema;
15891585

15901586
INSERT INTO "schema_migrations" (version) VALUES
1587+
('20260220122600'),
15911588
('20260129130000'),
15921589
('20250815103000'),
15931590
('20250630113000'),

lib/sequent/core/active_projectors_event_publisher.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ def process_events(event_handlers, ...)
2121
# (active or not) and updating the projector tables. Normally a transaction is already
2222
# active due to using the `CommandService#execute_command`, but if this event publisher is
2323
# used directly it is also important to run inside a transaction.
24-
Sequent.configuration.transaction_provider.transactional do
24+
Sequent.configuration.transaction_provider.transaction do
2525
ensure_no_unknown_active_projectors!(event_handlers)
2626
active_event_handlers = event_handlers.select { |x| active?(x) }
2727
super(active_event_handlers, ...)

lib/sequent/core/command_service.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ def execute_commands(*commands, validation_context: nil)
4444

4545
def process_commands(validation_context)
4646
Sequent::Util.skip_if_already_processing(:command_service_process_commands) do
47-
transaction_provider.transactional do
47+
transaction_provider.transaction do
4848
until command_queue.empty?
4949
command = command_queue.pop
5050
command_middleware.invoke(command) do

lib/sequent/core/event_store.rb

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -10,24 +10,8 @@
1010
module Sequent
1111
module Core
1212
class AggregateKeyNotUniqueError < RuntimeError
13-
attr_reader :aggregate_type, :aggregate_id
14-
1513
def self.unique_key_error_message?(message)
16-
message =~ /duplicate unique key value for aggregate/
17-
end
18-
19-
def initialize(message)
20-
super
21-
22-
match = message.match(
23-
# rubocop:disable Layout/LineLength
24-
/aggregate (\p{Upper}\p{Alnum}*(?:::\p{Upper}\p{Alnum}*)*) (\p{XDigit}{8}-\p{XDigit}{4}-\p{XDigit}{4}-\p{XDigit}{4}-\p{XDigit}{12})/,
25-
# rubocop:enable Layout/LineLength
26-
)
27-
if match
28-
@aggregate_type = match[1]
29-
@aggregate_id = match[2]
30-
end
14+
message =~ /duplicate key value violates unique constraint \"aggregate_unique_keys_scope_key_key\"/
3115
end
3216
end
3317

lib/sequent/core/projectors.rb

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ def deactivate_unknown_projectors!(known_projector_classes: Sequent::Core::Migra
4343
.pluck(:name)
4444
return if unknown_active_projector_names.empty?
4545

46-
transaction_provider.transactional do
46+
transaction_provider.transaction do
4747
ProjectorState
4848
.where.not(name: known_projector_classes.map(&:name))
4949
.update_all(active_version: nil)
@@ -89,7 +89,7 @@ def projector_states
8989
cached = Thread.current[PROJECTOR_STATES_KEY]
9090
return cached unless cached.nil?
9191

92-
transaction_provider.transactional do
92+
transaction_provider.transaction do
9393
cleanup = -> { Thread.current[PROJECTOR_STATES_KEY] = nil }
9494
transaction_provider.after_commit(&cleanup)
9595
transaction_provider.after_rollback(&cleanup)
@@ -116,7 +116,7 @@ def transaction_provider
116116
def update_projector_state(rows)
117117
return if rows.empty?
118118

119-
transaction_provider.transactional do
119+
transaction_provider.transaction do
120120
lock_projector_states_for_update
121121

122122
ProjectorState.upsert_all(rows)

lib/sequent/core/transactions/active_record_transaction_provider.rb

Lines changed: 42 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -3,36 +3,51 @@
33
module Sequent
44
module Core
55
module Transactions
6-
##
7-
# Always require a new transaction.
8-
#
9-
# Read:
10-
# http://api.rubyonrails.org/classes/ActiveRecord/Transactions/ClassMethods.html
11-
#
12-
# Without this change, there is a potential bug:
13-
#
14-
# ```ruby
15-
# ActiveRecord::Base.transaction do
16-
# Sequent.configuration.command_service.execute_commands command
17-
# end
18-
#
19-
# on Command do
20-
# do.some.things
21-
# fail ActiveRecord::Rollback
22-
# end
23-
# ```
24-
#
25-
# In this example, you might be surprised to find that `do.some.things`
26-
# does not get rolled back! This is because AR doesn't automatically make
27-
# a "savepoint" for us when we call `.transaction` in a nested manner. In
28-
# order to enable this behaviour, we have to call `.transaction` like
29-
# this: `.transaction(requires_new: true)`.
30-
#
316
class ActiveRecordTransactionProvider
32-
def transactional(&block)
33-
ActiveRecord::Base.transaction(requires_new: true, &block)
7+
attr_reader :requires_new
8+
9+
##
10+
# Configure if save points should be used to simulate nested transactions. This is only
11+
# useful in combination with the `ActiveRecord::Rollback` exception.
12+
#
13+
# Read: https://api.rubyonrails.org/classes/ActiveRecord/Rollback.html and
14+
# http://api.rubyonrails.org/classes/ActiveRecord/Transactions/ClassMethods.html
15+
#
16+
# ```ruby
17+
# ActiveRecord::Base.transaction do
18+
# Sequent.configuration.command_service.execute_commands command
19+
# end
20+
#
21+
# on Command do
22+
# do.some.things
23+
# fail ActiveRecord::Rollback
24+
# end
25+
# ```
26+
#
27+
# In this example, you might be surprised to find that `do.some.things` does not get rolled
28+
# back! This only happens with `ActiveRecord::Rollback`, since it is handled by the inner
29+
# transaction. All other exceptions are automatically propagated and will cause the parent
30+
# transaction to rollback.
31+
#
32+
# Note that using save points with PostgreSQL adds additional overhead and is rarely useful,
33+
# so our advice is to only use `ActiveRecord::Rollback` directly inside of an
34+
# `ActiveRecord::Base.transaction(requires_new: true) do ... end` block so it is clear what
35+
# the expected behavior is.
36+
def initialize(requires_new: false)
37+
@requires_new = requires_new
38+
warn <<~EOS if @requires_new
39+
[DEPRECATED] avoid using `requires_new: true` globally, use explicit `ActiveRecord::Base.transaction(requires_new: true)`
40+
blocks with `ActiveRecord::Rollback` instead if nested transactions are needed.
41+
EOS
42+
end
43+
44+
def transaction(requires_new: @requires_new, &block)
45+
ActiveRecord::Base.transaction(requires_new:, &block)
3446
end
3547

48+
# Deprecated alias
49+
alias transactional transaction
50+
3651
def after_commit(&block)
3752
ActiveRecord::Base.current_transaction.after_commit(&block)
3853
end

lib/sequent/core/transactions/no_transactions.rb

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,15 @@ module Transactions
99
# view state will always be recreated anyway.
1010
#
1111
class NoTransactions
12-
def transactional
13-
yield
12+
def transaction
13+
yield ActiveRecord::Transaction::NULL_TRANSACTION
1414
end
15+
16+
# Deprecated alias
17+
alias transactional transaction
18+
19+
def after_commit = yield
20+
def after_rollback = nil
1521
end
1622
end
1723
end

lib/sequent/core/transactions/read_only_active_record_transaction_provider.rb

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,24 +8,21 @@ def initialize(transaction_provider)
88
@transaction_provider = transaction_provider
99
end
1010

11-
def transactional(&block)
11+
def transaction(&block)
1212
register_call
13-
@transaction_provider.transactional do
13+
@transaction_provider.transaction(requires_new: true) do |transaction|
1414
Sequent::ApplicationRecord.connection.execute('SET TRANSACTION READ ONLY')
15-
block.call
16-
ensure
17-
deregister_call
18-
reset_stack_size if stack_size == 0
15+
block.call(transaction)
1916
end
17+
ensure
18+
deregister_call
19+
reset_stack_size if stack_size == 0
2020
end
2121

22-
def after_commit(&block)
23-
ActiveRecord::Base.current_transaction.after_commit(&block)
24-
end
22+
# Deprecated
23+
alias transactional transaction
2524

26-
def after_rollback(&block)
27-
ActiveRecord::Base.current_transaction.after_rollback(&block)
28-
end
25+
delegate :after_commit, :after_rollback, to: :@transaction_provider
2926

3027
private
3128

0 commit comments

Comments
 (0)