Skip to content

Commit eebbe6a

Browse files
committed
Add new option.
1 parent a92b483 commit eebbe6a

File tree

12 files changed

+258
-15
lines changed

12 files changed

+258
-15
lines changed

lib/table_sync/receiving.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ module Receiving
66
require_relative "receiving/config_decorator"
77
require_relative "receiving/dsl"
88
require_relative "receiving/handler"
9+
require_relative "receiving/hooks/once"
910
require_relative "receiving/model/active_record"
1011
require_relative "receiving/model/sequel"
1112
end

lib/table_sync/receiving/config.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ def option(name)
218218
value_as_proc_setter_wrapper: any_value,
219219
default: proc { proc { |&block| block.call } }
220220

221-
TableSync::Receiving::Config.add_simple_option :conditional_handler
221+
TableSync::Receiving::Config.add_simple_option :on_first_sync
222222

223223
%i[
224224
before_update

lib/table_sync/receiving/handler.rb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,12 @@ def perform(config, params) # rubocop:disable Metrics/MethodLength
160160
if event == :update
161161
model.after_commit do
162162
config.option(:after_commit_on_update, **params, results:)
163+
164+
conditions, handler = config.option(:on_first_sync)
165+
if conditions.present?
166+
hook = ::TableSync::Receiving::Hooks::Once.new(conditions:, config:)
167+
hook.perform(targets: results, &handler)
168+
end
163169
end
164170
else
165171
model.after_commit { config.option(:after_commit_on_destroy, **params, results:) }
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
# frozen_string_literal: true
2+
3+
module TableSync::Receiving::Hooks
4+
class Once
5+
attr_reader :conditions, :config
6+
7+
def initialize(conditions:, config:)
8+
@conditions = conditions
9+
@config = config
10+
end
11+
12+
def perform(targets:, &)
13+
target_keys = config.option(:target_keys)
14+
model = config.model
15+
16+
targets.each do |target|
17+
next unless conditions?(target)
18+
19+
model.transaction(isolation: model.isolation_level(:repeatable)) do
20+
model.find_and_update(row: target, target_keys:) do |entry|
21+
next unless allow?(entry)
22+
23+
entry.hooks ||= []
24+
entry.hooks << hook_lookup_code
25+
model.after_commit { yield(entry:) }
26+
end
27+
end
28+
end
29+
end
30+
31+
private
32+
33+
def allow?(entry)
34+
Array(entry.hooks).exclude?(hook_lookup_code)
35+
end
36+
37+
def hook_lookup_code
38+
@hook_lookup_code ||= conditions[:columns].map do |column|
39+
"#{column}-#{conditions[column]}"
40+
end.join(":")
41+
end
42+
43+
def conditions?(row)
44+
conditions[:columns].all? do |column|
45+
row[column] == (conditions[column] || row[column])
46+
end
47+
end
48+
end
49+
end

lib/table_sync/receiving/model/active_record.rb

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,14 @@
22

33
module TableSync::Receiving::Model
44
class ActiveRecord
5+
ISOLATION_LEVELS = {
6+
uncommitted: :read_uncommitted,
7+
committed: :read_committed,
8+
repeatable: :repeatable_read,
9+
serializable: :serializable,
10+
}.freeze
11+
private_constant :ISOLATION_LEVELS
12+
513
class AfterCommitWrap
614
def initialize(&block)
715
@callback = block
@@ -33,6 +41,10 @@ def initialize(table_name)
3341
@schema = model_naming.schema.to_sym
3442
end
3543

44+
def isolation_level(lookup_code)
45+
ISOLATION_LEVELS.fetch(lookup_code)
46+
end
47+
3648
def columns
3749
raw_model.column_names.map(&:to_sym)
3850
end
@@ -110,14 +122,20 @@ def validate_types(data)
110122
types_validator.validate(data)
111123
end
112124

113-
def transaction(&)
114-
::ActiveRecord::Base.transaction(&)
125+
def transaction(**params, &)
126+
::ActiveRecord::Base.transaction(**params, &)
115127
end
116128

117129
def after_commit(&)
118130
db.add_transaction_record(AfterCommitWrap.new(&))
119131
end
120132

133+
def find_and_update(row:, target_keys:)
134+
entry = raw_model.find_by!(row.slice(*target_keys))
135+
yield entry
136+
entry.save!
137+
end
138+
121139
private
122140

123141
attr_reader :raw_model, :types_validator

lib/table_sync/receiving/model/sequel.rb

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,14 @@ module TableSync::Receiving::Model
44
class Sequel
55
attr_reader :table, :schema
66

7+
ISOLATION_LEVELS = {
8+
uncommitted: :uncommitted,
9+
committed: :committed,
10+
repeatable: :repeatable,
11+
serializable: :serializable,
12+
}.freeze
13+
private_constant :ISOLATION_LEVELS
14+
715
def initialize(table_name)
816
@raw_model = Class.new(::Sequel::Model(table_name)).tap(&:unrestrict_primary_key)
917
@types_validator = TableSync::Utils::Schema::Builder::Sequel.build(@raw_model)
@@ -17,6 +25,10 @@ def initialize(table_name)
1725
@schema = model_naming.schema.to_sym
1826
end
1927

28+
def isolation_level(lookup_code)
29+
ISOLATION_LEVELS.fetch(lookup_code)
30+
end
31+
2032
def columns
2133
dataset.columns
2234
end
@@ -57,14 +69,20 @@ def validate_types(data)
5769
types_validator.validate(data)
5870
end
5971

60-
def transaction(&)
61-
db.transaction(&)
72+
def transaction(**params, &)
73+
db.transaction(**params, &)
6274
end
6375

6476
def after_commit(&)
6577
db.after_commit(&)
6678
end
6779

80+
def find_and_update(row:, target_keys:)
81+
entry = dataset.first!(row.slice(*target_keys))
82+
yield entry
83+
entry.save_changes
84+
end
85+
6886
private
6987

7088
attr_reader :raw_model, :types_validator

spec/receiving/config_spec.rb

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -349,33 +349,33 @@
349349
end
350350
end
351351

352-
describe "#conditional_handler" do
352+
describe "#on_first_sync" do
353353
it "returns correct default value" do
354-
value, callable = config.option(:conditional_handler)
354+
value, callable = config.option(:on_first_sync)
355355
expect(value).to be_nil
356356
expect(callable).to be_a(Proc)
357357
end
358358

359359
it "processes a single value" do
360-
config.conditional_handler :test
360+
config.on_first_sync :test
361361

362-
value, callable = config.option(:conditional_handler)
362+
value, callable = config.option(:on_first_sync)
363363
expect(value).to eq(:test)
364364
expect(callable).to be_a(Proc)
365365
end
366366

367367
it "processes a single block" do
368-
config.conditional_handler { :test }
368+
config.on_first_sync { :test }
369369

370-
value, callable = config.option(:conditional_handler)
370+
value, callable = config.option(:on_first_sync)
371371
expect(value).to be_nil
372372
expect(callable.call).to eq(:test)
373373
end
374374

375375
it "processes a value and a block" do
376-
config.conditional_handler(:spam) { :test }
376+
config.on_first_sync(:spam) { :test }
377377

378-
value, callable = config.option(:conditional_handler)
378+
value, callable = config.option(:on_first_sync)
379379
expect(value).to eq(:spam)
380380
expect(callable.call).to eq(:test)
381381
end

spec/receiving/handler_spec.rb

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ def fire_destroy_event
171171
online_status: nil,
172172
project_id: "PID",
173173
rest: nil,
174+
hooks: nil,
174175
version: 123.34534,
175176
}
176177
end
@@ -362,6 +363,7 @@ def fire_destroy_event
362363
online_status: nil,
363364
project_id: "PID",
364365
rest: nil,
366+
hooks: nil,
365367
version: 123.34534,
366368
)
367369

