From 2d225d526c043aa96b7ca1706f55444ee569f0ea Mon Sep 17 00:00:00 2001 From: Eduardo Gurgel Pinho Date: Tue, 7 Oct 2025 13:38:16 +1300 Subject: [PATCH 1/4] fix: limit db events --- .github/workflows/tests.yml | 2 + .../postgres_cdc_rls/replication_poller.ex | 40 +++++++--- lib/realtime/tenants.ex | 25 ++++++ mix.exs | 2 +- .../extensions/cdc_rls/cdc_rls_test.exs | 80 ++++++++++++++++++- 5 files changed, 136 insertions(+), 13 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index c9c2a73fa..5d3f686dd 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -41,6 +41,8 @@ jobs: restore-keys: | ${{ runner.os }}-mix- + - name: Pull postgres image quietly in background (used by test/support/containers.ex) + run: docker pull supabase/postgres:15.8.1.040 > /dev/null 2>&1 & - name: Install dependencies run: mix deps.get - name: Set up Postgres diff --git a/lib/extensions/postgres_cdc_rls/replication_poller.ex b/lib/extensions/postgres_cdc_rls/replication_poller.ex index 85466ebe9..20aedfc2e 100644 --- a/lib/extensions/postgres_cdc_rls/replication_poller.ex +++ b/lib/extensions/postgres_cdc_rls/replication_poller.ex @@ -18,6 +18,8 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do alias Realtime.Adapters.Changes.NewRecord alias Realtime.Adapters.Changes.UpdatedRecord alias Realtime.Database + alias Realtime.RateCounter + alias Realtime.Tenants def start_link(opts), do: GenServer.start_link(__MODULE__, opts) @@ -26,6 +28,13 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do tenant_id = args["id"] Logger.metadata(external_id: tenant_id, project: tenant_id) + %Realtime.Api.Tenant{max_events_per_second: max_events_per_second} = + Tenants.Cache.get_tenant_by_external_id(tenant_id) + + rate_counter_args = Tenants.db_events_per_second_rate(tenant_id, max_events_per_second) + + RateCounter.new(rate_counter_args) + state = %{ backoff: Backoff.new(backoff_min: 100, backoff_max: 5_000, backoff_type: :rand_exp), 
db_host: args["db_host"], @@ -41,7 +50,8 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do retry_ref: nil, retry_count: 0, slot_name: args["slot_name"] <> slot_name_suffix(), - tenant_id: tenant_id + tenant_id: tenant_id, + rate_counter_args: rate_counter_args } {:ok, _} = Registry.register(__MODULE__.Registry, tenant_id, %{}) @@ -74,7 +84,8 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do max_record_bytes: max_record_bytes, max_changes: max_changes, conn: conn, - tenant_id: tenant_id + tenant_id: tenant_id, + rate_counter_args: rate_counter_args } = state ) do cancel_timer(poll_ref) @@ -84,7 +95,7 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do {time, list_changes} = :timer.tc(Replications, :list_changes, args) record_list_changes_telemetry(time, tenant_id) - case handle_list_changes_result(list_changes, tenant_id) do + case handle_list_changes_result(list_changes, tenant_id, rate_counter_args) do {:ok, row_count} -> Backoff.reset(backoff) @@ -177,20 +188,29 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do rows: [_ | _] = rows, num_rows: rows_count }}, - tenant_id + tenant_id, + rate_counter_args ) do - for row <- rows, - change <- columns |> Enum.zip(row) |> generate_record() |> List.wrap() do - topic = "realtime:postgres:" <> tenant_id + case RateCounter.get(rate_counter_args) do + {:ok, %{limit: %{triggered: true}}} -> + :ok - RealtimeWeb.TenantBroadcaster.pubsub_broadcast(tenant_id, topic, change, MessageDispatcher, :postgres_changes) + _ -> + Realtime.GenCounter.add(rate_counter_args.id, rows_count) + + for row <- rows, + change <- columns |> Enum.zip(row) |> generate_record() |> List.wrap() do + topic = "realtime:postgres:" <> tenant_id + + RealtimeWeb.TenantBroadcaster.pubsub_broadcast(tenant_id, topic, change, MessageDispatcher, :postgres_changes) + end end {:ok, rows_count} end - defp handle_list_changes_result({:ok, _}, _), do: {:ok, 0} - defp handle_list_changes_result({:error, reason}, _), do: {:error, 
reason} + defp handle_list_changes_result({:ok, _}, _, _), do: {:ok, 0} + defp handle_list_changes_result({:error, reason}, _, _), do: {:error, reason} def generate_record([ {"wal", diff --git a/lib/realtime/tenants.ex b/lib/realtime/tenants.ex index efd2397ac..d792c1ca5 100644 --- a/lib/realtime/tenants.ex +++ b/lib/realtime/tenants.ex @@ -247,6 +247,31 @@ defmodule Realtime.Tenants do %RateCounter.Args{id: db_events_per_second_key(tenant_id), opts: opts} end + @doc "RateCounter arguments for counting database events per second with a limit." + @spec db_events_per_second_rate(String.t(), non_neg_integer) :: RateCounter.Args.t() + def db_events_per_second_rate(tenant_id, max_events_per_second) when is_binary(tenant_id) do + opts = [ + telemetry: %{ + event_name: [:channel, :db_events], + measurements: %{}, + metadata: %{tenant: tenant_id} + }, + limit: [ + value: max_events_per_second, + measurement: :avg, + log: true, + log_fn: fn -> + Logger.error("MessagePerSecondRateLimitReached: Too many postgres changes messages per second", + external_id: tenant_id, + project: tenant_id + ) + end + ] + ] + + %RateCounter.Args{id: db_events_per_second_key(tenant_id), opts: opts} + end + @doc """ The GenCounter key to use when counting events for RealtimeChannel events. 
iex> Realtime.Tenants.db_events_per_second_key("tenant_id") diff --git a/mix.exs b/mix.exs index d0e42bf11..65245c31d 100644 --- a/mix.exs +++ b/mix.exs @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do def project do [ app: :realtime, - version: "2.53.0", + version: "2.53.1", elixir: "~> 1.17.3", elixirc_paths: elixirc_paths(Mix.env()), start_permanent: Mix.env() == :prod, diff --git a/test/realtime/extensions/cdc_rls/cdc_rls_test.exs b/test/realtime/extensions/cdc_rls/cdc_rls_test.exs index d12c0ba73..1aa895aa2 100644 --- a/test/realtime/extensions/cdc_rls/cdc_rls_test.exs +++ b/test/realtime/extensions/cdc_rls/cdc_rls_test.exs @@ -324,8 +324,11 @@ defmodule Realtime.Extensions.CdcRlsTest do rate = Realtime.Tenants.db_events_per_second_rate(tenant) - assert {:ok, %RateCounter{id: {:channel, :db_events, "dev_tenant"}, bucket: bucket}} = RateCounter.get(rate) - assert 1 in bucket + assert {:ok, %RateCounter{id: {:channel, :db_events, "dev_tenant"}, bucket: bucket}} = + RateCounter.get(rate) + + # 1 from ReplicationPoller and 1 from MessageDispatcher + assert Enum.sum(bucket) == 2 assert_receive { :telemetry, @@ -335,6 +338,79 @@ defmodule Realtime.Extensions.CdcRlsTest do } end + test "rate limit works", %{tenant: tenant, conn: conn} do + on_exit(fn -> PostgresCdcRls.handle_stop(tenant.external_id, 10_000) end) + + %Tenant{extensions: extensions, external_id: external_id} = tenant + postgres_extension = PostgresCdc.filter_settings("postgres_cdc_rls", extensions) + args = Map.put(postgres_extension, "id", external_id) + + pg_change_params = [ + %{ + id: UUID.uuid1(), + params: %{"event" => "*", "schema" => "public"}, + channel_pid: self(), + claims: %{ + "exp" => System.system_time(:second) + 100_000, + "iat" => 0, + "ref" => "127.0.0.1", + "role" => "anon" + } + } + ] + + ids = + Enum.map(pg_change_params, fn %{id: id, params: params} -> + {UUID.string_to_binary!(id), :erlang.phash2(params)} + end) + + topic = "realtime:test" + serializer = 
Phoenix.Socket.V1.JSONSerializer + + subscription_metadata = {:subscriber_fastlane, self(), serializer, ids, topic, external_id, true} + metadata = [metadata: subscription_metadata] + :ok = PostgresCdc.subscribe(PostgresCdcRls, pg_change_params, external_id, metadata) + + # First time it will return nil + PostgresCdcRls.handle_connect(args) + # Wait for it to start + Process.sleep(3000) + {:ok, response} = PostgresCdcRls.handle_connect(args) + + # Now subscribe to the Postgres Changes + {:ok, _} = PostgresCdcRls.handle_after_connect(response, postgres_extension, pg_change_params) + assert %Postgrex.Result{rows: [[1]]} = Postgrex.query!(conn, "select count(*) from realtime.subscription", []) + + log = + capture_log(fn -> + # artificially increment the counter to reach the limit + tenant.external_id + |> Realtime.Tenants.db_events_per_second_key() + |> Realtime.GenCounter.add(100_000_000) + + # Wait for RateCounter to update + Process.sleep(1500) + end) + + assert log =~ "MessagePerSecondRateLimitReached: Too many postgres changes messages per second" + + # Insert a record + %{rows: [[_id]]} = Postgrex.query!(conn, "insert into test (details) values ('test') returning id", []) + + refute_receive {:socket_push, :text, _}, 5000 + + # Wait for RateCounter to update + Process.sleep(2000) + + rate = Realtime.Tenants.db_events_per_second_rate(tenant) + + assert {:ok, %RateCounter{id: {:channel, :db_events, "dev_tenant"}, bucket: bucket, limit: %{triggered: true}}} = + RateCounter.get(rate) + + # Nothing has changed + assert Enum.sum(bucket) == 100_000_000 + end + @aux_mod (quote do defmodule Subscriber do # Start CDC remotely From d08f0e36657f878f55f60c7a0c32b87e6eeacdf5 Mon Sep 17 00:00:00 2001 From: Eduardo Gurgel Pinho Date: Tue, 7 Oct 2025 15:36:12 +1300 Subject: [PATCH 2/4] fix: integration tst --- test/integration/rt_channel_test.exs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/integration/rt_channel_test.exs 
b/test/integration/rt_channel_test.exs index 23b1a3a7f..20fc33515 100644 --- a/test/integration/rt_channel_test.exs +++ b/test/integration/rt_channel_test.exs @@ -2240,7 +2240,8 @@ defmodule Realtime.Integration.RtChannelTest do # 0 events as no broadcast used assert 2 = get_count([:realtime, :rate_counter, :channel, :joins], external_id) assert 2 = get_count([:realtime, :rate_counter, :channel, :presence_events], external_id) - assert 10 = get_count([:realtime, :rate_counter, :channel, :db_events], external_id) + # 5 + 5 + 5 (5 for each websocket and 5 while publishing) + assert 15 = get_count([:realtime, :rate_counter, :channel, :db_events], external_id) assert 0 = get_count([:realtime, :rate_counter, :channel, :events], external_id) end From fc63f75915649b892d172cebb60f0440c1dd04c9 Mon Sep 17 00:00:00 2001 From: Eduardo Gurgel Pinho Date: Wed, 8 Oct 2025 11:05:53 +1300 Subject: [PATCH 3/4] fix: hardcode db events per second with a high rate for now --- lib/extensions/postgres_cdc_rls/replication_poller.ex | 5 ++--- test/integration/rt_channel_test.exs | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/lib/extensions/postgres_cdc_rls/replication_poller.ex b/lib/extensions/postgres_cdc_rls/replication_poller.ex index 20aedfc2e..34697572c 100644 --- a/lib/extensions/postgres_cdc_rls/replication_poller.ex +++ b/lib/extensions/postgres_cdc_rls/replication_poller.ex @@ -28,10 +28,9 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do tenant_id = args["id"] Logger.metadata(external_id: tenant_id, project: tenant_id) - %Realtime.Api.Tenant{max_events_per_second: max_events_per_second} = - Tenants.Cache.get_tenant_by_external_id(tenant_id) + %Realtime.Api.Tenant{} = Tenants.Cache.get_tenant_by_external_id(tenant_id) - rate_counter_args = Tenants.db_events_per_second_rate(tenant_id, max_events_per_second) + rate_counter_args = Tenants.db_events_per_second_rate(tenant_id, 4000) RateCounter.new(rate_counter_args) diff --git 
a/test/integration/rt_channel_test.exs b/test/integration/rt_channel_test.exs index 20fc33515..f45b4aff5 100644 --- a/test/integration/rt_channel_test.exs +++ b/test/integration/rt_channel_test.exs @@ -1159,7 +1159,7 @@ defmodule Realtime.Integration.RtChannelTest do } end) - assert log =~ "ChannelShutdown: Token has expired 1000 seconds ago" + assert log =~ "ChannelShutdown: Token has expired" end test "ChannelShutdown include sub if available in jwt claims", %{tenant: tenant, topic: topic} do From 562de5d97969ee72cbc3d82ce1b11b42d9bffadd Mon Sep 17 00:00:00 2001 From: Eduardo Gurgel Pinho Date: Wed, 8 Oct 2025 11:22:16 +1300 Subject: [PATCH 4/4] chore: ensure that ratecounter is stopped after tests run --- test/realtime/extensions/cdc_rls/cdc_rls_test.exs | 1 + 1 file changed, 1 insertion(+) diff --git a/test/realtime/extensions/cdc_rls/cdc_rls_test.exs b/test/realtime/extensions/cdc_rls/cdc_rls_test.exs index 1aa895aa2..0c0df0e19 100644 --- a/test/realtime/extensions/cdc_rls/cdc_rls_test.exs +++ b/test/realtime/extensions/cdc_rls/cdc_rls_test.exs @@ -235,6 +235,7 @@ defmodule Realtime.Extensions.CdcRlsTest do end) RateCounter.stop(tenant.external_id) + on_exit(fn -> RateCounter.stop(tenant.external_id) end) on_exit(fn -> :telemetry.detach(__MODULE__) end)