Skip to content

Commit 353c142

Browse files
authored
fix: rate limit Connect.lookup_or_start_connection on error only (#1549)
1 parent a72a835 commit 353c142

File tree

5 files changed

+48
-22
lines changed

5 files changed

+48
-22
lines changed

lib/realtime/tenants.ex

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -328,18 +328,18 @@ defmodule Realtime.Tenants do
328328
%RateCounter.Args{id: {:channel, :authorization_errors, external_id}, opts: opts}
329329
end
330330

331-
@connect_per_second_default 10
331+
@connect_errors_per_second_default 10
332332
@doc "RateCounter arguments for counting connect per second."
333-
@spec connect_per_second_rate(Tenant.t() | String.t()) :: RateCounter.Args.t()
334-
def connect_per_second_rate(%Tenant{external_id: external_id}) do
335-
connect_per_second_rate(external_id)
333+
@spec connect_errors_per_second_rate(Tenant.t() | String.t()) :: RateCounter.Args.t()
334+
def connect_errors_per_second_rate(%Tenant{external_id: external_id}) do
335+
connect_errors_per_second_rate(external_id)
336336
end
337337

338-
def connect_per_second_rate(tenant_id) do
338+
def connect_errors_per_second_rate(tenant_id) do
339339
opts = [
340340
max_bucket_len: 10,
341341
limit: [
342-
value: @connect_per_second_default,
342+
value: @connect_errors_per_second_default,
343343
measurement: :sum,
344344
log_fn: fn ->
345345
Logger.critical(

lib/realtime/tenants/connect.ex

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ defmodule Realtime.Tenants.Connect do
5757
| {:error, :connect_rate_limit_reached}
5858
| {:error, :rpc_error, term()}
5959
def lookup_or_start_connection(tenant_id, opts \\ []) when is_binary(tenant_id) do
60-
rate_args = Tenants.connect_per_second_rate(tenant_id)
60+
rate_args = Tenants.connect_errors_per_second_rate(tenant_id)
6161
RateCounter.new(rate_args)
6262

6363
with {:ok, %{limit: %{triggered: false}}} <- RateCounter.get(rate_args),
@@ -68,8 +68,14 @@ defmodule Realtime.Tenants.Connect do
6868
{:error, :connect_rate_limit_reached}
6969

7070
{:error, :tenant_database_connection_initializing} ->
71-
GenCounter.add(rate_args.id)
72-
call_external_node(tenant_id, opts)
71+
case call_external_node(tenant_id, opts) do
72+
{:ok, pid} ->
73+
{:ok, pid}
74+
75+
error ->
76+
GenCounter.add(rate_args.id)
77+
error
78+
end
7379

7480
{:error, :initializing} ->
7581
{:error, :tenant_database_unavailable}

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.51.6",
7+
version: "2.51.7",
88
elixir: "~> 1.17.3",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,

test/realtime/tenants/connect_test.exs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,36 @@ defmodule Realtime.Tenants.ConnectTest do
5151
end
5252

5353
describe "handle cold start" do
54+
test "multiple processes connecting calling Connect.connect", %{tenant: tenant} do
55+
parent = self()
56+
57+
# Let's slow down Connect.connect so that multiple RPC calls are executed
58+
stub(Connect, :connect, fn x, y, z ->
59+
:timer.sleep(1000)
60+
call_original(Connect, :connect, [x, y, z])
61+
end)
62+
63+
connect = fn -> send(parent, Connect.lookup_or_start_connection(tenant.external_id)) end
64+
# Let's call enough times to potentially trigger the Connect RateCounter
65+
66+
for _ <- 1..50, do: spawn(connect)
67+
68+
assert_receive({:ok, pid}, 1100)
69+
70+
for _ <- 1..49, do: assert_receive({:ok, ^pid})
71+
72+
# Does not trigger rate limit as connections eventually succeeded
73+
74+
{:ok, rate_counter} =
75+
tenant.external_id
76+
|> Tenants.connect_errors_per_second_rate()
77+
|> Realtime.RateCounter.get()
78+
79+
assert rate_counter.sum == 0
80+
assert rate_counter.avg == 0.0
81+
assert rate_counter.limit.triggered == false
82+
end
83+
5484
test "multiple proccesses succeed together", %{tenant: tenant} do
5585
parent = self()
5686

test/realtime_web/controllers/broadcast_controller_test.exs

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,6 @@ defmodule RealtimeWeb.BroadcastControllerTest do
272272
} do
273273
request_events_key = Tenants.requests_per_second_key(tenant)
274274
broadcast_events_key = Tenants.events_per_second_key(tenant)
275-
connect_events_key = Tenants.connect_per_second_rate(tenant).id
276275
expect(TenantBroadcaster, :pubsub_broadcast, 5, fn _, _, _, _ -> :ok end)
277276

278277
messages_to_send =
@@ -291,10 +290,7 @@ defmodule RealtimeWeb.BroadcastControllerTest do
291290

292291
GenCounter
293292
|> expect(:add, fn ^request_events_key -> :ok end)
294-
|> expect(:add, length(messages), fn
295-
^broadcast_events_key -> :ok
296-
^connect_events_key -> :ok
297-
end)
293+
|> expect(:add, length(messages), fn ^broadcast_events_key -> :ok end)
298294

299295
conn = post(conn, Routes.broadcast_path(conn, :broadcast), %{"messages" => messages})
300296

@@ -330,7 +326,6 @@ defmodule RealtimeWeb.BroadcastControllerTest do
330326
} do
331327
request_events_key = Tenants.requests_per_second_key(tenant)
332328
broadcast_events_key = Tenants.events_per_second_key(tenant)
333-
connect_events_key = Tenants.connect_per_second_rate(tenant).id
334329
expect(TenantBroadcaster, :pubsub_broadcast, 6, fn _, _, _, _ -> :ok end)
335330

336331
channels =
@@ -359,10 +354,7 @@ defmodule RealtimeWeb.BroadcastControllerTest do
359354

360355
GenCounter
361356
|> expect(:add, fn ^request_events_key -> :ok end)
362-
|> expect(:add, length(messages), fn
363-
^broadcast_events_key -> :ok
364-
^connect_events_key -> :ok
365-
end)
357+
|> expect(:add, length(messages), fn ^broadcast_events_key -> :ok end)
366358

367359
conn = post(conn, Routes.broadcast_path(conn, :broadcast), %{"messages" => messages})
368360

@@ -416,7 +408,6 @@ defmodule RealtimeWeb.BroadcastControllerTest do
416408
} do
417409
request_events_key = Tenants.requests_per_second_key(tenant)
418410
broadcast_events_key = Tenants.events_per_second_key(tenant)
419-
connect_events_key = Tenants.connect_per_second_rate(tenant).id
420411
expect(TenantBroadcaster, :pubsub_broadcast, 5, fn _, _, _, _ -> :ok end)
421412

422413
messages_to_send =
@@ -438,7 +429,6 @@ defmodule RealtimeWeb.BroadcastControllerTest do
438429
GenCounter
439430
|> expect(:add, fn ^request_events_key -> :ok end)
440431
# remove the one message that won't be broadcasted for this user
441-
|> expect(:add, 1, fn ^connect_events_key -> :ok end)
442432
|> expect(:add, length(messages) - 1, fn ^broadcast_events_key -> :ok end)
443433

444434
conn = post(conn, Routes.broadcast_path(conn, :broadcast), %{"messages" => messages})

0 commit comments

Comments
 (0)