diff --git a/integrations/Gemfile.lock b/integrations/Gemfile.lock index 643ffc830..4d54f46e1 100644 --- a/integrations/Gemfile.lock +++ b/integrations/Gemfile.lock @@ -7,7 +7,7 @@ GIT PATH remote: . specs: - multiwoven-integrations (0.34.19) + multiwoven-integrations (0.34.20) MailchimpMarketing activesupport async-websocket diff --git a/integrations/lib/multiwoven/integrations/destination/postgresql/client.rb b/integrations/lib/multiwoven/integrations/destination/postgresql/client.rb index 659ac5404..cfed42c05 100644 --- a/integrations/lib/multiwoven/integrations/destination/postgresql/client.rb +++ b/integrations/lib/multiwoven/integrations/destination/postgresql/client.rb @@ -6,6 +6,8 @@ module Multiwoven::Integrations::Destination module Postgresql include Multiwoven::Integrations::Core class Client < DestinationConnector + MAX_CHUNK_SIZE = 10_000 + def check_connection(connection_config) connection_config = connection_config.with_indifferent_access create_connection(connection_config) @@ -50,30 +52,34 @@ def write(sync_config, records, action = "destination_insert") raw_table = sync_config.stream.name table_name = qualify_table(connection_config[:schema], raw_table) primary_key = sync_config.model.primary_key - log_message_array = [] db = create_connection(connection_config) write_success = 0 write_failure = 0 + log_message_array = [] - records.each do |record| - query = Multiwoven::Integrations::Core::QueryBuilder.perform(action, table_name, record, primary_key) - logger.debug("POSTGRESQL:WRITE:QUERY query = #{query} sync_id = #{sync_config.sync_id} sync_run_id = #{sync_config.sync_run_id}") - begin - response = db.exec(query) + records.each_slice(MAX_CHUNK_SIZE) do |chunk| + bulk_write(db, table_name, chunk, primary_key, action) + write_success += chunk.size + log_message_array << log_request_response("info", "bulk_#{action}", "#{chunk.size} rows") + rescue StandardError => e + logger.warn("POSTGRESQL:BULK_WRITE:FALLBACK chunk_size=#{chunk.size} error=#{e.message}") + chunk.each do |record| + response = bulk_write(db, table_name, [record], primary_key, action) write_success += 1 - log_message_array << log_request_response("info", query, response) - rescue StandardError => e - handle_exception(e, { + log_message_array << log_request_response("info", "fallback_#{action}", response) + rescue StandardError => individual_error + handle_exception(individual_error, { context: "POSTGRESQL:RECORD:WRITE:EXCEPTION", type: "error", sync_id: sync_config.sync_id, sync_run_id: sync_config.sync_run_id }) write_failure += 1 - log_message_array << log_request_response("error", query, e.message) + log_message_array << log_request_response("error", "fallback_#{action}", individual_error.message) end end + tracking_message(write_success, write_failure, log_message_array) rescue StandardError => e handle_exception(e, { @@ -82,10 +88,48 @@ def write(sync_config, records, action = "destination_insert") sync_id: sync_config.sync_id, sync_run_id: sync_config.sync_run_id }) + ensure + db&.close end private + def bulk_write(db, table_name, records, primary_key, action) + return if records.empty? + + columns = records.flat_map(&:keys).uniq + col_list = columns.map { |c| quote_ident(c) }.join(", ") + + values_clauses = records.map do |record| + vals = columns.map { |col| escape_value(db, record[col]) } + "(#{vals.join(", ")})" + end + + sql = "INSERT INTO #{table_name} (#{col_list}) VALUES #{values_clauses.join(", ")}" + sql += build_upsert_clause(columns, primary_key) if action.to_s == "destination_update" + db.exec(sql) + end + + def build_upsert_clause(columns, primary_key) + return "" unless primary_key.present? + + update_cols = columns.reject { |c| c.to_s == primary_key.to_s } + return " ON CONFLICT (#{quote_ident(primary_key)}) DO NOTHING" if update_cols.empty? + + set_clause = update_cols.map { |c| "#{quote_ident(c)} = EXCLUDED.#{quote_ident(c)}" }.join(", ") + " ON CONFLICT (#{quote_ident(primary_key)}) DO UPDATE SET #{set_clause}" + end + + def escape_value(db, value) + return "NULL" if value.nil? + + "'#{db.escape_string(value.to_s)}'" + end + + def quote_ident(name) + PG::Connection.quote_ident(name.to_s) + end + def query(connection, query) connection.exec(query) do |result| result.map do |row| @@ -108,7 +152,13 @@ def create_connection(connection_config) def create_streams(records) group_by_table(records).map do |r| - Multiwoven::Integrations::Protocol::Stream.new(name: r[:tablename], action: StreamAction["fetch"], json_schema: convert_to_json_schema(r[:columns])) + Multiwoven::Integrations::Protocol::Stream.new( + name: r[:tablename], + action: StreamAction["fetch"], + json_schema: convert_to_json_schema(r[:columns]), + batch_support: true, + batch_size: MAX_CHUNK_SIZE + ) end end diff --git a/integrations/lib/multiwoven/integrations/rollout.rb b/integrations/lib/multiwoven/integrations/rollout.rb index 749feaf0d..6ded9e0b8 100644 --- a/integrations/lib/multiwoven/integrations/rollout.rb +++ b/integrations/lib/multiwoven/integrations/rollout.rb @@ -2,7 +2,7 @@ module Multiwoven module Integrations - VERSION = "0.34.19" + VERSION = "0.34.20" ENABLED_SOURCES = %w[ Snowflake diff --git a/integrations/spec/multiwoven/integrations/destination/postgresql/client_spec.rb b/integrations/spec/multiwoven/integrations/destination/postgresql/client_spec.rb index 07fcca153..62c0c28d2 100644 --- a/integrations/spec/multiwoven/integrations/destination/postgresql/client_spec.rb +++ b/integrations/spec/multiwoven/integrations/destination/postgresql/client_spec.rb @@ -109,6 +109,8 @@ s_config = Multiwoven::Integrations::Protocol::SyncConfig.from_json(sync_config.to_json) s_config.sync_run_id = "33" allow(PG).to receive(:connect).and_return(pg_connection) + allow(pg_connection).to receive(:close) + allow(pg_connection).to receive(:escape_string) { |str| str } allow(pg_connection).to receive(:exec).and_return(true) @@ -126,6 +128,8 @@ s_config = Multiwoven::Integrations::Protocol::SyncConfig.from_json(sync_config.to_json) s_config.sync_run_id = "33" allow(PG).to receive(:connect).and_return(pg_connection) + allow(pg_connection).to receive(:close) + allow(pg_connection).to receive(:escape_string) { |str| str } allow(pg_connection).to receive(:exec).and_return(true) @@ -147,6 +151,8 @@ s_config.sync_run_id = "34" allow(PG).to receive(:connect).and_return(pg_connection) + allow(pg_connection).to receive(:close) + allow(pg_connection).to receive(:escape_string) { |str| str } allow(pg_connection).to receive(:exec).and_raise(StandardError.new("test error")) @@ -162,6 +168,156 @@ end end + # bulk write specs + + describe "#write (batch)" do + before do + allow(PG).to receive(:connect).and_return(pg_connection) + allow(pg_connection).to receive(:close) + allow(pg_connection).to receive(:escape_string) { |str| str } + end + + let(:s_config) do + config = Multiwoven::Integrations::Protocol::SyncConfig.from_json(sync_config.to_json) + config.sync_run_id = "50" + config + end + + let(:batch_records) do + records.map { |r| r.data.transform_keys(&:to_s) } + end + + context "bulk insert" do + it "inserts multiple records in a single statement" do + expect(pg_connection).to receive(:exec).with( + a_string_matching(/INSERT INTO.*VALUES.*,.*/) + ).once.and_return(true) + + tracking = subject.write(s_config, batch_records).tracking + expect(tracking.success).to eql(2) + expect(tracking.failed).to eql(0) + end + end + + context "bulk upsert" do + it "generates ON CONFLICT clause for destination_update" do + expect(pg_connection).to receive(:exec).with( + a_string_matching(/ON CONFLICT.*DO UPDATE SET/) + ).once.and_return(true) + + tracking = subject.write(s_config, batch_records, "destination_update").tracking + expect(tracking.success).to eql(2) + expect(tracking.failed).to eql(0) + end + + it "generates ON CONFLICT DO NOTHING when update_cols is empty (only primary key)" do + records_only_pk = [ + { "id" => "1" }, + { "id" => "2" } + ] + expect(pg_connection).to receive(:exec).with( + a_string_matching(/ON CONFLICT.*DO NOTHING/) + ).once.and_return(true) + + tracking = subject.write(s_config, records_only_pk, "destination_update").tracking + expect(tracking.success).to eql(2) + expect(tracking.failed).to eql(0) + end + end + + context "fallback on bulk failure" do + it "falls back to individual writes and tracks success" do + call_count = 0 + allow(pg_connection).to receive(:exec) do + call_count += 1 + raise StandardError, "bulk failed" if call_count == 1 + + true + end + + tracking = subject.write(s_config, batch_records).tracking + expect(tracking.success).to eql(2) + expect(tracking.failed).to eql(0) + end + + it "tracks partial failures in individual fallback" do + call_count = 0 + allow(pg_connection).to receive(:exec) do + call_count += 1 + # bulk fails, first individual succeeds, second individual fails + raise StandardError, "bulk failed" if call_count == 1 + raise StandardError, "row failed" if call_count == 3 + + true + end + + tracking = subject.write(s_config, batch_records).tracking + expect(tracking.success).to eql(1) + expect(tracking.failed).to eql(1) + end + + it "logs info for each successful individual write in fallback" do + call_count = 0 + allow(pg_connection).to receive(:exec) do + call_count += 1 + raise StandardError, "bulk failed" if call_count == 1 + + true + end + + tracking = subject.write(s_config, batch_records).tracking + expect(tracking.success).to eql(2) + info_logs = tracking.logs.select { |l| l.level == "info" } + expect(info_logs.size).to eql(2) + info_logs.each do |log| + expect(log.message).to include("request") + expect(log.message).to include("response") + end + end + end + + context "records with inconsistent keys" do + let(:mixed_records) do + [ + { "email" => "user1@example.com", "user_id" => "1" }, + { "email" => "user2@example.com", "user_id" => "2", "location" => "NYC" } + ] + end + + it "includes all columns from the union of record keys" do + expect(pg_connection).to receive(:exec).with( + a_string_matching(/"email",\s*"user_id",\s*"location"/) + ).once.and_return(true) + + tracking = subject.write(s_config, mixed_records).tracking + expect(tracking.success).to eql(2) + expect(tracking.failed).to eql(0) + end + + it "uses NULL for missing keys" do + expect(pg_connection).to receive(:exec).with( + a_string_matching(/NULL/) + ).once.and_return(true) + + subject.write(s_config, mixed_records) + end + end + + context "connection cleanup" do + it "closes connection on success" do + allow(pg_connection).to receive(:exec).and_return(true) + expect(pg_connection).to receive(:close) + subject.write(s_config, batch_records) + end + + it "closes connection on failure" do + allow(pg_connection).to receive(:exec).and_raise(StandardError.new("err")) + expect(pg_connection).to receive(:close) + subject.write(s_config, batch_records) + end + end + end + describe "#discover" do it "discovers schema successfully" do allow(PG).to receive(:connect).and_return(pg_connection)