Skip to content

Commit 389273f

Browse files
committed
feat: implement peek on walrus query
To reduce overall load of the query we are now peeking before pulling so we avoid running the more expensive query and get faster feedback
1 parent 9c969ed commit 389273f

File tree

3 files changed

+272
-2
lines changed

3 files changed

+272
-2
lines changed

lib/realtime/tenants/migrations.ex

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,8 @@ defmodule Realtime.Tenants.Migrations do
8181
CreateMessagesReplayIndex,
8282
BroadcastSendIncludePayloadId,
8383
AddActionToSubscriptions,
84-
FilterActionPostgresChanges
84+
FilterActionPostgresChanges,
85+
CreatePeekAndListChangesFunction
8586
}
8687

8788
@migrations [
@@ -151,7 +152,8 @@ defmodule Realtime.Tenants.Migrations do
151152
{20_250_905_041_441, CreateMessagesReplayIndex},
152153
{20_251_103_001_201, BroadcastSendIncludePayloadId},
153154
{20_251_120_212_548, AddActionToSubscriptions},
154-
{20_251_120_215_549, FilterActionPostgresChanges}
155+
{20_251_120_215_549, FilterActionPostgresChanges},
156+
{20_260_210_000_000, CreatePeekAndListChangesFunction}
155157
]
156158

