Skip to content

Commit 058be58

Browse files
authored
fix: limit db events (#1562)
1 parent ecac071 commit 058be58

File tree

6 files changed

+139
-15
lines changed

6 files changed

+139
-15
lines changed

.github/workflows/tests.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ jobs:
4141
restore-keys: |
4242
${{ runner.os }}-mix-
4343
44+
- name: Pull postgres image quietly in background (used by test/support/containers.ex)
45+
run: docker pull supabase/postgres:15.8.1.040 > /dev/null 2>&1 &
4446
- name: Install dependencies
4547
run: mix deps.get
4648
- name: Set up Postgres

lib/extensions/postgres_cdc_rls/replication_poller.ex

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
1818
alias Realtime.Adapters.Changes.NewRecord
1919
alias Realtime.Adapters.Changes.UpdatedRecord
2020
alias Realtime.Database
21+
alias Realtime.RateCounter
22+
alias Realtime.Tenants
2123

2224
def start_link(opts), do: GenServer.start_link(__MODULE__, opts)
2325

@@ -26,6 +28,12 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
2628
tenant_id = args["id"]
2729
Logger.metadata(external_id: tenant_id, project: tenant_id)
2830

31+
%Realtime.Api.Tenant{} = Tenants.Cache.get_tenant_by_external_id(tenant_id)
32+
33+
rate_counter_args = Tenants.db_events_per_second_rate(tenant_id, 4000)
34+
35+
RateCounter.new(rate_counter_args)
36+
2937
state = %{
3038
backoff: Backoff.new(backoff_min: 100, backoff_max: 5_000, backoff_type: :rand_exp),
3139
db_host: args["db_host"],
@@ -41,7 +49,8 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
4149
retry_ref: nil,
4250
retry_count: 0,
4351
slot_name: args["slot_name"] <> slot_name_suffix(),
44-
tenant_id: tenant_id
52+
tenant_id: tenant_id,
53+
rate_counter_args: rate_counter_args
4554
}
4655

4756
{:ok, _} = Registry.register(__MODULE__.Registry, tenant_id, %{})
@@ -74,7 +83,8 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
7483
max_record_bytes: max_record_bytes,
7584
max_changes: max_changes,
7685
conn: conn,
77-
tenant_id: tenant_id
86+
tenant_id: tenant_id,
87+
rate_counter_args: rate_counter_args
7888
} = state
7989
) do
8090
cancel_timer(poll_ref)
@@ -84,7 +94,7 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
8494
{time, list_changes} = :timer.tc(Replications, :list_changes, args)
8595
record_list_changes_telemetry(time, tenant_id)
8696

87-
case handle_list_changes_result(list_changes, tenant_id) do
97+
case handle_list_changes_result(list_changes, tenant_id, rate_counter_args) do
8898
{:ok, row_count} ->
8999
Backoff.reset(backoff)
90100

@@ -177,20 +187,29 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
177187
rows: [_ | _] = rows,
178188
num_rows: rows_count
179189
}},
180-
tenant_id
190+
tenant_id,
191+
rate_counter_args
181192
) do
182-
for row <- rows,
183-
change <- columns |> Enum.zip(row) |> generate_record() |> List.wrap() do
184-
topic = "realtime:postgres:" <> tenant_id
193+
case RateCounter.get(rate_counter_args) do
194+
{:ok, %{limit: %{triggered: true}}} ->
195+
:ok
185196

186-
RealtimeWeb.TenantBroadcaster.pubsub_broadcast(tenant_id, topic, change, MessageDispatcher, :postgres_changes)
197+
_ ->
198+
Realtime.GenCounter.add(rate_counter_args.id, rows_count)
199+
200+
for row <- rows,
201+
change <- columns |> Enum.zip(row) |> generate_record() |> List.wrap() do
202+
topic = "realtime:postgres:" <> tenant_id
203+
204+
RealtimeWeb.TenantBroadcaster.pubsub_broadcast(tenant_id, topic, change, MessageDispatcher, :postgres_changes)
205+
end
187206
end
188207

