Skip to content

Commit a0934c0

Browse files
committed
feat: increase pooling rate on no changes
To reduce overall load of the system we will start to reduce pooling speed when no changes are not registered
1 parent 14df369 commit a0934c0

File tree

2 files changed

+182
-45
lines changed

2 files changed

+182
-45
lines changed

lib/extensions/postgres_cdc_rls/replication_poller.ex

Lines changed: 46 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
defmodule Extensions.PostgresCdcRls.ReplicationPoller do
22
@moduledoc """
3-
Polls the write ahead log, applies row level sucurity policies for each subscriber
3+
Polls the write ahead log, applies row level security policies for each subscriber
44
and broadcast records to the `MessageDispatcher`.
55
"""
66

@@ -46,6 +46,7 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
4646
publication: extension["publication"],
4747
retry_ref: nil,
4848
retry_count: 0,
49+
empty_count: 0,
4950
slot_name: extension["slot_name"] <> slot_name_suffix(),
5051
tenant_id: tenant_id,
5152
rate_counter_args: rate_counter_args,
@@ -64,6 +65,7 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
6465
{:noreply, state, {:continue, :prepare}}
6566
end
6667

68+
@impl true
6769
def handle_continue(:prepare, state) do
6870
{:noreply, prepare_replication(state)}
6971
end
@@ -72,46 +74,40 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
7274
def handle_info(
7375
:poll,
7476
%{
75-
backoff: backoff,
76-
poll_interval_ms: poll_interval_ms,
77-
poll_ref: poll_ref,
78-
publication: publication,
79-
retry_ref: retry_ref,
80-
retry_count: retry_count,
77+
conn: conn,
8178
slot_name: slot_name,
82-
max_record_bytes: max_record_bytes,
79+
publication: publication,
8380
max_changes: max_changes,
84-
conn: conn,
81+
max_record_bytes: max_record_bytes,
8582
tenant_id: tenant_id,
8683
subscribers_nodes_table: subscribers_nodes_table,
8784
rate_counter_args: rate_counter_args
8885
} = state
8986
) do
90-
cancel_timer(poll_ref)
91-
cancel_timer(retry_ref)
87+
cancel_timer(state.poll_ref)
88+
cancel_timer(state.retry_ref)
9289

93-
args = [conn, slot_name, publication, max_changes, max_record_bytes]
94-
{time, list_changes} = :timer.tc(Replications, :list_changes, args)
90+
{time, list_changes} = :timer.tc(Replications, :list_changes, [conn, slot_name, publication, max_changes, max_record_bytes])
9591
record_list_changes_telemetry(time, tenant_id)
9692

97-
case handle_list_changes_result(list_changes, subscribers_nodes_table, tenant_id, rate_counter_args) do
98-
{:ok, :peek_empty} ->
99-
Backoff.reset(backoff)
100-
poll_ref = Process.send_after(self(), :poll, poll_interval_ms)
101-
{:noreply, %{state | backoff: backoff, poll_ref: poll_ref}}
93+
case dispatch_changes(list_changes, subscribers_nodes_table, tenant_id, rate_counter_args) do
94+
:peek_empty ->
95+
empty_count = state.empty_count + 1
96+
interval = adaptive_interval(state.poll_interval_ms, empty_count)
97+
poll_ref = Process.send_after(self(), :poll, interval)
10298

103-
{:ok, row_count} ->
104-
Backoff.reset(backoff)
99+
{:noreply, %{state | backoff: Backoff.reset(state.backoff), poll_ref: poll_ref, empty_count: empty_count}}
105100

106-
pool_ref =
101+
{:ok, row_count} ->
102+
poll_ref =
107103
if row_count > 0 do
108104
send(self(), :poll)
109105
nil
110106
else
111-
Process.send_after(self(), :poll, poll_interval_ms)
107+
Process.send_after(self(), :poll, state.poll_interval_ms)
112108
end
113109

114-
{:noreply, %{state | backoff: backoff, poll_ref: pool_ref}}
110+
{:noreply, %{state | backoff: Backoff.reset(state.backoff), poll_ref: poll_ref, empty_count: 0}}
115111

116112
{:error, %Postgrex.Error{postgres: %{code: :object_in_use, message: msg}}} ->
117113
log_error("ReplicationSlotBeingUsed", msg)
@@ -122,26 +118,19 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
122118

123119
Logger.warning("Database PID #{db_pid} found in pg_stat_activity with state_change diff of #{diff}")
124120

125-
if retry_count > 3 do
121+
if state.retry_count > 3 do
126122
case Replications.terminate_backend(conn, slot_name) do
127123
{:ok, :terminated} -> Logger.warning("Replication slot in use - terminating")
128124
{:error, :slot_not_found} -> Logger.warning("Replication slot not found")
129125
{:error, error} -> Logger.warning("Error terminating backend: #{inspect(error)}")
130126
end
131127
end
132128