157159
defstruct [:tenant_external_id, :settings, migrations_ran: 0]
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
defmodule Realtime.Tenants.Migrations.CreatePeekAndListChangesFunction do
2+
@moduledoc false
3+
4+
use Ecto.Migration
5+
6+
def up do
7+
execute("""
8+
create or replace function realtime.list_changes(publication name, slot_name name, max_changes int, max_record_bytes int)
9+
returns setof realtime.wal_rls
10+
language sql
11+
as $$
12+
with peek as (
13+
select 1
14+
from pg_logical_slot_peek_changes(
15+
slot_name, null, 1,
16+
'include-pk', 'true',
17+
'include-transaction', 'false',
18+
'include-timestamp', 'true',
19+
'include-type-oids', 'true',
20+
'format-version', '2'
21+
)
22+
limit 1
23+
),
24+
pub as (
25+
select
26+
concat_ws(
27+
',',
28+
case when bool_or(pubinsert) then 'insert' else null end,
29+
case when bool_or(pubupdate) then 'update' else null end,
30+
case when bool_or(pubdelete) then 'delete' else null end
31+
) as w2j_actions,
32+
coalesce(
33+
string_agg(
34+
realtime.quote_wal2json(format('%I.%I', schemaname, tablename)::regclass),
35+
','
36+
) filter (where ppt.tablename is not null and ppt.tablename not like '% %'),
37+
''
38+
) w2j_add_tables
39+
from
40+
pg_publication pp
41+
left join pg_publication_tables ppt
42+
on pp.pubname = ppt.pubname
43+
where
44+
pp.pubname = publication
45+
and exists (select 1 from peek)
46+
group by
47+
pp.pubname
48+
limit 1
49+
),
50+
w2j as (
51+
select
52+
x.*, pub.w2j_add_tables
53+
from
54+
pub,
55+
pg_logical_slot_get_changes(
56+
slot_name, null, max_changes,
57+
'include-pk', 'true',
58+
'include-transaction', 'false',
59+
'include-timestamp', 'true',
60+
'include-type-oids', 'true',
61+
'format-version', '2',
62+
'actions', pub.w2j_actions,
63+
'add-tables', pub.w2j_add_tables
64+
) x
65+
)
66+
select
67+
xyz.wal,
68+
xyz.is_rls_enabled,
69+
xyz.subscription_ids,
70+
xyz.errors
71+
from
72+
w2j,
73+
realtime.apply_rls(
74+
wal := w2j.data::jsonb,
75+
max_record_bytes := max_record_bytes
76+
) xyz(wal, is_rls_enabled, subscription_ids, errors)
77+
where
78+
w2j.w2j_add_tables <> ''
79+
and xyz.subscription_ids[1] is not null
80+
$$;
81+
""")
82+
end
83+
84+
def down do
85+
execute("""
86+
create or replace function realtime.list_changes(publication name, slot_name name, max_changes int, max_record_bytes int)
87+
returns setof realtime.wal_rls
88+
language sql
89+
set log_min_messages to 'fatal'
90+
as $$
91+
with pub as (
92+
select
93+
concat_ws(
94+
',',
95+
case when bool_or(pubinsert) then 'insert' else null end,
96+
case when bool_or(pubupdate) then 'update' else null end,
97+
case when bool_or(pubdelete) then 'delete' else null end
98+
) as w2j_actions,
99+
coalesce(
100+
string_agg(
101+
realtime.quote_wal2json(format('%I.%I', schemaname, tablename)::regclass),
102+
','
103+
) filter (where ppt.tablename is not null and ppt.tablename not like '% %'),
104+
''
105+
) w2j_add_tables
106+
from
107+
pg_publication pp
108+
left join pg_publication_tables ppt
109+
on pp.pubname = ppt.pubname
110+
where
111+
pp.pubname = publication
112+
group by
113+
pp.pubname
114+
limit 1
115+
),
116+
w2j as (
117+
select
118+
x.*, pub.w2j_add_tables
119+
from
120+
pub,
121+
pg_logical_slot_get_changes(
122+
slot_name, null, max_changes,
123+
'include-pk', 'true',
124+
'include-transaction', 'false',
125+
'include-timestamp', 'true',
126+
'include-type-oids', 'true',
127+
'format-version', '2',
128+
'actions', pub.w2j_actions,
129+
'add-tables', pub.w2j_add_tables
130+
) x
131+
)
132+
select
133+
xyz.wal,
134+
xyz.is_rls_enabled,
135+
xyz.subscription_ids,
136+
xyz.errors
137+
from
138+
w2j,
139+
realtime.apply_rls(
140+
wal := w2j.data::jsonb,
141+
max_record_bytes := max_record_bytes
142+
) xyz(wal, is_rls_enabled, subscription_ids, errors)
143+
where
144+
w2j.w2j_add_tables <> ''
145+
and xyz.subscription_ids[1] is not null
146+
$$;
147+
""")
148+
end
149+
end
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
defmodule Realtime.Integration.ReplicationsTest do
2+
use Realtime.DataCase, async: false
3+
4+
alias Extensions.PostgresCdcRls.Replications
5+
alias Extensions.PostgresCdcRls.Subscriptions
6+
alias Realtime.Database
7+
8+
@publication "supabase_realtime_test"
9+
@poll_interval 100
10+
11+
setup do
12+
tenant = Containers.checkout_tenant(run_migrations: true)
13+
14+
{:ok, conn} =
15+
tenant
16+
|> Database.from_tenant("realtime_rls")
17+
|> Map.from_struct()
18+
|> Keyword.new()
19+
|> Postgrex.start_link()
20+
21+
slot_name = "supabase_realtime_test_slot_#{System.unique_integer([:positive])}"
22+
23+
on_exit(fn ->
24+
try do
25+
Postgrex.query(conn, "select pg_drop_replication_slot($1)", [slot_name])
26+
catch
27+
_, _ -> :ok
28+
end
29+
end)
30+
31+
{:ok, subscription_params} = Subscriptions.parse_subscription_params(%{"event" => "*", "schema" => "public"})
32+
params_list = [%{claims: %{"role" => "anon"}, id: UUID.uuid1(), subscription_params: subscription_params}]
33+
{:ok, _} = Subscriptions.create(conn, @publication, params_list, self(), self())
34+
{:ok, _} = Replications.prepare_replication(conn, slot_name)
35+
36+
# Drain any setup changes
37+
Replications.list_changes(conn, slot_name, @publication, 100, 1_048_576)
38+
39+
%{conn: conn, slot_name: slot_name}
40+
end
41+
42+
describe "replication polling lifecycle" do
43+
test "prepare, poll, consume full cycle", %{conn: conn, slot_name: slot_name} do
44+
# Empty slot short-circuits via peek
45+
{time, result} =
46+
:timer.tc(fn ->
47+
Replications.list_changes(conn, slot_name, @publication, 100, 1_048_576)
48+
end)
49+
50+
assert {:ok, %Postgrex.Result{num_rows: 0}} = result
51+
assert time < 50_000, "Expected peek short-circuit under 50ms, took #{div(time, 1000)}ms"
52+
53+
Process.sleep(@poll_interval)
54+
55+
Postgrex.query!(conn, "INSERT INTO public.test (details) VALUES ('row_1')", [])
56+
Postgrex.query!(conn, "INSERT INTO public.test (details) VALUES ('row_2')", [])
57+
Postgrex.query!(conn, "INSERT INTO public.test (details) VALUES ('row_3')", [])
58+
59+
Process.sleep(@poll_interval)
60+
61+
{:ok, %Postgrex.Result{num_rows: 3, rows: rows}} =
62+
Replications.list_changes(conn, slot_name, @publication, 100, 1_048_576)
63+
64+
[row | _] = rows
65+
assert Enum.at(row, 0) == "INSERT"
66+
assert Enum.at(row, 1) == "public"
67+
assert Enum.at(row, 2) == "test"
68+
69+
Process.sleep(@poll_interval)
70+
71+
{:ok, %Postgrex.Result{num_rows: 0}} =
72+
Replications.list_changes(conn, slot_name, @publication, 100, 1_048_576)
73+
end
74+
75+
test "polls empty multiple times then captures a change when it arrives", %{conn: conn, slot_name: slot_name} do
76+
for _ <- 1..5 do
77+
{:ok, %Postgrex.Result{num_rows: 0}} =
78+
Replications.list_changes(conn, slot_name, @publication, 100, 1_048_576)
79+
80+
Process.sleep(@poll_interval)
81+
end
82+
83+
Postgrex.query!(conn, "INSERT INTO public.test (details) VALUES ('delayed_arrival')", [])
84+
Process.sleep(@poll_interval)
85+
86+
{:ok, %Postgrex.Result{num_rows: 1, rows: [row]}} =
87+
Replications.list_changes(conn, slot_name, @publication, 100, 1_048_576)
88+
89+
assert Enum.at(row, 0) == "INSERT"
90+
assert Enum.at(row, 1) == "public"
91+
assert Enum.at(row, 2) == "test"
92+
93+
Process.sleep(@poll_interval)
94+
95+
{:ok, %Postgrex.Result{num_rows: 0}} =
96+
Replications.list_changes(conn, slot_name, @publication, 100, 1_048_576)
97+
end
98+
99+
test "prepare_replication is idempotent", %{conn: conn, slot_name: slot_name} do
100+
{:ok, _} = Replications.prepare_replication(conn, slot_name)
101+
Process.sleep(@poll_interval)
102+
{:ok, _} = Replications.prepare_replication(conn, slot_name)
103+
end
104+
105+
test "terminate_backend returns slot_not_found for unknown slots", %{conn: conn} do
106+
assert {:error, :slot_not_found} =
107+
Replications.terminate_backend(conn, "nonexistent_slot_#{System.unique_integer([:positive])}")
108+
end
109+
110+
test "get_pg_stat_activity_diff returns elapsed seconds for active connection", %{conn: conn} do
111+
{:ok, %Postgrex.Result{rows: [[pid]]}} = Postgrex.query(conn, "SELECT pg_backend_pid()", [])
112+
Postgrex.query!(conn, "SET application_name = 'realtime_rls'", [])
113+
Process.sleep(@poll_interval)
114+
115+
assert {:ok, diff} = Replications.get_pg_stat_activity_diff(conn, pid)
116+
assert is_integer(diff)
117+
end
118+
end
119+
end

0 commit comments

Comments
 (0)