Skip to content

Commit f457695

Browse files
committed
feat: increase pooling rate on no changes
To reduce overall load of the system we will start to reduce pooling speed when no changes are not registered
1 parent 14df369 commit f457695

File tree

2 files changed

+186
-45
lines changed

2 files changed

+186
-45
lines changed

lib/extensions/postgres_cdc_rls/replication_poller.ex

Lines changed: 48 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
defmodule Extensions.PostgresCdcRls.ReplicationPoller do
22
@moduledoc """
3-
Polls the write ahead log, applies row level sucurity policies for each subscriber
3+
Polls the write ahead log, applies row level security policies for each subscriber
44
and broadcast records to the `MessageDispatcher`.
55
"""
66

@@ -46,6 +46,7 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
4646
publication: extension["publication"],
4747
retry_ref: nil,
4848
retry_count: 0,
49+
empty_count: 0,
4950
slot_name: extension["slot_name"] <> slot_name_suffix(),
5051
tenant_id: tenant_id,
5152
rate_counter_args: rate_counter_args,
@@ -64,6 +65,7 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
6465
{:noreply, state, {:continue, :prepare}}
6566
end
6667

68+
@impl true
6769
def handle_continue(:prepare, state) do
6870
{:noreply, prepare_replication(state)}
6971
end
@@ -72,46 +74,42 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
7274
def handle_info(
7375
:poll,
7476
%{
75-
backoff: backoff,
76-
poll_interval_ms: poll_interval_ms,
77-
poll_ref: poll_ref,
78-
publication: publication,
79-
retry_ref: retry_ref,
80-
retry_count: retry_count,
77+
conn: conn,
8178
slot_name: slot_name,
82-
max_record_bytes: max_record_bytes,
79+
publication: publication,
8380
max_changes: max_changes,
84-
conn: conn,
81+
max_record_bytes: max_record_bytes,
8582
tenant_id: tenant_id,
8683
subscribers_nodes_table: subscribers_nodes_table,
8784
rate_counter_args: rate_counter_args
8885
} = state
8986
) do
90-
cancel_timer(poll_ref)
91-
cancel_timer(retry_ref)
87+
cancel_timer(state.poll_ref)
88+
cancel_timer(state.retry_ref)
89+
90+
{time, list_changes} =
91+
:timer.tc(Replications, :list_changes, [conn, slot_name, publication, max_changes, max_record_bytes])
9292

93-
args = [conn, slot_name, publication, max_changes, max_record_bytes]
94-
{time, list_changes} = :timer.tc(Replications, :list_changes, args)
9593
record_list_changes_telemetry(time, tenant_id)
9694

97-
case handle_list_changes_result(list_changes, subscribers_nodes_table, tenant_id, rate_counter_args) do
98-
{:ok, :peek_empty} ->
99-
Backoff.reset(backoff)
100-
poll_ref = Process.send_after(self(), :poll, poll_interval_ms)
101-
{:noreply, %{state | backoff: backoff, poll_ref: poll_ref}}
95+
case dispatch_changes(list_changes, subscribers_nodes_table, tenant_id, rate_counter_args) do
96+
:peek_empty ->
97+
empty_count = state.empty_count + 1
98+
interval = adaptive_interval(state.poll_interval_ms, empty_count)
99+
poll_ref = Process.send_after(self(), :poll, interval)
102100

103-
{:ok, row_count} ->
104-
Backoff.reset(backoff)
101+
{:noreply, %{state | backoff: Backoff.reset(state.backoff), poll_ref: poll_ref, empty_count: empty_count}}
105102

106-
pool_ref =
103+
{:ok, row_count} ->
104+
poll_ref =
107105
if row_count > 0 do
108106
send(self(), :poll)
109107
nil
110108
else
111-
Process.send_after(self(), :poll, poll_interval_ms)
109+
Process.send_after(self(), :poll, state.poll_interval_ms)
112110
end
113111

114-
{:noreply, %{state | backoff: backoff, poll_ref: pool_ref}}
112+
{:noreply, %{state | backoff: Backoff.reset(state.backoff), poll_ref: poll_ref, empty_count: 0}}
115113

116114
{:error, %Postgrex.Error{postgres: %{code: :object_in_use, message: msg}}} ->
117115
log_error("ReplicationSlotBeingUsed", msg)
@@ -122,26 +120,19 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
122120

123121
Logger.warning("Database PID #{db_pid} found in pg_stat_activity with state_change diff of #{diff}")
124122

125-
if retry_count > 3 do
123+
if state.retry_count > 3 do
126124
case Replications.terminate_backend(conn, slot_name) do
127125
{:ok, :terminated} -> Logger.warning("Replication slot in use - terminating")
128126
{:error, :slot_not_found} -> Logger.warning("Replication slot not found")
129127
{:error, error} -> Logger.warning("Error terminating backend: #{inspect(error)}")
130128
end
131129
end
132130

