Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions server/lib/reverse_etl/loaders/standard.rb
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ def process_batch_records(sync_run, sync, sync_config, activity)
transformer = Transformers::UserMapping.new
client = sync.destination.connector_client.new
batch_size = sync_config.stream.batch_size
# Resolved merge conflict: keep the incoming side from commit 5e35988b1
# ("fix(CE): parallelism getting stuck at batch upserting (#1799)"), which
# supersedes the HEAD side. The discarded HEAD branch also misspelled the
# local as "successfull_sync_records"; the kept code uses the correct name.

# Drain pending sync records in DB batches, then fan each group of
# THREAD_COUNT batches out across a thread pool. The mutex and the shared
# result arrays are scoped per slice, so record statuses are flushed and the
# Temporal activity heartbeat fires once per slice rather than once total —
# this is what keeps long runs from appearing stuck.
sync_run.sync_records.pending.find_in_batches(batch_size:).each_slice(THREAD_COUNT) do |batch_of_sync_records|
  mutex = Mutex.new
  successful_sync_records = []
  failed_sync_records = []
  Parallel.each(batch_of_sync_records, in_threads: THREAD_COUNT) do |sync_records|
    # NOTE(review): the shared arrays are passed with the mutex — presumably
    # process_single_batch synchronizes its appends; verify in its definition.
    process_single_batch(sync, sync_run, sync_config, sync_records,
                         mutex, successful_sync_records, failed_sync_records)
  end
  update_sync_records_status(sync_run, successful_sync_records, failed_sync_records)
  heartbeat(activity, sync_run)
end
end

def handle_response(report, sync_run)
Expand Down
208 changes: 208 additions & 0 deletions server/spec/lib/reverse_etl/loaders/standard_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,214 @@
end
end

# NOTE(review): removed unresolved merge-conflict markers ("<<<<<<< HEAD" /
# "======="). The HEAD side of this conflict was empty; the incoming specs
# from 5e35988b1 below are kept.
# Verifies that a StandardError raised by the connector's #write during
# individual-record processing marks the record failed and stores the error
# message in its logs. Fixtures (sync_individual, sync_record_individual,
# sync_run_individual, activity, connector_spec) are defined outside this chunk.
context "when individual record processing hits StandardError" do
let(:transformer) { ReverseEtl::Transformers::UserMapping.new }
let(:transform) { transformer.transform(sync_individual, sync_record_individual) }
# instance_double verifies stubbed methods exist on the real connector client
let(:client) { instance_double(sync_individual.destination.connector_client) }
before do
allow(client).to receive(:connector_spec).and_return(connector_spec)
end

it "marks sync_record as failed" do
sync_config = sync_individual.to_protocol
sync_config.sync_run_id = sync_run_individual.id.to_s

allow(sync_individual.destination.connector_client).to receive(:new).and_return(client)
allow(client).to receive(:write).with(sync_config, [transform],
"destination_insert").and_raise(StandardError.new("write error"))
expect(subject).to receive(:heartbeat).once.with(activity, sync_run_individual)
subject.write(sync_run_individual.id, activity)
sync_record_individual.reload
expect(sync_record_individual.status).to eq("failed")
expect(sync_record_individual.logs).to eq({ "error" => "write error" })
end
end

# Same failure path as the StandardError case above, but for a database
# unique-constraint violation: the record must end up "failed", not raise.
context "when individual record processing hits ActiveRecord::RecordNotUnique" do
let(:transformer) { ReverseEtl::Transformers::UserMapping.new }
let(:transform) { transformer.transform(sync_individual, sync_record_individual) }
let(:client) { instance_double(sync_individual.destination.connector_client) }
before do
allow(client).to receive(:connector_spec).and_return(connector_spec)
end

it "marks sync_record as failed with unique violation" do
sync_config = sync_individual.to_protocol
sync_config.sync_run_id = sync_run_individual.id.to_s

allow(sync_individual.destination.connector_client).to receive(:new).and_return(client)
allow(client).to receive(:write)
.with(sync_config, [transform], "destination_insert")
.and_raise(ActiveRecord::RecordNotUnique.new("duplicate key"))
expect(subject).to receive(:heartbeat).once.with(activity, sync_run_individual)
subject.write(sync_run_individual.id, activity)
sync_record_individual.reload
expect(sync_record_individual.status).to eq("failed")
end
end

