Skip to content

Commit 721f5ca

Browse files
committed
fix: set max socket message queue length
We're using the event to update the counter to check if the websocket has too many messages. If so we will consider abusive behaviour and will kill the socket to prevent further damage.
1 parent d4565df commit 721f5ca

File tree

4 files changed

+57
-7
lines changed

4 files changed

+57
-7
lines changed

lib/realtime/helpers.ex

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@ defmodule Realtime.Helpers do
44
"""
55
require Logger
66

7+
@doc """
8+
Cancels a timer.
9+
"""
710
@spec cancel_timer(reference() | nil) :: non_neg_integer() | false | :ok | nil
811
def cancel_timer(nil), do: nil
912
def cancel_timer(ref), do: Process.cancel_timer(ref)

lib/realtime_web/channels/realtime_channel.ex

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ defmodule RealtimeWeb.RealtimeChannel do
2929
alias RealtimeWeb.RealtimeChannel.Tracker
3030

3131
@confirm_token_ms_interval :timer.minutes(5)
32-
3332
@impl true
3433
def join("realtime:", _params, socket) do
3534
log_error(socket, "TopicNameRequired", "You must provide a topic name")
@@ -238,16 +237,24 @@ defmodule RealtimeWeb.RealtimeChannel do
238237
{:noreply, socket}
239238
end
240239

240+
@websocket_message_queue_length_limit 1000
241241
def handle_info(:update_rate_counter, socket) do
242242
count(socket)
243243

244244
{:ok, rate_counter} = RateCounter.get(socket.assigns.rate_counter)
245+
{:message_queue_len, len} = Process.info(self(), :message_queue_len) |> dbg()
245246

246-
if rate_counter.limit.triggered do
247-
message = "Too many messages per second"
248-
shutdown_response(socket, message)
249-
else
250-
{:noreply, socket}
247+
cond do
248+
rate_counter.limit.triggered ->
249+
message = "Too many messages per second"
250+
shutdown_response(socket, message)
251+
252+
len > @websocket_message_queue_length_limit ->
253+
message = "Websocket message queue length is too long"
254+
shutdown_response(socket, message)
255+
256+
true ->
257+
{:noreply, socket}
251258
end
252259
end
253260

@@ -373,6 +380,7 @@ defmodule RealtimeWeb.RealtimeChannel do
373380
end
374381

375382
def handle_info(:sync_presence, socket), do: {:noreply, socket}
383+
376384
def handle_info(_, socket), do: {:noreply, socket}
377385

378386
@impl true

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.51.5",
7+
version: "2.51.6",
88
elixir: "~> 1.17.3",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,

test/integration/rt_channel_test.exs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2484,6 +2484,45 @@ defmodule Realtime.Integration.RtChannelTest do
24842484
end
24852485
end
24862486

2487+
test "websocket message queue length limit", %{tenant: tenant} do
2488+
change_tenant_configuration(tenant, :max_events_per_second, 100_000) |> dbg()
2489+
2490+
on_exit(fn ->
2491+
change_tenant_configuration(tenant, :max_events_per_second, 100) |> dbg()
2492+
end)
2493+
2494+
{socket, _} = get_connection(tenant)
2495+
2496+
config = %{broadcast: %{self: true}, private: false}
2497+
channel = random_string()
2498+
full_topic = "realtime:#{channel}"
2499+
2500+
WebsocketClient.join(socket, full_topic, %{config: config})
2501+
assert_receive %Message{event: "phx_reply", payload: %{"status" => "ok"}, topic: ^full_topic}, 500
2502+
2503+
for _ <- 1..3000, Process.alive?(socket) do
2504+
spawn(fn -> WebsocketClient.send_event(socket, full_topic, "broadcast", %{"msg" => random_string()}) end)
2505+
end
2506+
2507+
assert_receive msg = %Message{
2508+
event: "system",
2509+
topic: ^full_topic,
2510+
payload: %{
2511+
"channel" => ^channel,
2512+
"extension" => "system",
2513+
"message" => "Websocket message queue length is too long",
2514+
"status" => "error"
2515+
}
2516+
},
2517+
500
2518+
2519+
assert_receive msg = %Message{
2520+
event: "phx_close",
2521+
topic: ^full_topic
2522+
},
2523+
500
2524+
end
2525+
24872526
defp mode(%{mode: :distributed}) do
24882527
tenant = Api.get_tenant_by_external_id("dev_tenant")
24892528

0 commit comments

Comments
 (0)