133-
{timeout, backoff} = Backoff.backoff(backoff)
134-
retry_ref = Process.send_after(self(), :retry, timeout)
135-
136-
{:noreply, %{state | backoff: backoff, retry_ref: retry_ref, retry_count: retry_count + 1}}
129+
{:noreply, schedule_retry(state)}
137130

138131
{:error, reason} ->
139132
log_error("PoolingReplicationError", reason)
140-
141-
{timeout, backoff} = Backoff.backoff(backoff)
142-
retry_ref = Process.send_after(self(), :retry, timeout)
143-
144-
{:noreply, %{state | backoff: backoff, retry_ref: retry_ref, retry_count: retry_count + 1}}
133+
{:noreply, schedule_retry(state)}
145134
end
146135
end
147136

@@ -158,22 +147,34 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
158147
end
159148
end
160149

150+
@max_adaptive_increase 500
151+
@empty_polls_per_step 10
152+
@step_increase 100
153+
154+
def adaptive_interval(base_interval, empty_count) do
155+
increase = div(empty_count, @empty_polls_per_step) * @step_increase
156+
min(base_interval + increase, base_interval + @max_adaptive_increase)
157+
end
158+
161159
defp convert_errors([_ | _] = errors), do: errors
162160

163161
defp convert_errors(_), do: nil
164162

165-
defp prepare_replication(%{backoff: backoff, conn: conn, slot_name: slot_name, retry_count: retry_count} = state) do
166-
case Replications.prepare_replication(conn, slot_name) do
163+
defp schedule_retry(state) do
164+
{timeout, backoff} = Backoff.backoff(state.backoff)
165+
retry_ref = Process.send_after(self(), :retry, timeout)
166+
%{state | backoff: backoff, retry_ref: retry_ref, retry_count: state.retry_count + 1}
167+
end
168+
169+
defp prepare_replication(state) do
170+
case Replications.prepare_replication(state.conn, state.slot_name) do
167171
{:ok, _} ->
168172
send(self(), :poll)
169173
state
170174

171175
{:error, error} ->
172176
log_error("PoolingReplicationPreparationError", error)
173-
174-
{timeout, backoff} = Backoff.backoff(backoff)
175-
retry_ref = Process.send_after(self(), :retry, timeout)
176-
%{state | backoff: backoff, retry_ref: retry_ref, retry_count: retry_count + 1}
177+
schedule_retry(state)
177178
end
178179
end
179180

@@ -185,15 +186,15 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
185186
)
186187
end
187188