# Asserts the loader instantiates connector_client.new (at least once) rather
# than reusing one pre-built shared client — per-thread clients are part of
# the parallelism fix this PR introduces.
context "when individual record processing creates per-thread clients" do
let(:transformer) { ReverseEtl::Transformers::UserMapping.new }
let(:transform) { transformer.transform(sync_individual, sync_record_individual) }
let(:client) { instance_double(sync_individual.destination.connector_client) }
# Successful tracking message so the write path completes without errors.
let(:tracker) do
Multiwoven::Integrations::Protocol::TrackingMessage.new(success: 1, failed: 0)
end
let(:multiwoven_message) { tracker.to_multiwoven_message }
before do
allow(client).to receive(:connector_spec).and_return(connector_spec)
end

it "calls connector_client.new per thread, not shared" do
sync_config = sync_individual.to_protocol
sync_config.sync_run_id = sync_run_individual.id.to_s

expect(sync_individual.destination.connector_client).to receive(:new).at_least(:once).and_return(client)
allow(client).to receive(:write).and_return(multiwoven_message)
subject.write(sync_run_individual.id, activity)
end
end

# Ensures the loader closes its connector client even when #write raises —
# i.e. cleanup runs on the error path, not just on success.
context "when individual record processing cleans up client" do
let(:transformer) { ReverseEtl::Transformers::UserMapping.new }
let(:transform) { transformer.transform(sync_individual, sync_record_individual) }
# Ruby 3.1 hash shorthand: connector_spec — presumably a `let` defined
# earlier in this spec file; verify against the full file.
let(:client) { double("client", connector_spec:, close: nil) }

it "closes client even when error occurs" do
sync_config = sync_individual.to_protocol
sync_config.sync_run_id = sync_run_individual.id.to_s

allow(sync_individual.destination.connector_client).to receive(:new).and_return(client)
allow(client).to receive(:write).and_raise(StandardError.new("boom"))
expect(client).to receive(:close).at_least(:once)
subject.write(sync_run_individual.id, activity)
end
end

# Batch-mode counterpart of the individual StandardError case: when the
# connector's batched #write raises, every record in the batch is failed.
context "when batch processing hits StandardError" do
let(:transformer) { ReverseEtl::Transformers::UserMapping.new }
# Two-record batch built from fixtures defined outside this chunk.
let(:transform) do
[transformer.transform(sync_batch, sync_record_batch1), transformer.transform(sync_batch, sync_record_batch2)]
end
let(:client) { instance_double(sync_batch.destination.connector_client) }
before do
allow(client).to receive(:connector_spec).and_return(connector_spec)
end

it "marks all batch records as failed" do
sync_config = sync_batch.to_protocol
sync_config.sync_run_id = sync_run_batch.id.to_s

allow(sync_batch.destination.connector_client).to receive(:new).and_return(client)
allow(client).to receive(:write).with(sync_config, transform).and_raise(StandardError.new("batch error"))
expect(subject).to receive(:heartbeat).once.with(activity, sync_run_batch)
subject.write(sync_run_batch.id, activity)
sync_run_batch.sync_records.reload.each do |sync_record|
expect(sync_record.status).to eq("failed")
end
end
end

# Batch-mode unique-violation case: an ActiveRecord::RecordNotUnique from the
# batched #write must fail every record in the batch without raising.
context "when batch processing hits ActiveRecord::RecordNotUnique" do
let(:transformer) { ReverseEtl::Transformers::UserMapping.new }
let(:transform) do
[transformer.transform(sync_batch, sync_record_batch1), transformer.transform(sync_batch, sync_record_batch2)]
end
let(:client) { instance_double(sync_batch.destination.connector_client) }
before do
allow(client).to receive(:connector_spec).and_return(connector_spec)
end

it "marks all batch records as failed" do
sync_config = sync_batch.to_protocol
sync_config.sync_run_id = sync_run_batch.id.to_s

