Skip to content

Commit d4565df

Browse files
authored
fix: rate limit connect module (#1541)
On bad connection, we rate limit the Connect module so we prevent abuses and too much logging of errors
1 parent e4ee7c8 commit d4565df

File tree

7 files changed

+111
-19
lines changed

7 files changed

+111
-19
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,7 @@ This is the list of operational codes that can help you understand your deployme
243243
| ChannelRateLimitReached | The number of channels you can create has reached its limit |
244244
| ConnectionRateLimitReached | The number of connected clients as reached its limit |
245245
| ClientJoinRateLimitReached | The rate of joins per second from your clients has reached the channel limits |
246+
| DatabaseConnectionRateLimitReached | The rate of attempts to connect to tenants database has reached the limit |
246247
| MessagePerSecondRateLimitReached | The rate of messages per second from your clients has reached the channel limits |
247248
| RealtimeDisabledForTenant | Realtime has been disabled for the tenant |
248249
| UnableToConnectToTenantDatabase | Realtime was not able to connect to the tenant's database |

lib/realtime/tenants.ex

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,32 @@ defmodule Realtime.Tenants do
328328
%RateCounter.Args{id: {:channel, :authorization_errors, external_id}, opts: opts}
329329
end
330330

331+
@connect_per_second_default 10
332+
@doc "RateCounter arguments for counting connect per second."
333+
@spec connect_per_second_rate(Tenant.t() | String.t()) :: RateCounter.Args.t()
334+
def connect_per_second_rate(%Tenant{external_id: external_id}) do
335+
connect_per_second_rate(external_id)
336+
end
337+
338+
def connect_per_second_rate(tenant_id) do
339+
opts = [
340+
max_bucket_len: 10,
341+
limit: [
342+
value: @connect_per_second_default,
343+
measurement: :sum,
344+
log_fn: fn ->
345+
Logger.critical(
346+
"DatabaseConnectionRateLimitReached: Too many connection attempts against the tenant database",
347+
external_id: tenant_id,
348+
project: tenant_id
349+
)
350+
end
351+
]
352+
]
353+
354+
%RateCounter.Args{id: {:database, :connect, tenant_id}, opts: opts}
355+
end
356+
331357
defp pool_size(%{extensions: [%{settings: settings} | _]}) do
332358
Database.pool_size_by_application_name("realtime_connect", settings)
333359
end

lib/realtime/tenants/connect.ex

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,17 @@ defmodule Realtime.Tenants.Connect do
1111

1212
use Realtime.Logs
1313

14-
alias Realtime.Tenants.Rebalancer
1514
alias Realtime.Api.Tenant
15+
alias Realtime.GenCounter
16+
alias Realtime.RateCounter
1617
alias Realtime.Rpc
1718
alias Realtime.Tenants
1819
alias Realtime.Tenants.Connect.CheckConnection
1920
alias Realtime.Tenants.Connect.GetTenant
2021
alias Realtime.Tenants.Connect.Piper
2122
alias Realtime.Tenants.Connect.RegisterProcess
2223
alias Realtime.Tenants.Migrations
24+
alias Realtime.Tenants.Rebalancer
2325
alias Realtime.Tenants.ReplicationConnection
2426
alias Realtime.UsersCounter
2527

@@ -39,11 +41,8 @@ defmodule Realtime.Tenants.Connect do
3941
@doc "Check if Connect has finished setting up connections"
4042
def ready?(tenant_id) do
4143
case whereis(tenant_id) do
42-
pid when is_pid(pid) ->
43-
GenServer.call(pid, :ready?)
44-
45-
_ ->
46-
false
44+
pid when is_pid(pid) -> GenServer.call(pid, :ready?)
45+
_ -> false
4746
end
4847
end
4948

@@ -55,24 +54,29 @@ defmodule Realtime.Tenants.Connect do
5554
| {:error, :tenant_database_unavailable}
5655
| {:error, :initializing}
5756
| {:error, :tenant_database_connection_initializing}
58-
| {:error, :tenant_db_too_many_connections}
57+
| {:error, :connect_rate_limit_reached}
5958
| {:error, :rpc_error, term()}
6059
def lookup_or_start_connection(tenant_id, opts \\ []) when is_binary(tenant_id) do
61-
case get_status(tenant_id) do
62-
{:ok, conn} ->
63-
{:ok, conn}
60+
rate_args = Tenants.connect_per_second_rate(tenant_id)
61+
RateCounter.new(rate_args)
6462

65-
{:error, :tenant_database_unavailable} ->
66-
{:error, :tenant_database_unavailable}
63+
with {:ok, %{limit: %{triggered: false}}} <- RateCounter.get(rate_args),
64+
{:ok, conn} <- get_status(tenant_id) do
65+
{:ok, conn}
66+
else
67+
{:ok, %{limit: %{triggered: true}}} ->
68+
{:error, :connect_rate_limit_reached}
6769

6870
{:error, :tenant_database_connection_initializing} ->
71+
GenCounter.add(rate_args.id)
6972
call_external_node(tenant_id, opts)
7073

7174
{:error, :initializing} ->
7275
{:error, :tenant_database_unavailable}
7376

74-
{:error, :tenant_db_too_many_connections} ->
75-
{:error, :tenant_db_too_many_connections}
77+
{:error, reason} ->
78+
GenCounter.add(rate_args.id)
79+
{:error, reason}
7680
end
7781
end
7882

lib/realtime_web/channels/realtime_channel.ex

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,10 @@ defmodule RealtimeWeb.RealtimeChannel do
167167
msg = "Database can't accept more connections, Realtime won't connect"
168168
log_error(socket, "DatabaseLackOfConnections", msg)
169169

170+
{:error, :connect_rate_limit_reached} ->
171+
msg = "Too many database connections attempts per second"
172+
log_error(socket, "DatabaseConnectionRateLimitReached", msg)
173+
170174
{:error, :unable_to_set_policies, error} ->
171175
log_error(socket, "UnableToSetPolicies", error)
172176
{:error, %{reason: "Realtime was unable to connect to the project database"}}

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.51.4",
7+
version: "2.51.5",
88
elixir: "~> 1.17.3",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,

test/realtime/tenants/connect_test.exs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -515,6 +515,53 @@ defmodule Realtime.Tenants.ConnectTest do
515515
assert capture_log(fn -> assert {:error, :rpc_error, _} = Connect.lookup_or_start_connection("tenant") end) =~
516516
"project=tenant external_id=tenant [error] ErrorOnRpcCall"
517517
end
518+
519+
test "rate limit connect when too many connections against bad database", %{tenant: tenant} do
520+
extension = %{
521+
"type" => "postgres_cdc_rls",
522+
"settings" => %{
523+
"db_host" => "127.0.0.1",
524+
"db_name" => "postgres",
525+
"db_user" => "supabase_admin",
526+
"db_password" => "postgres",
527+
"poll_interval" => 100,
528+
"poll_max_changes" => 100,
529+
"poll_max_record_bytes" => 1_048_576,
530+
"region" => "us-east-1",
531+
"ssl_enforced" => true
532+
}
533+
}
534+
535+
{:ok, tenant} = update_extension(tenant, extension)
536+
537+
log =
538+
capture_log(fn ->
539+
res =
540+
for _ <- 1..50 do
541+
Process.sleep(200)
542+
Connect.lookup_or_start_connection(tenant.external_id)
543+
end
544+
545+
assert Enum.any?(res, fn {_, res} -> res == :connect_rate_limit_reached end)
546+
end)
547+
548+
assert log =~ "DatabaseConnectionRateLimitReached: Too many connection attempts against the tenant database"
549+
end
550+
551+
test "rate limit connect will not trigger if connection is successful", %{tenant: tenant} do
552+
log =
553+
capture_log(fn ->
554+
res =
555+
for _ <- 1..20 do
556+
Process.sleep(500)
557+
Connect.lookup_or_start_connection(tenant.external_id)
558+
end
559+
560+
refute Enum.any?(res, fn {_, res} -> res == :tenant_db_too_many_connections end)
561+
end)
562+
563+
refute log =~ "DatabaseConnectionRateLimitReached: Too many connection attempts against the tenant database"
564+
end
518565
end
519566

520567
describe "shutdown/1" do

test/realtime_web/controllers/broadcast_controller_test.exs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,7 @@ defmodule RealtimeWeb.BroadcastControllerTest do
272272
} do
273273
request_events_key = Tenants.requests_per_second_key(tenant)
274274
broadcast_events_key = Tenants.events_per_second_key(tenant)
275+
connect_events_key = Tenants.connect_per_second_rate(tenant).id
275276
expect(TenantBroadcaster, :pubsub_broadcast, 5, fn _, _, _, _ -> :ok end)
276277