@@ -371,6 +373,7 @@ def fire_destroy_event
371373
online_status: nil,
372374
project_id: "PID",
373375
rest: nil,
376+
hooks: nil,
374377
version: 123.34534,
375378
)
376379
end
@@ -605,6 +608,51 @@ def destroy(data:, target_keys:, version_key:)
605608
end
606609
end
607610

611+
describe "#on_first_sync" do
612+
let(:callback_flags) { { on_first_sync: [] } }
613+
let(:handler) do
614+
callback_flags_link = callback_flags
615+
handler = Class.new(described_class)
616+
617+
handler.receive("User", to_table: :players) do
618+
rest_key false
619+
mapping_overrides id: :external_id
620+
621+
on_first_sync columns: %i[online_status], online_status: true do |entry:|
622+
callback_flags_link[:on_first_sync] << entry.external_id
623+
end
624+
end
625+
626+
handler
627+
end
628+
let(:update_event) do
629+
OpenStruct.new(
630+
data: {
631+
event: "update",
632+
model: "User",
633+
attributes: {
634+
id: user_id,
635+
636+
online_status: true,
637+
},
638+
version: 123.34534,
639+
},
640+
project_id: "pid",
641+
)
642+
end
643+
644+
it "provides proper event to wrap receiving" do
645+
fire_update_event
646+
expect(DB[:players].count).to eq(1)
647+
expect(callback_flags[:on_first_sync]).to be_one
648+
649+
update_event.data[:version] = update_event.data[:version] + 1
650+
fire_update_event
651+
expect(DB[:players].count).to eq(1)
652+
expect(callback_flags[:on_first_sync]).to be_one
653+
end
654+
end
655+
608656
describe "avoid dead locks" do
609657
let(:model) do
610658
Class.new(TableSync.receiving_model) do
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
# frozen_string_literal: true
2+
3+
describe TableSync::Receiving::Model::ActiveRecord do
4+
subject(:model) { described_class.new(:players) }
5+
6+
describe "#isolation_level" do
7+
it { expect(model.isolation_level(:uncommitted)).to eq(:read_uncommitted) }
8+
it { expect(model.isolation_level(:committed)).to eq(:read_committed) }
9+
it { expect(model.isolation_level(:repeatable)).to eq(:repeatable_read) }
10+
it { expect(model.isolation_level(:serializable)).to eq(:serializable) }
11+
it { expect { model.isolation_level(:invalid) }.to raise_error(KeyError) }
12+
end
13+
14+
describe "find_and_update" do
15+
let(:primary_key) { :external_id }
16+
let(:external_id) { 100_500 }
17+
let!(:player) do
18+
model.send(:raw_model).create(
19+
external_id:,
20+
21+
online_status: false,
22+
version: 123.456,
23+
)
24+
end
25+
26+
let(:row) { { external_id: } }
27+
28+
it "finds and updates an entry" do
29+
model.find_and_update(row:, target_keys: [primary_key]) do |entry|
30+
entry.online_status = true
31+
end
32+
expect(player.reload.online_status).to be_truthy
33+
end
34+
35+
it "raise an error" do
36+
expect do
37+
row = { external_id: external_id + 1 }
38+
model.find_and_update(row:, target_keys: [primary_key]) do |entry|
39+
entry.online_status = true
40+
end
41+
end.to raise_error(ActiveRecord::RecordNotFound)
42+
expect(player.reload.online_status).to be_falsy
43+
end
44+
end
45+
end
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
# frozen_string_literal: true
2+
3+
describe TableSync::Receiving::Model::Sequel do
4+
subject(:model) { described_class.new(:players) }
5+
6+
describe "#isolation_level" do
7+
it { expect(model.isolation_level(:uncommitted)).to eq(:uncommitted) }
8+
it { expect(model.isolation_level(:committed)).to eq(:committed) }
9+
it { expect(model.isolation_level(:repeatable)).to eq(:repeatable) }
10+
it { expect(model.isolation_level(:serializable)).to eq(:serializable) }
11+
it { expect { model.isolation_level(:invalid) }.to raise_error(KeyError) }
12+
end
13+
14+
describe "find_and_update" do
15+
let(:primary_key) { :external_id }
16+
let(:external_id) { 100_500 }
17+
let!(:player) do
18+
model.send(:raw_model).create(
19+
external_id:,
20+
21+
online_status: false,
22+
version: 123.456,
23+
)
24+
end
25+
26+
let(:row) { { external_id: } }
27+
28+
it "finds and updates an entry" do
29+
model.find_and_update(row:, target_keys: [primary_key]) do |entry|
30+
entry.online_status = true
31+
end
32+
expect(player.reload.online_status).to be_truthy
33+
end
34+
35+
it "raise an error" do
36+
expect do
37+
row = { external_id: external_id + 1 }
38+
model.find_and_update(row:, target_keys: [primary_key]) do |entry|
39+
entry.online_status = true
40+
end
41+
end.to raise_error(Sequel::NoMatchingRow)
42+
expect(player.reload.online_status).to be_falsy
43+
end
44+
end
45+
end

0 commit comments

Comments
 (0)