133-
{timeout, backoff} = Backoff.backoff(backoff)
134-
retry_ref = Process.send_after(self(), :retry, timeout)
135-
136-
{:noreply, %{state | backoff: backoff, retry_ref: retry_ref, retry_count: retry_count + 1}}
131+
{:noreply, schedule_retry(state)}
137132

138133
{:error, reason} ->
139134
log_error("PoolingReplicationError", reason)
140-
141-
{timeout, backoff} = Backoff.backoff(backoff)
142-
retry_ref = Process.send_after(self(), :retry, timeout)
143-
144-
{:noreply, %{state | backoff: backoff, retry_ref: retry_ref, retry_count: retry_count + 1}}
135+
{:noreply, schedule_retry(state)}
145136
end
146137
end
147138

@@ -158,22 +149,34 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
158149
end
159150
end
160151

152+
@max_adaptive_increase 500
153+
@empty_polls_per_step 10
154+
@step_increase 100
155+
156+
def adaptive_interval(base_interval, empty_count) do
157+
increase = div(empty_count, @empty_polls_per_step) * @step_increase
158+
min(base_interval + increase, base_interval + @max_adaptive_increase)
159+
end
160+
161161
defp convert_errors([_ | _] = errors), do: errors
162162

163163
defp convert_errors(_), do: nil
164164

165-
defp prepare_replication(%{backoff: backoff, conn: conn, slot_name: slot_name, retry_count: retry_count} = state) do
166-
case Replications.prepare_replication(conn, slot_name) do
165+
defp schedule_retry(state) do
166+
{timeout, backoff} = Backoff.backoff(state.backoff)
167+
retry_ref = Process.send_after(self(), :retry, timeout)
168+
%{state | backoff: backoff, retry_ref: retry_ref, retry_count: state.retry_count + 1}
169+
end
170+
171+
defp prepare_replication(state) do
172+
case Replications.prepare_replication(state.conn, state.slot_name) do
167173
{:ok, _} ->
168174
send(self(), :poll)
169175
state
170176

171177
{:error, error} ->
172178
log_error("PoolingReplicationPreparationError", error)
173-
174-
{timeout, backoff} = Backoff.backoff(backoff)
175-
retry_ref = Process.send_after(self(), :retry, timeout)
176-
%{state | backoff: backoff, retry_ref: retry_ref, retry_count: retry_count + 1}
179+
schedule_retry(state)
177180
end
178181
end
179182

@@ -185,15 +188,15 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
185188
)
186189
end
187190

