Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion integrations/Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ GIT
PATH
remote: .
specs:
multiwoven-integrations (0.34.19)
multiwoven-integrations (0.34.20)
MailchimpMarketing
activesupport
async-websocket
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ module Multiwoven::Integrations::Destination
module Postgresql
include Multiwoven::Integrations::Core
class Client < DestinationConnector
MAX_CHUNK_SIZE = 10_000

def check_connection(connection_config)
connection_config = connection_config.with_indifferent_access
create_connection(connection_config)
Expand Down Expand Up @@ -50,30 +52,34 @@ def write(sync_config, records, action = "destination_insert")
raw_table = sync_config.stream.name
table_name = qualify_table(connection_config[:schema], raw_table)
primary_key = sync_config.model.primary_key
log_message_array = []
db = create_connection(connection_config)

write_success = 0
write_failure = 0
log_message_array = []

records.each do |record|
query = Multiwoven::Integrations::Core::QueryBuilder.perform(action, table_name, record, primary_key)
logger.debug("POSTGRESQL:WRITE:QUERY query = #{query} sync_id = #{sync_config.sync_id} sync_run_id = #{sync_config.sync_run_id}")
begin
response = db.exec(query)
records.each_slice(MAX_CHUNK_SIZE) do |chunk|
bulk_write(db, table_name, chunk, primary_key, action)
write_success += chunk.size
log_message_array << log_request_response("info", "bulk_#{action}", "#{chunk.size} rows")
rescue StandardError => e
logger.warn("POSTGRESQL:BULK_WRITE:FALLBACK chunk_size=#{chunk.size} error=#{e.message}")
chunk.each do |record|
response = bulk_write(db, table_name, [record], primary_key, action)
write_success += 1
log_message_array << log_request_response("info", query, response)
rescue StandardError => e
handle_exception(e, {
log_message_array << log_request_response("info", "fallback_#{action}", response)
rescue StandardError => individual_error
handle_exception(individual_error, {
context: "POSTGRESQL:RECORD:WRITE:EXCEPTION",
type: "error",
sync_id: sync_config.sync_id,
sync_run_id: sync_config.sync_run_id
})
write_failure += 1
log_message_array << log_request_response("error", query, e.message)
log_message_array << log_request_response("error", "fallback_#{action}", individual_error.message)
end
end

tracking_message(write_success, write_failure, log_message_array)
rescue StandardError => e
handle_exception(e, {
Expand All @@ -82,10 +88,48 @@ def write(sync_config, records, action = "destination_insert")
sync_id: sync_config.sync_id,
sync_run_id: sync_config.sync_run_id
})
ensure
db&.close
end

private

# Executes one multi-row INSERT for the given chunk of records.
# Columns are the union of keys seen across the chunk, so sparse
# records still line up with the column list (missing keys render as
# NULL via escape_value). Appends an upsert clause for update actions.
# Returns the PG result of the exec, or nil for an empty chunk.
def bulk_write(db, table_name, records, primary_key, action)
  return if records.empty?

  columns = records.flat_map(&:keys).uniq
  quoted_columns = columns.map { |column| quote_ident(column) }.join(", ")

  # One "(v1, v2, ...)" tuple per record, in column order.
  rows = records.map do |record|
    literals = columns.map { |column| escape_value(db, record[column]) }
    "(#{literals.join(", ")})"
  end

  statement = "INSERT INTO #{table_name} (#{quoted_columns}) VALUES #{rows.join(", ")}"
  statement << build_upsert_clause(columns, primary_key) if action.to_s == "destination_update"
  db.exec(statement)
end

# Builds the " ON CONFLICT ..." suffix for an upsert INSERT.
# Returns "" when no primary key is configured; DO NOTHING when the
# record carries only the key itself (nothing left to update);
# otherwise DO UPDATE SET <col> = EXCLUDED.<col> for every non-key column.
def build_upsert_clause(columns, primary_key)
  return "" unless primary_key.present?

  pk = quote_ident(primary_key)
  non_key_columns = columns.reject { |column| column.to_s == primary_key.to_s }
  return " ON CONFLICT (#{pk}) DO NOTHING" if non_key_columns.empty?

  assignments = non_key_columns.map { |column| "#{quote_ident(column)} = EXCLUDED.#{quote_ident(column)}" }
  " ON CONFLICT (#{pk}) DO UPDATE SET #{assignments.join(", ")}"
end

# Renders a Ruby value as a SQL literal: the bare NULL keyword for nil,
# otherwise the stringified value single-quoted through the
# connection's escape_string (which handles embedded quotes).
def escape_value(db, value)
  value.nil? ? "NULL" : "'#{db.escape_string(value.to_s)}'"
end

# Double-quotes a column/table identifier via the pg driver, which
# also escapes any embedded double quotes.
def quote_ident(name)
  ident = name.to_s
  PG::Connection.quote_ident(ident)
end

def query(connection, query)
connection.exec(query) do |result|
result.map do |row|
Expand All @@ -108,7 +152,13 @@ def create_connection(connection_config)

# Maps each discovered table (records grouped by table name) to a
# protocol Stream, advertising batch support so the platform can hand
# the connector chunks up to MAX_CHUNK_SIZE rows.
def create_streams(records)
  group_by_table(records).map do |table|
    Multiwoven::Integrations::Protocol::Stream.new(
      name: table[:tablename],
      action: StreamAction["fetch"],
      json_schema: convert_to_json_schema(table[:columns]),
      batch_support: true,
      batch_size: MAX_CHUNK_SIZE
    )
  end
end

Expand Down
2 changes: 1 addition & 1 deletion integrations/lib/multiwoven/integrations/rollout.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

module Multiwoven
module Integrations
VERSION = "0.34.19"
VERSION = "0.34.20"

ENABLED_SOURCES = %w[
Snowflake
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@
s_config = Multiwoven::Integrations::Protocol::SyncConfig.from_json(sync_config.to_json)
s_config.sync_run_id = "33"
allow(PG).to receive(:connect).and_return(pg_connection)
allow(pg_connection).to receive(:close)
allow(pg_connection).to receive(:escape_string) { |str| str }

allow(pg_connection).to receive(:exec).and_return(true)

Expand All @@ -126,6 +128,8 @@
s_config = Multiwoven::Integrations::Protocol::SyncConfig.from_json(sync_config.to_json)
s_config.sync_run_id = "33"
allow(PG).to receive(:connect).and_return(pg_connection)
allow(pg_connection).to receive(:close)
allow(pg_connection).to receive(:escape_string) { |str| str }

allow(pg_connection).to receive(:exec).and_return(true)

Expand All @@ -147,6 +151,8 @@
s_config.sync_run_id = "34"

allow(PG).to receive(:connect).and_return(pg_connection)
allow(pg_connection).to receive(:close)
allow(pg_connection).to receive(:escape_string) { |str| str }

allow(pg_connection).to receive(:exec).and_raise(StandardError.new("test error"))

Expand All @@ -162,6 +168,156 @@
end
end

# Specs for the chunked bulk-write path: records are written in
# MAX_CHUNK_SIZE slices via a single multi-row INSERT, with a
# per-record fallback when a bulk statement fails.

describe "#write (batch)" do
  before do
    allow(PG).to receive(:connect).and_return(pg_connection)
    allow(pg_connection).to receive(:close)
    # Identity escaping keeps the generated SQL predictable for the
    # string-matching expectations below.
    allow(pg_connection).to receive(:escape_string) { |str| str }
  end

  let(:s_config) do
    config = Multiwoven::Integrations::Protocol::SyncConfig.from_json(sync_config.to_json)
    config.sync_run_id = "50"
    config
  end

  # Plain string-keyed hashes, mirroring what the connector receives per record.
  let(:batch_records) do
    records.map { |r| r.data.transform_keys(&:to_s) }
  end

  context "bulk insert" do
    it "inserts multiple records in a single statement" do
      # One multi-row VALUES list (comma between tuples) — not one exec per record.
      expect(pg_connection).to receive(:exec).with(
        a_string_matching(/INSERT INTO.*VALUES.*,.*/)
      ).once.and_return(true)

      tracking = subject.write(s_config, batch_records).tracking
      expect(tracking.success).to eql(2)
      expect(tracking.failed).to eql(0)
    end
  end

  context "bulk upsert" do
    it "generates ON CONFLICT clause for destination_update" do
      expect(pg_connection).to receive(:exec).with(
        a_string_matching(/ON CONFLICT.*DO UPDATE SET/)
      ).once.and_return(true)

      tracking = subject.write(s_config, batch_records, "destination_update").tracking
      expect(tracking.success).to eql(2)
      expect(tracking.failed).to eql(0)
    end

    it "generates ON CONFLICT DO NOTHING when update_cols is empty (only primary key)" do
      records_only_pk = [
        { "id" => "1" },
        { "id" => "2" }
      ]
      expect(pg_connection).to receive(:exec).with(
        a_string_matching(/ON CONFLICT.*DO NOTHING/)
      ).once.and_return(true)

      tracking = subject.write(s_config, records_only_pk, "destination_update").tracking
      expect(tracking.success).to eql(2)
      expect(tracking.failed).to eql(0)
    end
  end

  context "fallback on bulk failure" do
    it "falls back to individual writes and tracks success" do
      # First exec (the bulk statement) raises; per-record retries succeed.
      call_count = 0
      allow(pg_connection).to receive(:exec) do
        call_count += 1
        raise StandardError, "bulk failed" if call_count == 1

        true
      end

      tracking = subject.write(s_config, batch_records).tracking
      expect(tracking.success).to eql(2)
      expect(tracking.failed).to eql(0)
    end

    it "tracks partial failures in individual fallback" do
      call_count = 0
      allow(pg_connection).to receive(:exec) do
        call_count += 1
        # bulk fails, first individual succeeds, second individual fails
        raise StandardError, "bulk failed" if call_count == 1
        raise StandardError, "row failed" if call_count == 3

        true
      end

      tracking = subject.write(s_config, batch_records).tracking
      expect(tracking.success).to eql(1)
      expect(tracking.failed).to eql(1)
    end

    it "logs info for each successful individual write in fallback" do
      call_count = 0
      allow(pg_connection).to receive(:exec) do
        call_count += 1
        raise StandardError, "bulk failed" if call_count == 1

        true
      end

      tracking = subject.write(s_config, batch_records).tracking
      expect(tracking.success).to eql(2)
      info_logs = tracking.logs.select { |l| l.level == "info" }
      expect(info_logs.size).to eql(2)
      info_logs.each do |log|
        expect(log.message).to include("request")
        expect(log.message).to include("response")
      end
    end
  end

  context "records with inconsistent keys" do
    let(:mixed_records) do
      [
        { "email" => "user1@example.com", "user_id" => "1" },
        { "email" => "user2@example.com", "user_id" => "2", "location" => "NYC" }
      ]
    end

    it "includes all columns from the union of record keys" do
      # The column list is the union across records, in first-seen order.
      expect(pg_connection).to receive(:exec).with(
        a_string_matching(/"email",\s*"user_id",\s*"location"/)
      ).once.and_return(true)

      tracking = subject.write(s_config, mixed_records).tracking
      expect(tracking.success).to eql(2)
      expect(tracking.failed).to eql(0)
    end

    it "uses NULL for missing keys" do
      expect(pg_connection).to receive(:exec).with(
        a_string_matching(/NULL/)
      ).once.and_return(true)

      subject.write(s_config, mixed_records)
    end
  end

  context "connection cleanup" do
    it "closes connection on success" do
      allow(pg_connection).to receive(:exec).and_return(true)
      expect(pg_connection).to receive(:close)
      subject.write(s_config, batch_records)
    end

    it "closes connection on failure" do
      # The ensure block in #write must still close the connection when exec raises.
      allow(pg_connection).to receive(:exec).and_raise(StandardError.new("err"))
      expect(pg_connection).to receive(:close)
      subject.write(s_config, batch_records)
    end
  end
end

describe "#discover" do
it "discovers schema successfully" do
allow(PG).to receive(:connect).and_return(pg_connection)
Expand Down
Loading