Skip to content

Commit 9ee7c7e

Browse files
authored
feat: speed up poller by not re-encoding user data (#1605)
record, old_record and columns are relayed without decoding and encoding their jsonb representation
1 parent 60adbd8 commit 9ee7c7e

File tree

4 files changed

+272
-251
lines changed

4 files changed

+272
-251
lines changed

lib/extensions/postgres_cdc_rls/replication_poller.ex

Lines changed: 32 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
186186
defp handle_list_changes_result(
187187
{:ok,
188188
%Postgrex.Result{
189-
columns: ["wal", "is_rls_enabled", "subscription_ids", "errors"] = columns,
189+
columns: columns,
190190
rows: [_ | _] = rows,
191191
num_rows: rows_count
192192
}},
@@ -260,75 +260,75 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
260260
end
261261

262262
def generate_record([
263-
{"wal",
264-
%{
265-
"type" => "INSERT" = type,
266-
"schema" => schema,
267-
"table" => table
268-
} = wal},
269-
{"is_rls_enabled", _},
263+
{"type", "INSERT" = type},
264+
{"schema", schema},
265+
{"table", table},
266+
{"columns", columns},
267+
{"record", record},
268+
{"old_record", _},
269+
{"commit_timestamp", commit_timestamp},
270270
{"subscription_ids", subscription_ids},
271271
{"errors", errors}
272272
])
273273
when is_list(subscription_ids) do
274274
%NewRecord{
275-
columns: Map.get(wal, "columns", []),
276-
commit_timestamp: Map.get(wal, "commit_timestamp"),
275+
columns: Jason.Fragment.new(columns),
276+
commit_timestamp: commit_timestamp,
277277
errors: convert_errors(errors),
278278
schema: schema,
279279
table: table,
280280
type: type,
281281
subscription_ids: MapSet.new(subscription_ids),
282-
record: Map.get(wal, "record", %{})
282+
record: Jason.Fragment.new(record)
283283
}
284284
end
285285

286286
def generate_record([
287-
{"wal",
288-
%{
289-
"type" => "UPDATE" = type,
290-
"schema" => schema,
291-
"table" => table
292-
} = wal},
293-
{"is_rls_enabled", _},
287+
{"type", "UPDATE" = type},
288+
{"schema", schema},
289+
{"table", table},
290+
{"columns", columns},
291+
{"record", record},
292+
{"old_record", old_record},
293+
{"commit_timestamp", commit_timestamp},
294294
{"subscription_ids", subscription_ids},
295295
{"errors", errors}
296296
])
297297
when is_list(subscription_ids) do
298298
%UpdatedRecord{
299-
columns: Map.get(wal, "columns", []),
300-
commit_timestamp: Map.get(wal, "commit_timestamp"),
299+
columns: Jason.Fragment.new(columns),
300+
commit_timestamp: commit_timestamp,
301301
errors: convert_errors(errors),
302302
schema: schema,
303303
table: table,
304304
type: type,
305305
subscription_ids: MapSet.new(subscription_ids),
306-
old_record: Map.get(wal, "old_record", %{}),
307-
record: Map.get(wal, "record", %{})
306+
old_record: Jason.Fragment.new(old_record),
307+
record: Jason.Fragment.new(record)
308308
}
309309
end
310310

311311
def generate_record([
312-
{"wal",
313-
%{
314-
"type" => "DELETE" = type,
315-
"schema" => schema,
316-
"table" => table
317-
} = wal},
318-
{"is_rls_enabled", _},
312+
{"type", "DELETE" = type},
313+
{"schema", schema},
314+
{"table", table},
315+
{"columns", columns},
316+
{"record", _},
317+
{"old_record", old_record},
318+
{"commit_timestamp", commit_timestamp},
319319
{"subscription_ids", subscription_ids},
320320
{"errors", errors}
321321
])
322322
when is_list(subscription_ids) do
323323
%DeletedRecord{
324-
columns: Map.get(wal, "columns", []),
325-
commit_timestamp: Map.get(wal, "commit_timestamp"),
324+
columns: Jason.Fragment.new(columns),
325+
commit_timestamp: commit_timestamp,
326326
errors: convert_errors(errors),
327327
schema: schema,
328328
table: table,
329329
type: type,
330330
subscription_ids: MapSet.new(subscription_ids),
331-
old_record: Map.get(wal, "old_record", %{})
331+
old_record: Jason.Fragment.new(old_record)
332332
}
333333
end
334334

lib/extensions/postgres_cdc_rls/replications.ex

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,18 @@ defmodule Extensions.PostgresCdcRls.Replications do
7272
def list_changes(conn, slot_name, publication, max_changes, max_record_bytes) do
7373
query(
7474
conn,
75-
"select * from realtime.list_changes($1, $2, $3, $4)",
75+
"""
76+
SELECT wal->>'type' as type,
77+
wal->>'schema' as schema,
78+
wal->>'table' as table,
79+
wal->>'columns' as columns,
80+
wal->>'record' as record,
81+
wal->>'old_record' as old_record,
82+
wal->>'commit_timestamp' as commit_timestamp,
83+
subscription_ids,
84+
errors
85+
FROM realtime.list_changes($1, $2, $3, $4)
86+
""",
7687
[
7788
publication,
7889
slot_name,

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.59.1",
7+
version: "2.60.0",
88
elixir: "~> 1.18",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,

0 commit comments

Comments
 (0)