188-
defp handle_list_changes_result(
191+
defp dispatch_changes(
189192
{:ok, %Postgrex.Result{num_rows: 1, rows: [[nil, nil, nil, _, _, _, nil, [], ["peek_empty"]]]}},
190193
_subscribers_nodes_table,
191194
_tenant_id,
192195
_rate_counter_args
193196
),
194-
do: {:ok, :peek_empty}
197+
do: :peek_empty
195198

196-
defp handle_list_changes_result(
199+
defp dispatch_changes(
197200
{:ok,
198201
%Postgrex.Result{
199202
columns: columns,
@@ -249,8 +252,8 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
249252
{:ok, rows_count}
250253
end
251254

252-
defp handle_list_changes_result({:ok, _}, _, _, _), do: {:ok, 0}
253-
defp handle_list_changes_result({:error, reason}, _, _, _), do: {:error, reason}
255+
defp dispatch_changes({:ok, _}, _, _, _), do: {:ok, 0}
256+
defp dispatch_changes({:error, reason}, _, _, _), do: {:error, reason}
254257

255258
defp collect_subscription_nodes(subscribers_nodes_table, subscription_ids) do
256259
Enum.reduce_while(subscription_ids, {:ok, %{}}, fn subscription_id, {:ok, acc} ->
@@ -266,7 +269,7 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
266269
end
267270
end)
268271
rescue
269-
_ -> {:error, :node_not_found}
272+
ArgumentError -> {:error, :node_not_found}
270273
end
271274

272275
def generate_record([

test/realtime/extensions/cdc_rls/replication_poller_test.exs

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -575,6 +575,144 @@ defmodule Realtime.Extensions.PostgresCdcRls.ReplicationPollerTest do
575575
end
576576
end
577577

578+
describe "poll adaptive interval" do
579+
setup do
580+
tenant = Containers.checkout_tenant(run_migrations: true)
581+
582+
subscribers_pids_table = :ets.new(:"#{__MODULE__}.adaptive_pids", [:public, :bag])
583+
subscribers_nodes_table = :ets.new(:"#{__MODULE__}.adaptive_nodes", [:public, :set])
584+
585+
args =
586+
hd(tenant.extensions).settings
587+
|> Map.put("id", tenant.external_id)
588+
|> Map.put("subscribers_pids_table", subscribers_pids_table)
589+
|> Map.put("subscribers_nodes_table", subscribers_nodes_table)
590+
591+
%{args: args}
592+
end
593+
594+
test "peek_empty polls slow down over time", %{args: args} do
595+
test_pid = self()
596+
597+
peek_empty_result =
598+
{:ok, %Postgrex.Result{num_rows: 1, rows: [[nil, nil, nil, nil, nil, nil, nil, [], ["peek_empty"]]]}}
599+
600+
stub(Replications, :list_changes, fn _, _, _, _, _ ->
601+
send(test_pid, {:polled_at, System.monotonic_time(:millisecond)})
602+
peek_empty_result
603+
end)
604+
605+
reject(&TenantBroadcaster.pubsub_direct_broadcast/6)
606+
reject(&TenantBroadcaster.pubsub_broadcast/5)
607+
608+
start_link_supervised!({Poller, args})
609+
610+
# Collect enough polls to cross the 10-poll threshold where interval increases
611+
timestamps =
612+
for _ <- 1..15 do
613+
assert_receive {:polled_at, ts}, 2_000
614+
ts
615+
end
616+
617+
early_gaps =
618+
timestamps
619+
|> Enum.take(5)
620+
|> Enum.chunk_every(2, 1, :discard)
621+
|> Enum.map(fn [a, b] -> b - a end)
622+
623+
late_gaps =
624+
timestamps
625+
|> Enum.drop(10)
626+
|> Enum.chunk_every(2, 1, :discard)
627+
|> Enum.map(fn [a, b] -> b - a end)
628+
629+
avg_early = Enum.sum(early_gaps) / length(early_gaps)
630+
avg_late = Enum.sum(late_gaps) / length(late_gaps)
631+
632+
# After 10+ peek_empty polls, interval should have increased by at least 100ms
633+
assert avg_late > avg_early,
634+
"Expected later polls to be slower (#{avg_late}ms) than early polls (#{avg_early}ms)"
635+
end
636+
637+
test "empty results (RLS filtered) poll at base interval without slowing down", %{args: args} do
638+
test_pid = self()
639+
empty_result = {:ok, %Postgrex.Result{rows: [], num_rows: 0}}
640+
641+
stub(Replications, :list_changes, fn _, _, _, _, _ ->
642+
send(test_pid, {:polled_at, System.monotonic_time(:millisecond)})
643+
empty_result
644+
end)
645+
646+
reject(&TenantBroadcaster.pubsub_direct_broadcast/6)
647+
reject(&TenantBroadcaster.pubsub_broadcast/5)
648+
649+
start_link_supervised!({Poller, args})
650+
651+
timestamps =
652+
for _ <- 1..4 do
653+
assert_receive {:polled_at, ts}, 1_000
654+
ts
655+
end
656+
657+
gaps = timestamps |> Enum.chunk_every(2, 1, :discard) |> Enum.map(fn [a, b] -> b - a end)
658+
659+
# All gaps should be roughly the base interval (100ms), none growing
660+
for gap <- gaps do
661+
assert gap < 200, "Expected base interval polling (~100ms), got #{gap}ms"
662+
end
663+
end
664+
665+
test "rows with changes trigger immediate re-poll without delay", %{args: args} do
666+
test_pid = self()
667+
668+
results_with_changes =
669+
build_result([
670+
<<71, 36, 83, 212, 168, 9, 17, 240, 165, 186, 118, 202, 193, 157, 232, 187>>
671+
])
672+
673+
stub(Replications, :list_changes, fn _, _, _, _, _ ->
674+
send(test_pid, {:polled_at, System.monotonic_time(:millisecond)})
675+
results_with_changes
676+
end)
677+
678+
stub(TenantBroadcaster, :pubsub_broadcast, fn _, _, _, _, _ -> :ok end)
679+
680+
start_link_supervised!({Poller, args})
681+
682+
timestamps =
683+
for _ <- 1..3 do
684+
assert_receive {:polled_at, ts}, 1_000
685+
ts
686+
end
687+
688+
gaps = timestamps |> Enum.chunk_every(2, 1, :discard) |> Enum.map(fn [a, b] -> b - a end)
689+
690+
# With changes, send(self(), :poll) means near-immediate re-poll
691+
for gap <- gaps do
692+
assert gap < 50, "Expected immediate re-poll, got #{gap}ms delay"
693+
end
694+
end
695+
end
696+
697+
describe "adaptive_interval/2" do
698+
test "returns base interval when empty_count is below threshold" do
699+
assert Poller.adaptive_interval(100, 0) == 100
700+
assert Poller.adaptive_interval(100, 9) == 100
701+
end
702+
703+
test "increases by 100ms for every 10 empty polls" do
704+
assert Poller.adaptive_interval(100, 10) == 200
705+
assert Poller.adaptive_interval(100, 20) == 300
706+
assert Poller.adaptive_interval(100, 30) == 400
707+
end
708+
709+
test "caps at base + 500ms" do
710+
assert Poller.adaptive_interval(100, 50) == 600
711+
assert Poller.adaptive_interval(100, 100) == 600
712+
assert Poller.adaptive_interval(100, 1000) == 600
713+
end
714+
end
715+
578716
describe "slot_name_suffix/0" do
579717
setup do
580718
slot_name_suffix = Application.get_env(:realtime, :slot_name_suffix)

0 commit comments

Comments
 (0)