allow(sync_batch.destination.connector_client).to receive(:new).and_return(client)
allow(client).to receive(:write)
.with(sync_config, transform).and_raise(ActiveRecord::RecordNotUnique.new("duplicate"))
expect(subject).to receive(:heartbeat).once.with(activity, sync_run_batch)
subject.write(sync_run_batch.id, activity)
sync_run_batch.sync_records.reload.each do |sync_record|
expect(sync_record.status).to eq("failed")
end
end
end

# Batch-mode counterpart of the cleanup test: the client is closed exactly
# once even when the batched #write raises.
context "when batch processing cleans up client" do
let(:transformer) { ReverseEtl::Transformers::UserMapping.new }
let(:transform) do
[transformer.transform(sync_batch, sync_record_batch1), transformer.transform(sync_batch, sync_record_batch2)]
end
# Ruby 3.1 hash shorthand: connector_spec — presumably a `let` defined
# earlier in this spec file; verify against the full file.
let(:client) { double("client", connector_spec:, close: nil) }

it "closes client even when error occurs" do
sync_config = sync_batch.to_protocol
sync_config.sync_run_id = sync_run_batch.id.to_s

allow(sync_batch.destination.connector_client).to receive(:new).and_return(client)
allow(client).to receive(:write).and_raise(StandardError.new("boom"))
expect(client).to receive(:close).once
subject.write(sync_run_batch.id, activity)
end
end

# Exercises the each_slice grouping in the loader: with more than
# THREAD_COUNT batches of pending records, heartbeat and
# update_sync_records_status must run once PER slice group (here: twice),
# and every record across all groups must still end up "success".
context "when batch support is enabled and records span multiple batch groups" do
# Consistency with the sibling contexts: build the tracker via `let` instead
# of a bare local evaluated once at spec-load time.
let(:tracker) do
Multiwoven::Integrations::Protocol::TrackingMessage.new(
success: 10,
failed: 0
)
end
let(:multiwoven_message) { tracker.to_multiwoven_message }
let(:client) { instance_double(sync_batch.destination.connector_client) }

before do
allow(client).to receive(:connector_spec).and_return(connector_spec)
# 2 records already exist (sync_record_batch1, sync_record_batch2).
# To get 2 each_slice groups we need > THREAD_COUNT batches (batch_size=10).
# Formula: THREAD_COUNT * 10 - 2 + 1 extra records → THREAD_COUNT+1 batches → 2 groups.
# Works regardless of SYNC_LOADER_THREAD_POOL_SIZE env value.
thread_count = described_class::THREAD_COUNT
extra_records = (thread_count * 10) - 2 + 1
extra_records.times do |i|
create(:sync_record, sync: sync_batch, sync_run: sync_run_batch, primary_key: "extra_#{i}")
end
end

it "calls heartbeat once per batch group, not once total" do
sync_config = sync_batch.to_protocol
sync_config.sync_run_id = sync_run_batch.id.to_s
allow(sync_batch.destination.connector_client).to receive(:new).and_return(client)
allow(client).to receive(:write).and_return(multiwoven_message)

expect(subject).to receive(:heartbeat).twice.with(activity, sync_run_batch)
subject.write(sync_run_batch.id, activity)
end

it "calls update_sync_records_status once per batch group" do
sync_config = sync_batch.to_protocol
sync_config.sync_run_id = sync_run_batch.id.to_s
allow(sync_batch.destination.connector_client).to receive(:new).and_return(client)
allow(client).to receive(:write).and_return(multiwoven_message)

expect(subject).to receive(:update_sync_records_status).twice.and_call_original
subject.write(sync_run_batch.id, activity)
end

it "marks all records across all groups as success" do
sync_config = sync_batch.to_protocol
sync_config.sync_run_id = sync_run_batch.id.to_s
allow(sync_batch.destination.connector_client).to receive(:new).and_return(client)
allow(client).to receive(:write).and_return(multiwoven_message)

subject.write(sync_run_batch.id, activity)
sync_run_batch.sync_records.reload.each do |sync_record|
expect(sync_record.status).to eq("success")
end
end
end
# NOTE(review): removed unresolved merge-conflict closing marker
# (">>>>>>> 5e35988b1") that followed this context.
context "when the report has tracking logs with a message" do
let(:log_message) { '{"request":"Sample log message"}' }
let(:report) do
Expand Down
Loading