189208
{:ok, rows_count}
190209
end
191210

192-
defp handle_list_changes_result({:ok, _}, _), do: {:ok, 0}
193-
defp handle_list_changes_result({:error, reason}, _), do: {:error, reason}
211+
defp handle_list_changes_result({:ok, _}, _, _), do: {:ok, 0}
212+
defp handle_list_changes_result({:error, reason}, _, _), do: {:error, reason}
194213

195214
def generate_record([
196215
{"wal",

lib/realtime/tenants.ex

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,31 @@ defmodule Realtime.Tenants do
247247
%RateCounter.Args{id: db_events_per_second_key(tenant_id), opts: opts}
248248
end
249249

250+
@doc "RateCounter arguments for counting database events per second with a limit."
251+
@spec db_events_per_second_rate(String.t(), non_neg_integer) :: RateCounter.Args.t()
252+
def db_events_per_second_rate(tenant_id, max_events_per_second) when is_binary(tenant_id) do
253+
opts = [
254+
telemetry: %{
255+
event_name: [:channel, :db_events],
256+
measurements: %{},
257+
metadata: %{tenant: tenant_id}
258+
},
259+
limit: [
260+
value: max_events_per_second,
261+
measurement: :avg,
262+
log: true,
263+
log_fn: fn ->
264+
Logger.error("MessagePerSecondRateLimitReached: Too many postgres changes messages per second",
265+
external_id: tenant_id,
266+
project: tenant_id
267+
)
268+
end
269+
]
270+
]
271+
272+
%RateCounter.Args{id: db_events_per_second_key(tenant_id), opts: opts}
273+
end
274+
250275
@doc """
251276
The GenCounter key to use when counting events for RealtimeChannel events.
252277
iex> Realtime.Tenants.db_events_per_second_key("tenant_id")

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.53.0",
7+
version: "2.53.1",
88
elixir: "~> 1.17.3",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,

test/integration/rt_channel_test.exs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1159,7 +1159,7 @@ defmodule Realtime.Integration.RtChannelTest do
11591159
}
11601160
end)
11611161

1162-
assert log =~ "ChannelShutdown: Token has expired 1000 seconds ago"
1162+
assert log =~ "ChannelShutdown: Token has expired"
11631163
end
11641164

11651165
test "ChannelShutdown include sub if available in jwt claims", %{tenant: tenant, topic: topic} do
@@ -2240,7 +2240,8 @@ defmodule Realtime.Integration.RtChannelTest do
22402240
# 0 events as no broadcast used
22412241
assert 2 = get_count([:realtime, :rate_counter, :channel, :joins], external_id)
22422242
assert 2 = get_count([:realtime, :rate_counter, :channel, :presence_events], external_id)
2243-
assert 10 = get_count([:realtime, :rate_counter, :channel, :db_events], external_id)
2243+
# 5 + 5 + 5 (5 for each websocket and 5 while publishing)
2244+
assert 15 = get_count([:realtime, :rate_counter, :channel, :db_events], external_id)
22442245
assert 0 = get_count([:realtime, :rate_counter, :channel, :events], external_id)
22452246
end
22462247

test/realtime/extensions/cdc_rls/cdc_rls_test.exs

Lines changed: 79 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,7 @@ defmodule Realtime.Extensions.CdcRlsTest do
235235
end)
236236

237237
RateCounter.stop(tenant.external_id)
238+
on_exit(fn -> RateCounter.stop(tenant.external_id) end)
238239

239240
on_exit(fn -> :telemetry.detach(__MODULE__) end)
240241

@@ -324,8 +325,11 @@ defmodule Realtime.Extensions.CdcRlsTest do
324325

325326
rate = Realtime.Tenants.db_events_per_second_rate(tenant)
326327

327-
assert {:ok, %RateCounter{id: {:channel, :db_events, "dev_tenant"}, bucket: bucket}} = RateCounter.get(rate)
328-
assert 1 in bucket
328+
assert {:ok, %RateCounter{id: {:channel, :db_events, "dev_tenant"}, bucket: bucket}} =
329+
RateCounter.get(rate)
330+
331+
# 1 from ReplicationPoller and 1 from MessageDispatcher
332+
assert Enum.sum(bucket) == 2
329333

330334
assert_receive {
331335
:telemetry,
@@ -335,6 +339,79 @@ defmodule Realtime.Extensions.CdcRlsTest do
335339
}
336340
end
337341

342+
test "rate limit works", %{tenant: tenant, conn: conn} do
343+
on_exit(fn -> PostgresCdcRls.handle_stop(tenant.external_id, 10_000) end)
344+
345+
%Tenant{extensions: extensions, external_id: external_id} = tenant
346+
postgres_extension = PostgresCdc.filter_settings("postgres_cdc_rls", extensions)
347+
args = Map.put(postgres_extension, "id", external_id)
348+
349+
pg_change_params = [
350+
%{
351+
id: UUID.uuid1(),
352+
params: %{"event" => "*", "schema" => "public"},
353+
channel_pid: self(),
354+
claims: %{
355+
"exp" => System.system_time(:second) + 100_000,
356+
"iat" => 0,
357+
"ref" => "127.0.0.1",
358+
"role" => "anon"
359+
}
360+
}
361+
]
362+
363+
ids =
364+
Enum.map(pg_change_params, fn %{id: id, params: params} ->
365+
{UUID.string_to_binary!(id), :erlang.phash2(params)}
366+
end)
367+
368+
topic = "realtime:test"
369+
serializer = Phoenix.Socket.V1.JSONSerializer
370+
371+
subscription_metadata = {:subscriber_fastlane, self(), serializer, ids, topic, external_id, true}
372+
metadata = [metadata: subscription_metadata]
373+
:ok = PostgresCdc.subscribe(PostgresCdcRls, pg_change_params, external_id, metadata)
374+
375+
# First time it will return nil
376+
PostgresCdcRls.handle_connect(args)
377+
# Wait for it to start
378+
Process.sleep(3000)
379+
{:ok, response} = PostgresCdcRls.handle_connect(args)
380+
381+
# Now subscribe to the Postgres Changes
382+
{:ok, _} = PostgresCdcRls.handle_after_connect(response, postgres_extension, pg_change_params)
383+
assert %Postgrex.Result{rows: [[1]]} = Postgrex.query!(conn, "select count(*) from realtime.subscription", [])
384+
385+
log =
386+
capture_log(fn ->
387+
# artificially increment the counter to reach the limit
388+
tenant.external_id
389+
|> Realtime.Tenants.db_events_per_second_key()
390+
|> Realtime.GenCounter.add(100_000_000)
391+
392+
# Wait for RateCounter to update
393+
Process.sleep(1500)
394+
end)
395+
396+
assert log =~ "MessagePerSecondRateLimitReached: Too many postgres changes messages per second"
397+
398+
# Insert a record
399+
%{rows: [[_id]]} = Postgrex.query!(conn, "insert into test (details) values ('test') returning id", [])
400+
401+
refute_receive {:socket_push, :text, _}, 5000
402+
403+
# Wait for RateCounter to update
404+
Process.sleep(2000)
405+
406+
rate = Realtime.Tenants.db_events_per_second_rate(tenant)
407+
408+
assert {:ok, %RateCounter{id: {:channel, :db_events, "dev_tenant"}, bucket: bucket, limit: %{triggered: true}}} =
409+
RateCounter.get(rate)
410+
411+
# Nothing has changed
412+
assert Enum.sum(bucket) == 100_000_000
413+
end
414+
338415
@aux_mod (quote do
339416
defmodule Subscriber do
340417
# Start CDC remotely

0 commit comments

Comments
 (0)