Skip to content

Commit 14df369

Browse files
committed
update to show if peek empty or empty results
1 parent 389273f commit 14df369

File tree

3 files changed

+99
-5
lines changed

3 files changed

+99
-5
lines changed

lib/extensions/postgres_cdc_rls/replication_poller.ex

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,11 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
9595
record_list_changes_telemetry(time, tenant_id)
9696

9797
case handle_list_changes_result(list_changes, subscribers_nodes_table, tenant_id, rate_counter_args) do
98+
{:ok, :peek_empty} ->
99+
Backoff.reset(backoff)
100+
poll_ref = Process.send_after(self(), :poll, poll_interval_ms)
101+
{:noreply, %{state | backoff: backoff, poll_ref: poll_ref}}
102+
98103
{:ok, row_count} ->
99104
Backoff.reset(backoff)
100105

@@ -180,6 +185,14 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
180185
)
181186
end
182187

188+
defp handle_list_changes_result(
189+
{:ok, %Postgrex.Result{num_rows: 1, rows: [[nil, nil, nil, _, _, _, nil, [], ["peek_empty"]]]}},
190+
_subscribers_nodes_table,
191+
_tenant_id,
192+
_rate_counter_args
193+
),
194+
do: {:ok, :peek_empty}
195+
183196
defp handle_list_changes_result(
184197
{:ok,
185198
%Postgrex.Result{

lib/realtime/tenants/repo/migrations/20260210000000_create_peek_and_list_changes_function.ex

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,16 @@ defmodule Realtime.Tenants.Migrations.CreatePeekAndListChangesFunction do
7777
where
7878
w2j.w2j_add_tables <> ''
7979
and xyz.subscription_ids[1] is not null
80+
81+
union all
82+
83+
select
84+
null::jsonb,
85+
null::boolean,
86+
'{}'::uuid[],
87+
'{peek_empty}'::text[]
88+
where
89+
not exists (select 1 from peek)
8090
$$;
8191
""")
8292
end

test/integration/replications_test.exs

Lines changed: 76 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,13 @@ defmodule Realtime.Integration.ReplicationsTest do
4141

4242
describe "replication polling lifecycle" do
4343
test "prepare, poll, consume full cycle", %{conn: conn, slot_name: slot_name} do
44-
# Empty slot short-circuits via peek
4544
{time, result} =
4645
:timer.tc(fn ->
4746
Replications.list_changes(conn, slot_name, @publication, 100, 1_048_576)
4847
end)
4948

50-
assert {:ok, %Postgrex.Result{num_rows: 0}} = result
49+
assert {:ok, %Postgrex.Result{num_rows: 1, rows: [sentinel]}} = result
50+
assert Enum.at(sentinel, 8) == ["peek_empty"]
5151
assert time < 50_000, "Expected peek short-circuit under 50ms, took #{div(time, 1000)}ms"
5252

5353
Process.sleep(@poll_interval)
@@ -68,15 +68,18 @@ defmodule Realtime.Integration.ReplicationsTest do
6868

6969
Process.sleep(@poll_interval)
7070

71-
{:ok, %Postgrex.Result{num_rows: 0}} =
71+
{:ok, %Postgrex.Result{num_rows: 1, rows: [sentinel]}} =
7272
Replications.list_changes(conn, slot_name, @publication, 100, 1_048_576)
73+
74+
assert Enum.at(sentinel, 8) == ["peek_empty"]
7375
end
7476

7577
test "polls empty multiple times then captures a change when it arrives", %{conn: conn, slot_name: slot_name} do
7678
for _ <- 1..5 do
77-
{:ok, %Postgrex.Result{num_rows: 0}} =
79+
{:ok, %Postgrex.Result{num_rows: 1, rows: [sentinel]}} =
7880
Replications.list_changes(conn, slot_name, @publication, 100, 1_048_576)
7981

82+
assert Enum.at(sentinel, 8) == ["peek_empty"]
8083
Process.sleep(@poll_interval)
8184
end
8285

@@ -92,8 +95,10 @@ defmodule Realtime.Integration.ReplicationsTest do
9295

9396
Process.sleep(@poll_interval)
9497

95-
{:ok, %Postgrex.Result{num_rows: 0}} =
98+
{:ok, %Postgrex.Result{num_rows: 1, rows: [sentinel]}} =
9699
Replications.list_changes(conn, slot_name, @publication, 100, 1_048_576)
100+
101+
assert Enum.at(sentinel, 8) == ["peek_empty"]
97102
end
98103

99104
test "prepare_replication is idempotent", %{conn: conn, slot_name: slot_name} do
@@ -116,4 +121,70 @@ defmodule Realtime.Integration.ReplicationsTest do
116121
assert is_integer(diff)
117122
end
118123
end
124+
125+
describe "peek vs RLS distinction" do
126+
setup do
127+
tenant = Containers.checkout_tenant(run_migrations: true)
128+
129+
{:ok, conn} =
130+
tenant
131+
|> Database.from_tenant("realtime_rls")
132+
|> Map.from_struct()
133+
|> Keyword.new()
134+
|> Postgrex.start_link()
135+
136+
slot_name = "supabase_realtime_rls_slot_#{System.unique_integer([:positive])}"
137+
138+
on_exit(fn ->
139+
try do
140+
Postgrex.query(conn, "select pg_drop_replication_slot($1)", [slot_name])
141+
catch
142+
_, _ -> :ok
143+
end
144+
end)
145+
146+
{:ok, subscription_params} =
147+
Subscriptions.parse_subscription_params(%{
148+
"event" => "*",
149+
"schema" => "public",
150+
"table" => "test",
151+
"filter" => "details=eq.no_match"
152+
})
153+
154+
params_list = [%{claims: %{"role" => "anon"}, id: UUID.uuid1(), subscription_params: subscription_params}]
155+
{:ok, _} = Subscriptions.create(conn, @publication, params_list, self(), self())
156+
{:ok, _} = Replications.prepare_replication(conn, slot_name)
157+
158+
Replications.list_changes(conn, slot_name, @publication, 100, 1_048_576)
159+
160+
%{conn: conn, slot_name: slot_name}
161+
end
162+
163+
test "returns 0 rows when WAL changes exist but are filtered by subscription", %{
164+
conn: conn,
165+
slot_name: slot_name
166+
} do
167+
# Peek is empty - sentinel row
168+
{:ok, %Postgrex.Result{num_rows: 1, rows: [sentinel]}} =
169+
Replications.list_changes(conn, slot_name, @publication, 100, 1_048_576)
170+
171+
assert Enum.at(sentinel, 8) == ["peek_empty"]
172+
173+
# Insert a row that doesn't match the filter (details != "no_match")
174+
Postgrex.query!(conn, "INSERT INTO public.test (details) VALUES ('rls_filtered')", [])
175+
Process.sleep(@poll_interval)
176+
177+
# WAL changes consumed but subscription filter doesn't match - 0 rows, no sentinel
178+
{:ok, %Postgrex.Result{num_rows: 0}} =
179+
Replications.list_changes(conn, slot_name, @publication, 100, 1_048_576)
180+
181+
Process.sleep(@poll_interval)
182+
183+
# After consumption, peek is empty again - sentinel returned
184+
{:ok, %Postgrex.Result{num_rows: 1, rows: [sentinel]}} =
185+
Replications.list_changes(conn, slot_name, @publication, 100, 1_048_576)
186+
187+
assert Enum.at(sentinel, 8) == ["peek_empty"]
188+
end
189+
end
119190
end

0 commit comments

Comments
 (0)