277278
messages_to_send =
@@ -290,7 +291,10 @@ defmodule RealtimeWeb.BroadcastControllerTest do
290291

291292
GenCounter
292293
|> expect(:add, fn ^request_events_key -> :ok end)
293-
|> expect(:add, length(messages), fn ^broadcast_events_key -> :ok end)
294+
|> expect(:add, length(messages), fn
295+
^broadcast_events_key -> :ok
296+
^connect_events_key -> :ok
297+
end)
294298

295299
conn = post(conn, Routes.broadcast_path(conn, :broadcast), %{"messages" => messages})
296300

@@ -326,6 +330,7 @@ defmodule RealtimeWeb.BroadcastControllerTest do
326330
} do
327331
request_events_key = Tenants.requests_per_second_key(tenant)
328332
broadcast_events_key = Tenants.events_per_second_key(tenant)
333+
connect_events_key = Tenants.connect_per_second_rate(tenant).id
329334
expect(TenantBroadcaster, :pubsub_broadcast, 6, fn _, _, _, _ -> :ok end)
330335

331336
channels =
@@ -354,7 +359,10 @@ defmodule RealtimeWeb.BroadcastControllerTest do
354359

355360
GenCounter
356361
|> expect(:add, fn ^request_events_key -> :ok end)
357-
|> expect(:add, length(messages), fn ^broadcast_events_key -> :ok end)
362+
|> expect(:add, length(messages), fn
363+
^broadcast_events_key -> :ok
364+
^connect_events_key -> :ok
365+
end)
358366

