Skip to content

Commit 05ac93e

Browse files
fix: move connect rate limit to socket (#1555)
* fix: reduce max_frame_size to 5MB * fix: fullsweep_after=100 on gen rpc pub sub workers --------- Co-authored-by: Eduardo Gurgel Pinho <[email protected]>
1 parent 6e650f0 commit 05ac93e

File tree

9 files changed

+92
-5
lines changed

9 files changed

+92
-5
lines changed

lib/realtime/gen_rpc/pub_sub.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ defmodule Realtime.GenRpcPubSub.Worker do
6767
@impl true
6868
def init(pubsub) do
6969
Process.flag(:message_queue_data, :off_heap)
70-
Process.flag(:fullsweep_after, 1000)
70+
Process.flag(:fullsweep_after, 100)
7171
{:ok, pubsub}
7272
end
7373

lib/realtime_web/channels/realtime_channel/presence_handler.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandler do
1111
alias Phoenix.Tracker.Shard
1212
alias Realtime.GenCounter
1313
alias Realtime.RateCounter
14-
alias Realtime.Tenants
14+
# alias Realtime.Tenants
1515
alias Realtime.Tenants.Authorization
1616
alias RealtimeWeb.Presence
1717
alias RealtimeWeb.RealtimeChannel.Logging
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
defmodule RealtimeWeb.TenantRateLimiters do
2+
@moduledoc """
3+
Rate limiters for tenants.
4+
"""
5+
require Logger
6+
alias Realtime.UsersCounter
7+
alias Realtime.Tenants
8+
alias Realtime.RateCounter
9+
alias Realtime.Api.Tenant
10+
11+
@spec check_tenant(Realtime.Api.Tenant.t()) :: :ok | {:error, :too_many_connections | :too_many_joins}
12+
def check_tenant(tenant) do
13+
with :ok <- max_concurrent_users_check(tenant) do
14+
max_joins_per_second_check(tenant)
15+
end
16+
end
17+
18+
defp max_concurrent_users_check(%Tenant{max_concurrent_users: max_conn_users, external_id: external_id}) do
19+
total_conn_users = UsersCounter.tenant_users(external_id)
20+
21+
if total_conn_users < max_conn_users,
22+
do: :ok,
23+
else: {:error, :too_many_connections}
24+
end
25+
26+
defp max_joins_per_second_check(%Tenant{max_joins_per_second: max_joins_per_second} = tenant) do
27+
rate_args = Tenants.joins_per_second_rate(tenant.external_id, max_joins_per_second)
28+
29+
RateCounter.new(rate_args)
30+
31+
case RateCounter.get(rate_args) do
32+
{:ok, %{limit: %{triggered: false}}} ->
33+
:ok
34+
35+
{:ok, %{limit: %{triggered: true}}} ->
36+
{:error, :too_many_joins}
37+
38+
error ->
39+
Logger.error("UnknownErrorOnCounter: #{inspect(error)}")
40+
{:error, error}
41+
end
42+
end
43+
end

lib/realtime_web/channels/user_socket.ex

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ defmodule RealtimeWeb.UserSocket do
1616
alias Realtime.PostgresCdc
1717
alias Realtime.Tenants
1818

19+
alias RealtimeWeb.TenantRateLimiters
1920
alias RealtimeWeb.ChannelsAuthorization
2021
alias RealtimeWeb.RealtimeChannel
2122
alias RealtimeWeb.RealtimeChannel.Logging
@@ -56,6 +57,7 @@ defmodule RealtimeWeb.UserSocket do
5657
token when is_binary(token) <- token,
5758
jwt_secret_dec <- Crypto.decrypt!(jwt_secret),
5859
{:ok, claims} <- ChannelsAuthorization.authorize_conn(token, jwt_secret_dec, jwt_jwks),
60+
:ok <- TenantRateLimiters.check_tenant(tenant),
5961
{:ok, postgres_cdc_module} <- PostgresCdc.driver(postgres_cdc_default) do
6062
%Tenant{
6163
extensions: extensions,
@@ -111,6 +113,16 @@ defmodule RealtimeWeb.UserSocket do
111113
log_error("MalformedJWT", "The token provided is not a valid JWT")
112114
{:error, :token_malformed}
113115

116+
{:error, :too_many_connections} ->
117+
msg = "Too many connected users"
118+
Logging.log_error(socket, "ConnectionRateLimitReached", msg)
119+
{:error, :too_many_connections}
120+
121+
{:error, :too_many_joins} ->
122+
msg = "Too many joins per second"
123+
Logging.log_error(socket, "JoinsRateLimitReached", msg)
124+
{:error, :too_many_joins}
125+
114126
error ->
115127
log_error("ErrorConnectingToWebsocket", error)
116128
error

lib/realtime_web/endpoint.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ defmodule RealtimeWeb.Endpoint do
1515
websocket: [
1616
connect_info: [:peer_data, :uri, :x_headers],
1717
fullsweep_after: 20,
18-
max_frame_size: 8_000_000,
18+
max_frame_size: 5_000_000,
1919
# https://github.com/ninenines/cowboy/blob/24d32de931a0c985ff7939077463fc8be939f0e9/doc/src/manual/cowboy_websocket.asciidoc#L228
2020
# active_n: The number of packets Cowboy will request from the socket at once.
2121
# This can be used to tweak the performance of the server. Higher values reduce

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.51.13",
7+
version: "2.51.14",
88
elixir: "~> 1.17.3",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,

test/realtime/gen_rpc_pub_sub_test.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,6 @@ defmodule Realtime.GenRpcPubSubTest do
1313
test "it sets fullsweep_after flag on the workers" do
1414
assert Realtime.PubSubElixir.Realtime.PubSub.Adapter_1
1515
|> Process.whereis()
16-
|> Process.info(:fullsweep_after) == {:fullsweep_after, 1000}
16+
|> Process.info(:fullsweep_after) == {:fullsweep_after, 100}
1717
end
1818
end

test/realtime_web/channels/realtime_channel/presence_handler_test.exs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -434,6 +434,7 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandlerTest do
434434
assert log =~ "PresenceRateLimitReached"
435435
end
436436

437+
@tag :skip
437438
@tag policies: [:authenticated_read_broadcast_and_presence, :authenticated_write_broadcast_and_presence]
438439
test "respects rate limits on private channels", %{tenant: tenant, topic: topic, db_conn: db_conn} do
439440
key = random_string()
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
defmodule RealtimeWeb.TenantRateLimitersTest do
2+
use Realtime.DataCase, async: true
3+
4+
use Mimic
5+
alias RealtimeWeb.TenantRateLimiters
6+
alias Realtime.Api.Tenant
7+
8+
setup do
9+
tenant = %Tenant{external_id: random_string(), max_concurrent_users: 1, max_joins_per_second: 1}
10+
11+
%{tenant: tenant}
12+
end
13+
14+
describe "check_tenant/1" do
15+
test "rate is not exceeded", %{tenant: tenant} do
16+
assert TenantRateLimiters.check_tenant(tenant) == :ok
17+
end
18+
19+
test "max concurrent users is exceeded", %{tenant: tenant} do
20+
Realtime.UsersCounter.add(self(), tenant.external_id)
21+
22+
assert TenantRateLimiters.check_tenant(tenant) == {:error, :too_many_connections}
23+
end
24+
25+
test "max joins is exceeded", %{tenant: tenant} do
26+
expect(Realtime.RateCounter, :get, fn _ -> {:ok, %{limit: %{triggered: true}}} end)
27+
28+
assert TenantRateLimiters.check_tenant(tenant) == {:error, :too_many_joins}
29+
end
30+
end
31+
end

0 commit comments

Comments
 (0)