188-
defp handle_list_changes_result(
189+
defp dispatch_changes(
189190
{:ok, %Postgrex.Result{num_rows: 1, rows: [[nil, nil, nil, _, _, _, nil, [], ["peek_empty"]]]}},
190191
_subscribers_nodes_table,
191192
_tenant_id,
192193
_rate_counter_args
193194
),
194-
do: {:ok, :peek_empty}
195+
do: :peek_empty
195196

196-
defp handle_list_changes_result(
197+
defp dispatch_changes(
197198
{:ok,
198199
%Postgrex.Result{
199200
columns: columns,
@@ -249,8 +250,8 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
249250
{:ok, rows_count}
250251
end
251252

252-
defp handle_list_changes_result({:ok, _}, _, _, _), do: {:ok, 0}
253-
defp handle_list_changes_result({:error, reason}, _, _, _), do: {:error, reason}
253+
defp dispatch_changes({:ok, _}, _, _, _), do: {:ok, 0}
254+
defp dispatch_changes({:error, reason}, _, _, _), do: {:error, reason}
254255

255256
defp collect_subscription_nodes(subscribers_nodes_table, subscription_ids) do
256257
Enum.reduce_while(subscription_ids, {:ok, %{}}, fn subscription_id, {:ok, acc} ->
@@ -266,7 +267,7 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
266267
end
267268
end)
268269
rescue
269-
_ -> {:error, :node_not_found}
270+
ArgumentError -> {:error, :node_not_found}
270271
end
271272

272273
def generate_record([

test/realtime/extensions/cdc_rls/replication_poller_test.exs

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -575,6 +575,142 @@ defmodule Realtime.Extensions.PostgresCdcRls.ReplicationPollerTest do
575575
end
576576
end
577577

578+
describe "poll adaptive interval" do
579+
setup do
580+
tenant = Containers.checkout_tenant(run_migrations: true)
581+
582+
subscribers_pids_table = :ets.new(:"#{__MODULE__}.adaptive_pids", [:public, :bag])
583+
subscribers_nodes_table = :ets.new(:"#{__MODULE__}.adaptive_nodes", [:public, :set])
584+
585+
args =
586+
hd(tenant.extensions).settings
587+
|> Map.put("id", tenant.external_id)
588+
|> Map.put("subscribers_pids_table", subscribers_pids_table)
589+
|> Map.put("subscribers_nodes_table", subscribers_nodes_table)
590+
591+
%{args: args}
592+
end
593+
594+
test "peek_empty polls slow down over time", %{args: args} do
595+
test_pid = self()
596+
peek_empty_result = {:ok, %Postgrex.Result{num_rows: 1, rows: [[nil, nil, nil, nil, nil, nil, nil, [], ["peek_empty"]]]}}
597+
598+
stub(Replications, :list_changes, fn _, _, _, _, _ ->
599+
send(test_pid, {:polled_at, System.monotonic_time(:millisecond)})
600+
peek_empty_result
601+
end)
602+
603+
reject(&TenantBroadcaster.pubsub_direct_broadcast/6)
604+
reject(&TenantBroadcaster.pubsub_broadcast/5)
605+
606+
start_link_supervised!({Poller, args})
607+
608+
# Collect enough polls to cross the 10-poll threshold where interval increases
609+
timestamps =
610+
for _ <- 1..15 do
611+
assert_receive {:polled_at, ts}, 2_000
612+
ts
613+
end
614+
615+
early_gaps =
616+
timestamps
617+
|> Enum.take(5)
618+
|> Enum.chunk_every(2, 1, :discard)
619+
|> Enum.map(fn [a, b] -> b - a end)
620+
621+
late_gaps =
622+
timestamps
623+
|> Enum.drop(10)
624+
|> Enum.chunk_every(2, 1, :discard)
625+
|> Enum.map(fn [a, b] -> b - a end)
626+
627+
avg_early = Enum.sum(early_gaps) / length(early_gaps)
628+
avg_late = Enum.sum(late_gaps) / length(late_gaps)
629+
630+
# After 10+ peek_empty polls, interval should have increased by at least 100ms
631+
assert avg_late > avg_early,
632+
"Expected later polls to be slower (#{avg_late}ms) than early polls (#{avg_early}ms)"
633+
end
634+
635+
test "empty results (RLS filtered) poll at base interval without slowing down", %{args: args} do
636+
test_pid = self()
637+
empty_result = {:ok, %Postgrex.Result{rows: [], num_rows: 0}}
638+
639+
stub(Replications, :list_changes, fn _, _, _, _, _ ->
640+
send(test_pid, {:polled_at, System.monotonic_time(:millisecond)})
641+
empty_result
642+
end)
643+
644+
reject(&TenantBroadcaster.pubsub_direct_broadcast/6)
645+
reject(&TenantBroadcaster.pubsub_broadcast/5)
646+
647+
start_link_supervised!({Poller, args})
648+
649+
timestamps =
650+
for _ <- 1..4 do
651+
assert_receive {:polled_at, ts}, 1_000
652+
ts
653+
end
654+
655+
gaps = timestamps |> Enum.chunk_every(2, 1, :discard) |> Enum.map(fn [a, b] -> b - a end)
656+
657+
# All gaps should be roughly the base interval (100ms), none growing
658+
for gap <- gaps do
659+
assert gap < 200, "Expected base interval polling (~100ms), got #{gap}ms"
660+
end
661+
end
662+
663+
test "rows with changes trigger immediate re-poll without delay", %{args: args} do
664+
test_pid = self()
665+
666+
results_with_changes =
667+
build_result([
668+
<<71, 36, 83, 212, 168, 9, 17, 240, 165, 186, 118, 202, 193, 157, 232, 187>>
669+
])
670+
671+
stub(Replications, :list_changes, fn _, _, _, _, _ ->
672+
send(test_pid, {:polled_at, System.monotonic_time(:millisecond)})
673+
results_with_changes
674+
end)
675+
676+
stub(TenantBroadcaster, :pubsub_broadcast, fn _, _, _, _, _ -> :ok end)
677+
678+
start_link_supervised!({Poller, args})
679+
680+
timestamps =
681+
for _ <- 1..3 do
682+
assert_receive {:polled_at, ts}, 1_000
683+
ts
684+
end
685+
686+
gaps = timestamps |> Enum.chunk_every(2, 1, :discard) |> Enum.map(fn [a, b] -> b - a end)
687+
688+
# With changes, send(self(), :poll) means near-immediate re-poll
689+
for gap <- gaps do
690+
assert gap < 50, "Expected immediate re-poll, got #{gap}ms delay"
691+
end
692+
end
693+
end
694+
695+
describe "adaptive_interval/2" do
696+
test "returns base interval when empty_count is below threshold" do
697+
assert Poller.adaptive_interval(100, 0) == 100
698+
assert Poller.adaptive_interval(100, 9) == 100
699+
end
700+
701+
test "increases by 100ms for every 10 empty polls" do
702+
assert Poller.adaptive_interval(100, 10) == 200
703+
assert Poller.adaptive_interval(100, 20) == 300
704+
assert Poller.adaptive_interval(100, 30) == 400
705+
end
706+
707+
test "caps at base + 500ms" do
708+
assert Poller.adaptive_interval(100, 50) == 600
709+
assert Poller.adaptive_interval(100, 100) == 600
710+
assert Poller.adaptive_interval(100, 1000) == 600
711+
end
712+
end
713+
578714
describe "slot_name_suffix/0" do
579715
setup do
580716
slot_name_suffix = Application.get_env(:realtime, :slot_name_suffix)

0 commit comments

Comments
 (0)