2 changes: 2 additions & 0 deletions .github/workflows/tests.yml
@@ -41,6 +41,8 @@ jobs:
restore-keys: |
${{ runner.os }}-mix-

- name: Pull postgres image quietly in background (used by test/support/containers.ex)
run: docker pull supabase/postgres:15.8.1.040 > /dev/null 2>&1 &
- name: Install dependencies
run: mix deps.get
- name: Set up Postgres
39 changes: 29 additions & 10 deletions lib/extensions/postgres_cdc_rls/replication_poller.ex
@@ -18,6 +18,8 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
alias Realtime.Adapters.Changes.NewRecord
alias Realtime.Adapters.Changes.UpdatedRecord
alias Realtime.Database
alias Realtime.RateCounter
alias Realtime.Tenants

def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

@@ -26,6 +28,12 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
tenant_id = args["id"]
Logger.metadata(external_id: tenant_id, project: tenant_id)

%Realtime.Api.Tenant{} = Tenants.Cache.get_tenant_by_external_id(tenant_id)

rate_counter_args = Tenants.db_events_per_second_rate(tenant_id, 4000)

RateCounter.new(rate_counter_args)

state = %{
backoff: Backoff.new(backoff_min: 100, backoff_max: 5_000, backoff_type: :rand_exp),
db_host: args["db_host"],
@@ -41,7 +49,8 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
retry_ref: nil,
retry_count: 0,
slot_name: args["slot_name"] <> slot_name_suffix(),
tenant_id: tenant_id
tenant_id: tenant_id,
rate_counter_args: rate_counter_args
}

{:ok, _} = Registry.register(__MODULE__.Registry, tenant_id, %{})
@@ -74,7 +83,8 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
max_record_bytes: max_record_bytes,
max_changes: max_changes,
conn: conn,
tenant_id: tenant_id
tenant_id: tenant_id,
rate_counter_args: rate_counter_args
} = state
) do
cancel_timer(poll_ref)
@@ -84,7 +94,7 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
{time, list_changes} = :timer.tc(Replications, :list_changes, args)
record_list_changes_telemetry(time, tenant_id)

case handle_list_changes_result(list_changes, tenant_id) do
case handle_list_changes_result(list_changes, tenant_id, rate_counter_args) do
{:ok, row_count} ->
Backoff.reset(backoff)

@@ -177,20 +187,29 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
rows: [_ | _] = rows,
num_rows: rows_count
}},
tenant_id
tenant_id,
rate_counter_args
) do
for row <- rows,
change <- columns |> Enum.zip(row) |> generate_record() |> List.wrap() do
topic = "realtime:postgres:" <> tenant_id
case RateCounter.get(rate_counter_args) do
{:ok, %{limit: %{triggered: true}}} ->
:ok

RealtimeWeb.TenantBroadcaster.pubsub_broadcast(tenant_id, topic, change, MessageDispatcher, :postgres_changes)
_ ->
Realtime.GenCounter.add(rate_counter_args.id, rows_count)

for row <- rows,
change <- columns |> Enum.zip(row) |> generate_record() |> List.wrap() do
topic = "realtime:postgres:" <> tenant_id

RealtimeWeb.TenantBroadcaster.pubsub_broadcast(tenant_id, topic, change, MessageDispatcher, :postgres_changes)
end
end

{:ok, rows_count}
end

defp handle_list_changes_result({:ok, _}, _), do: {:ok, 0}
defp handle_list_changes_result({:error, reason}, _), do: {:error, reason}
defp handle_list_changes_result({:ok, _}, _, _), do: {:ok, 0}
defp handle_list_changes_result({:error, reason}, _, _), do: {:error, reason}

def generate_record([
{"wal",
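Taken together, the poller changes gate the WAL fan-out behind the tenant's rate counter: `init/1` builds limited `RateCounter.Args` and starts the counter, and `handle_list_changes_result/3` drops a whole batch once the limit has been triggered. A minimal sketch of that flow, assuming only the aliases the poller already declares (`Realtime.RateCounter`, `Realtime.Tenants`) and with the dispatcher module passed in rather than hard-coded:

```elixir
# Illustrative sketch only (not part of this diff): the shape of the new gate.
defmodule RateLimitedFanOut do
  alias Realtime.RateCounter
  alias Realtime.Tenants

  # Mirrors init/1: build the limited rate-counter args once and start the counter.
  def build_rate_counter(tenant_id) do
    rate_counter_args = Tenants.db_events_per_second_rate(tenant_id, 4_000)
    RateCounter.new(rate_counter_args)
    rate_counter_args
  end

  # Mirrors handle_list_changes_result/3: once the per-tenant limit is triggered,
  # skip the whole batch; otherwise count the rows and broadcast each change.
  def fan_out(rate_counter_args, tenant_id, changes, dispatcher) do
    case RateCounter.get(rate_counter_args) do
      {:ok, %{limit: %{triggered: true}}} ->
        :ok

      _ ->
        Realtime.GenCounter.add(rate_counter_args.id, length(changes))

        for change <- changes do
          topic = "realtime:postgres:" <> tenant_id

          RealtimeWeb.TenantBroadcaster.pubsub_broadcast(
            tenant_id,
            topic,
            change,
            dispatcher,
            :postgres_changes
          )
        end

        :ok
    end
  end
end
```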
25 changes: 25 additions & 0 deletions lib/realtime/tenants.ex
@@ -247,6 +247,31 @@ defmodule Realtime.Tenants do
%RateCounter.Args{id: db_events_per_second_key(tenant_id), opts: opts}
end

@doc "RateCounter arguments for counting database events per second with a limit."
@spec db_events_per_second_rate(String.t(), non_neg_integer) :: RateCounter.Args.t()
def db_events_per_second_rate(tenant_id, max_events_per_second) when is_binary(tenant_id) do
opts = [
telemetry: %{
event_name: [:channel, :db_events],
measurements: %{},
metadata: %{tenant: tenant_id}
},
limit: [
value: max_events_per_second,
measurement: :avg,
log: true,
log_fn: fn ->
Logger.error("MessagePerSecondRateLimitReached: Too many postgres changes messages per second",
external_id: tenant_id,
project: tenant_id
)
end
]
]

%RateCounter.Args{id: db_events_per_second_key(tenant_id), opts: opts}
end

@doc """
The GenCounter key to use when counting events for RealtimeChannel events.
iex> Realtime.Tenants.db_events_per_second_key("tenant_id")
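For reference, a hedged usage sketch of the new 2-arity helper; the fully qualified `Realtime.RateCounter` names are assumed from the aliases used elsewhere in this PR, and the `{:channel, :db_events, tenant_id}` id shape is the one asserted in the CDC tests below:

```elixir
# Sketch only; run inside the application (e.g. iex -S mix).
args = Realtime.Tenants.db_events_per_second_rate("dev_tenant", 4_000)

# Same counter key as the unlimited variant, now carrying limit options.
%Realtime.RateCounter.Args{id: {:channel, :db_events, "dev_tenant"}} = args

# A counter started from these args is expected to set limit.triggered and call
# the configured log_fn once the averaged rate exceeds 4_000 events per second.
Realtime.RateCounter.new(args)
```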
2 changes: 1 addition & 1 deletion mix.exs
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.53.0",
version: "2.53.1",
elixir: "~> 1.17.3",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
5 changes: 3 additions & 2 deletions test/integration/rt_channel_test.exs
@@ -1159,7 +1159,7 @@ defmodule Realtime.Integration.RtChannelTest do
}
end)

assert log =~ "ChannelShutdown: Token has expired 1000 seconds ago"
assert log =~ "ChannelShutdown: Token has expired"
end

test "ChannelShutdown include sub if available in jwt claims", %{tenant: tenant, topic: topic} do
@@ -2240,7 +2240,8 @@
# 0 events as no broadcast used
assert 2 = get_count([:realtime, :rate_counter, :channel, :joins], external_id)
assert 2 = get_count([:realtime, :rate_counter, :channel, :presence_events], external_id)
assert 10 = get_count([:realtime, :rate_counter, :channel, :db_events], external_id)
# 5 + 5 + 5 (5 for each websocket and 5 while publishing)
assert 15 = get_count([:realtime, :rate_counter, :channel, :db_events], external_id)
assert 0 = get_count([:realtime, :rate_counter, :channel, :events], external_id)
end

81 changes: 79 additions & 2 deletions test/realtime/extensions/cdc_rls/cdc_rls_test.exs
@@ -235,6 +235,7 @@ defmodule Realtime.Extensions.CdcRlsTest do
end)

RateCounter.stop(tenant.external_id)
on_exit(fn -> RateCounter.stop(tenant.external_id) end)

on_exit(fn -> :telemetry.detach(__MODULE__) end)

@@ -324,8 +325,11 @@

rate = Realtime.Tenants.db_events_per_second_rate(tenant)

assert {:ok, %RateCounter{id: {:channel, :db_events, "dev_tenant"}, bucket: bucket}} = RateCounter.get(rate)
assert 1 in bucket
assert {:ok, %RateCounter{id: {:channel, :db_events, "dev_tenant"}, bucket: bucket}} =
RateCounter.get(rate)

# 1 from ReplicationPoller and 1 from MessageDispatcher
assert Enum.sum(bucket) == 2

assert_receive {
:telemetry,
@@ -335,6 +339,79 @@
}
end

test "rate limit works", %{tenant: tenant, conn: conn} do
on_exit(fn -> PostgresCdcRls.handle_stop(tenant.external_id, 10_000) end)

%Tenant{extensions: extensions, external_id: external_id} = tenant
postgres_extension = PostgresCdc.filter_settings("postgres_cdc_rls", extensions)
args = Map.put(postgres_extension, "id", external_id)

pg_change_params = [
%{
id: UUID.uuid1(),
params: %{"event" => "*", "schema" => "public"},
channel_pid: self(),
claims: %{
"exp" => System.system_time(:second) + 100_000,
"iat" => 0,
"ref" => "127.0.0.1",
"role" => "anon"
}
}
]

ids =
Enum.map(pg_change_params, fn %{id: id, params: params} ->
{UUID.string_to_binary!(id), :erlang.phash2(params)}
end)

topic = "realtime:test"
serializer = Phoenix.Socket.V1.JSONSerializer

subscription_metadata = {:subscriber_fastlane, self(), serializer, ids, topic, external_id, true}
metadata = [metadata: subscription_metadata]
:ok = PostgresCdc.subscribe(PostgresCdcRls, pg_change_params, external_id, metadata)

# First time it will return nil
PostgresCdcRls.handle_connect(args)
# Wait for it to start
Process.sleep(3000)
{:ok, response} = PostgresCdcRls.handle_connect(args)

# Now subscribe to the Postgres Changes
{:ok, _} = PostgresCdcRls.handle_after_connect(response, postgres_extension, pg_change_params)
assert %Postgrex.Result{rows: [[1]]} = Postgrex.query!(conn, "select count(*) from realtime.subscription", [])

log =
capture_log(fn ->
# artificially increment the counter to reach the limit
tenant.external_id
|> Realtime.Tenants.db_events_per_second_key()
|> Realtime.GenCounter.add(100_000_000)

# Wait for RateCounter to update
Process.sleep(1500)
end)

assert log =~ "MessagePerSecondRateLimitReached: Too many postgres changes messages per second"

# Insert a record
%{rows: [[_id]]} = Postgrex.query!(conn, "insert into test (details) values ('test') returning id", [])

refute_receive {:socket_push, :text, _}, 5000

# Wait for RateCounter to update
Process.sleep(2000)

rate = Realtime.Tenants.db_events_per_second_rate(tenant)

assert {:ok, %RateCounter{id: {:channel, :db_events, "dev_tenant"}, bucket: bucket, limit: %{triggered: true}}} =
RateCounter.get(rate)

# Nothing has changed
assert Enum.sum(bucket) == 100_000_000
end

@aux_mod (quote do
defmodule Subscriber do
# Start CDC remotely
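The new "rate limit works" test trips the limit without generating real load by inflating the GenCounter behind the tenant's key. The same probe should work from a console, assuming the tenant (here "dev_tenant") already has its db_events RateCounter running and that the fully qualified module names match the aliases used in the test:

```elixir
# Sketch of the forced-limit probe used in the test above; values mirror the test.
key = Realtime.Tenants.db_events_per_second_key("dev_tenant")

# Push the counter far past the 4_000 events/s ceiling in one shot.
Realtime.GenCounter.add(key, 100_000_000)

# Give the RateCounter a tick to fold the new total into its bucket.
Process.sleep(1500)

rate = Realtime.Tenants.db_events_per_second_rate("dev_tenant", 4_000)

{:ok, %Realtime.RateCounter{limit: %{triggered: true}}} = Realtime.RateCounter.get(rate)
```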