Skip to content

Commit 334c0ca

Browse files
chore(CE): Batch support for postgres destination (#959)
* Resolve conflict in cherry-pick of 96ce8afbc06a85f13e790f73bb893e8baad16add and change the commit message
* chore(CE): Resolved conflict
---------
Co-authored-by: TivonB-AI2 <124182151+TivonB-AI2@users.noreply.github.com>
Co-authored-by: TivonB-AI2 <tivon.brown@squared.ai>
1 parent eef0c10 commit 334c0ca

File tree

4 files changed

+219
-13
lines changed

4 files changed

+219
-13
lines changed

integrations/Gemfile.lock

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ GIT
77
PATH
88
remote: .
99
specs:
10-
multiwoven-integrations (0.34.19)
10+
multiwoven-integrations (0.34.20)
1111
MailchimpMarketing
1212
activesupport
1313
async-websocket

integrations/lib/multiwoven/integrations/destination/postgresql/client.rb

Lines changed: 61 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ module Multiwoven::Integrations::Destination
66
module Postgresql
77
include Multiwoven::Integrations::Core
88
class Client < DestinationConnector
9+
MAX_CHUNK_SIZE = 10_000
10+
911
def check_connection(connection_config)
1012
connection_config = connection_config.with_indifferent_access
1113
create_connection(connection_config)
@@ -50,30 +52,34 @@ def write(sync_config, records, action = "destination_insert")
5052
raw_table = sync_config.stream.name
5153
table_name = qualify_table(connection_config[:schema], raw_table)
5254
primary_key = sync_config.model.primary_key
53-
log_message_array = []
5455
db = create_connection(connection_config)
5556

5657
write_success = 0
5758
write_failure = 0
59+
log_message_array = []
5860

59-
records.each do |record|
60-
query = Multiwoven::Integrations::Core::QueryBuilder.perform(action, table_name, record, primary_key)
61-
logger.debug("POSTGRESQL:WRITE:QUERY query = #{query} sync_id = #{sync_config.sync_id} sync_run_id = #{sync_config.sync_run_id}")
62-
begin
63-
response = db.exec(query)
61+
records.each_slice(MAX_CHUNK_SIZE) do |chunk|
62+
bulk_write(db, table_name, chunk, primary_key, action)
63+
write_success += chunk.size
64+
log_message_array << log_request_response("info", "bulk_#{action}", "#{chunk.size} rows")
65+
rescue StandardError => e
66+
logger.warn("POSTGRESQL:BULK_WRITE:FALLBACK chunk_size=#{chunk.size} error=#{e.message}")
67+
chunk.each do |record|
68+
response = bulk_write(db, table_name, [record], primary_key, action)
6469
write_success += 1
65-
log_message_array << log_request_response("info", query, response)
66-
rescue StandardError => e
67-
handle_exception(e, {
70+
log_message_array << log_request_response("info", "fallback_#{action}", response)
71+
rescue StandardError => individual_error
72+
handle_exception(individual_error, {
6873
context: "POSTGRESQL:RECORD:WRITE:EXCEPTION",
6974
type: "error",
7075
sync_id: sync_config.sync_id,
7176
sync_run_id: sync_config.sync_run_id
7277
})
7378
write_failure += 1
74-
log_message_array << log_request_response("error", query, e.message)
79+
log_message_array << log_request_response("error", "fallback_#{action}", individual_error.message)
7580
end
7681
end
82+
7783
tracking_message(write_success, write_failure, log_message_array)
7884
rescue StandardError => e
7985
handle_exception(e, {
@@ -82,10 +88,48 @@ def write(sync_config, records, action = "destination_insert")
8288
sync_id: sync_config.sync_id,
8389
sync_run_id: sync_config.sync_run_id
8490
})
91+
ensure
92+
db&.close
8593
end
8694

8795
private
8896

97+
# Writes a batch of records with a single multi-row INSERT statement.
#
# db          - open PG connection used both for escaping and execution
# table_name  - already-qualified (schema.table) target table
# records     - array of hashes (column => value); keys may differ per record
# primary_key - conflict target used when +action+ is "destination_update"
# action      - "destination_insert" (plain INSERT) or "destination_update"
#               (INSERT ... ON CONFLICT upsert)
#
# Returns the PG::Result from exec, or nil when +records+ is empty.
# Raises whatever db.exec raises; the caller handles fallback/retry.
def bulk_write(db, table_name, records, primary_key, action)
  return if records.empty?

  # Union of keys across the chunk, so records with missing columns
  # still line up positionally (missing values become NULL).
  cols = records.flat_map(&:keys).uniq
  quoted_cols = cols.map { |name| quote_ident(name) }

  rows = records.map do |record|
    escaped = cols.map { |col| escape_value(db, record[col]) }
    "(#{escaped.join(", ")})"
  end

  statement = +"INSERT INTO #{table_name} (#{quoted_cols.join(", ")}) VALUES #{rows.join(", ")}"
  statement << build_upsert_clause(cols, primary_key) if action.to_s == "destination_update"
  db.exec(statement)
end
112+
113+
# Builds the " ON CONFLICT ..." suffix for an upsert INSERT.
#
# Returns "" when no primary key is configured (plain INSERT),
# " ... DO NOTHING" when the primary key is the only column,
# otherwise " ... DO UPDATE SET col = EXCLUDED.col, ..." for every
# non-key column. The leading space lets callers append it directly.
def build_upsert_clause(columns, primary_key)
  return "" unless primary_key.present?

  conflict_target = quote_ident(primary_key)
  updatable = columns.reject { |col| col.to_s == primary_key.to_s }
  # Nothing to update besides the key itself: skip duplicates silently.
  return " ON CONFLICT (#{conflict_target}) DO NOTHING" if updatable.empty?

  assignments = updatable.map { |col| "#{quote_ident(col)} = EXCLUDED.#{quote_ident(col)}" }
  " ON CONFLICT (#{conflict_target}) DO UPDATE SET #{assignments.join(", ")}"
end
122+
123+
# Renders a Ruby value as a SQL literal: NULL for nil, otherwise a
# single-quoted string escaped through the connection's escape_string
# (Postgres coerces quoted numerics/booleans to the column type).
def escape_value(db, value)
  value.nil? ? "NULL" : "'#{db.escape_string(value.to_s)}'"
end
128+
129+
# Safely double-quotes a SQL identifier (column name) via the pg gem,
# guarding against reserved words and injection through column names.
def quote_ident(name)
  identifier = name.to_s
  PG::Connection.quote_ident(identifier)
end
132+
89133
def query(connection, query)
90134
connection.exec(query) do |result|
91135
result.map do |row|
@@ -108,7 +152,13 @@ def create_connection(connection_config)
108152

109153
# Maps discovered table metadata into protocol Stream objects,
# advertising batch support so the platform sends records in chunks
# of MAX_CHUNK_SIZE to the batched write path.
def create_streams(records)
  group_by_table(records).map do |table|
    Multiwoven::Integrations::Protocol::Stream.new(
      name: table[:tablename],
      action: StreamAction["fetch"],
      json_schema: convert_to_json_schema(table[:columns]),
      batch_support: true,
      batch_size: MAX_CHUNK_SIZE
    )
  end
end
114164

integrations/lib/multiwoven/integrations/rollout.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
module Multiwoven
44
module Integrations
5-
VERSION = "0.34.19"
5+
VERSION = "0.34.20"
66

77
ENABLED_SOURCES = %w[
88
Snowflake

integrations/spec/multiwoven/integrations/destination/postgresql/client_spec.rb

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,8 @@
109109
s_config = Multiwoven::Integrations::Protocol::SyncConfig.from_json(sync_config.to_json)
110110
s_config.sync_run_id = "33"
111111
allow(PG).to receive(:connect).and_return(pg_connection)
112+
allow(pg_connection).to receive(:close)
113+
allow(pg_connection).to receive(:escape_string) { |str| str }
112114

113115
allow(pg_connection).to receive(:exec).and_return(true)
114116

@@ -126,6 +128,8 @@
126128
s_config = Multiwoven::Integrations::Protocol::SyncConfig.from_json(sync_config.to_json)
127129
s_config.sync_run_id = "33"
128130
allow(PG).to receive(:connect).and_return(pg_connection)
131+
allow(pg_connection).to receive(:close)
132+
allow(pg_connection).to receive(:escape_string) { |str| str }
129133

130134
allow(pg_connection).to receive(:exec).and_return(true)
131135

@@ -147,6 +151,8 @@
147151
s_config.sync_run_id = "34"
148152

149153
allow(PG).to receive(:connect).and_return(pg_connection)
154+
allow(pg_connection).to receive(:close)
155+
allow(pg_connection).to receive(:escape_string) { |str| str }
150156

151157
allow(pg_connection).to receive(:exec).and_raise(StandardError.new("test error"))
152158

@@ -162,6 +168,156 @@
162168
end
163169
end
164170

171+
# bulk write specs
172+
173+
describe "#write (batch)" do
  before do
    allow(PG).to receive(:connect).and_return(pg_connection)
    allow(pg_connection).to receive(:close)
    allow(pg_connection).to receive(:escape_string) { |str| str }
  end

  let(:s_config) do
    config = Multiwoven::Integrations::Protocol::SyncConfig.from_json(sync_config.to_json)
    config.sync_run_id = "50"
    config
  end

  # String-keyed hashes — the shape the client's bulk writer consumes.
  let(:batch_records) do
    records.map { |r| r.data.transform_keys(&:to_s) }
  end

  context "bulk insert" do
    it "inserts multiple records in a single statement" do
      expect(pg_connection).to receive(:exec).with(
        a_string_matching(/INSERT INTO.*VALUES.*,.*/)
      ).once.and_return(true)

      tracking = subject.write(s_config, batch_records).tracking
      expect(tracking.success).to eql(2)
      expect(tracking.failed).to eql(0)
    end
  end

  context "bulk upsert" do
    it "generates ON CONFLICT clause for destination_update" do
      expect(pg_connection).to receive(:exec).with(
        a_string_matching(/ON CONFLICT.*DO UPDATE SET/)
      ).once.and_return(true)

      tracking = subject.write(s_config, batch_records, "destination_update").tracking
      expect(tracking.success).to eql(2)
      expect(tracking.failed).to eql(0)
    end

    it "generates ON CONFLICT DO NOTHING when update_cols is empty (only primary key)" do
      records_only_pk = [
        { "id" => "1" },
        { "id" => "2" }
      ]
      expect(pg_connection).to receive(:exec).with(
        a_string_matching(/ON CONFLICT.*DO NOTHING/)
      ).once.and_return(true)

      tracking = subject.write(s_config, records_only_pk, "destination_update").tracking
      expect(tracking.success).to eql(2)
      expect(tracking.failed).to eql(0)
    end
  end

  context "fallback on bulk failure" do
    it "falls back to individual writes and tracks success" do
      exec_calls = 0
      allow(pg_connection).to receive(:exec) do
        exec_calls += 1
        raise StandardError, "bulk failed" if exec_calls == 1

        true
      end

      tracking = subject.write(s_config, batch_records).tracking
      expect(tracking.success).to eql(2)
      expect(tracking.failed).to eql(0)
    end

    it "tracks partial failures in individual fallback" do
      exec_calls = 0
      allow(pg_connection).to receive(:exec) do
        exec_calls += 1
        # bulk fails, first individual succeeds, second individual fails
        raise StandardError, "bulk failed" if exec_calls == 1
        raise StandardError, "row failed" if exec_calls == 3

        true
      end

      tracking = subject.write(s_config, batch_records).tracking
      expect(tracking.success).to eql(1)
      expect(tracking.failed).to eql(1)
    end

    it "logs info for each successful individual write in fallback" do
      exec_calls = 0
      allow(pg_connection).to receive(:exec) do
        exec_calls += 1
        raise StandardError, "bulk failed" if exec_calls == 1

        true
      end

      tracking = subject.write(s_config, batch_records).tracking
      expect(tracking.success).to eql(2)
      info_logs = tracking.logs.select { |l| l.level == "info" }
      expect(info_logs.size).to eql(2)
      info_logs.each do |log|
        expect(log.message).to include("request")
        expect(log.message).to include("response")
      end
    end
  end

  context "records with inconsistent keys" do
    let(:mixed_records) do
      [
        { "email" => "user1@example.com", "user_id" => "1" },
        { "email" => "user2@example.com", "user_id" => "2", "location" => "NYC" }
      ]
    end

    it "includes all columns from the union of record keys" do
      expect(pg_connection).to receive(:exec).with(
        a_string_matching(/"email",\s*"user_id",\s*"location"/)
      ).once.and_return(true)

      tracking = subject.write(s_config, mixed_records).tracking
      expect(tracking.success).to eql(2)
      expect(tracking.failed).to eql(0)
    end

    it "uses NULL for missing keys" do
      expect(pg_connection).to receive(:exec).with(
        a_string_matching(/NULL/)
      ).once.and_return(true)

      subject.write(s_config, mixed_records)
    end
  end

  context "connection cleanup" do
    it "closes connection on success" do
      allow(pg_connection).to receive(:exec).and_return(true)
      expect(pg_connection).to receive(:close)
      subject.write(s_config, batch_records)
    end

    it "closes connection on failure" do
      allow(pg_connection).to receive(:exec).and_raise(StandardError.new("err"))
      expect(pg_connection).to receive(:close)
      subject.write(s_config, batch_records)
    end
  end
end
320+
165321
describe "#discover" do
166322
it "discovers schema successfully" do
167323
allow(PG).to receive(:connect).and_return(pg_connection)

0 commit comments

Comments (0)