359367
conn = post(conn, Routes.broadcast_path(conn, :broadcast), %{"messages" => messages})
360368

@@ -408,6 +416,7 @@ defmodule RealtimeWeb.BroadcastControllerTest do
408416
} do
409417
request_events_key = Tenants.requests_per_second_key(tenant)
410418
broadcast_events_key = Tenants.events_per_second_key(tenant)
419+
connect_events_key = Tenants.connect_per_second_rate(tenant).id
411420
expect(TenantBroadcaster, :pubsub_broadcast, 5, fn _, _, _, _ -> :ok end)
412421

413422
messages_to_send =
@@ -428,7 +437,9 @@ defmodule RealtimeWeb.BroadcastControllerTest do
428437

429438
GenCounter
430439
|> expect(:add, fn ^request_events_key -> :ok end)
431-
|> expect(:add, length(messages_to_send), fn ^broadcast_events_key -> :ok end)
440+
# remove the one message that won't be broadcasted for this user
441+
|> expect(:add, 1, fn ^connect_events_key -> :ok end)
442+
|> expect(:add, length(messages) - 1, fn ^broadcast_events_key -> :ok end)
432443

433444
conn = post(conn, Routes.broadcast_path(conn, :broadcast), %{"messages" => messages})
434445

@@ -482,7 +493,6 @@ defmodule RealtimeWeb.BroadcastControllerTest do
482493

483494
GenCounter
484495
|> expect(:add, fn ^request_events_key -> 1 end)
485-
|> reject(:add, 1)
486496

487497
conn = post(conn, Routes.broadcast_path(conn, :broadcast), %{"messages" => messages})
488498

0 commit comments

Comments
 (0)