Skip to content

Commit 8345e72

Browse files
committed
move to erl sys mon
1 parent a884e0e commit 8345e72

File tree

4 files changed

+20
-33
lines changed

4 files changed

+20
-33
lines changed

lib/realtime/monitoring/erl_sys_mon.ex

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ defmodule Realtime.ErlSysMon do
4747
end
4848

4949
extra_info = Process.info(pid, [:registered_name, :message_queue_len, :total_heap_size])
50+
defensive_exit?(pid, pid_info)
5051

5152
Logger.warning(
5253
"#{__MODULE__} message: " <>
@@ -56,4 +57,11 @@ defmodule Realtime.ErlSysMon do
5657
_ ->
5758
Logger.warning("#{__MODULE__} message: " <> inspect(msg))
5859
end
60+
61+
defp defensive_exit?(pid, pid_info) do
62+
case pid_info do
63+
{{:"$initial_call", {Realtime.Integration.WebsocketClient, _, _}}, _} -> Process.exit(pid, :kill)
64+
_ -> :ok
65+
end
66+
end
5967
end

lib/realtime_web/channels/realtime_channel.ex

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ defmodule RealtimeWeb.RealtimeChannel do
2929
alias RealtimeWeb.RealtimeChannel.Tracker
3030

3131
@confirm_token_ms_interval :timer.minutes(5)
32+
3233
@impl true
3334
def join("realtime:", _params, socket) do
3435
log_error(socket, "TopicNameRequired", "You must provide a topic name")
@@ -237,24 +238,16 @@ defmodule RealtimeWeb.RealtimeChannel do
237238
{:noreply, socket}
238239
end
239240

240-
@websocket_message_queue_length_limit 1000
241241
def handle_info(:update_rate_counter, socket) do
242242
count(socket)
243243

244244
{:ok, rate_counter} = RateCounter.get(socket.assigns.rate_counter)
245-
{:message_queue_len, len} = Process.info(self(), :message_queue_len)
246-
247-
cond do
248-
rate_counter.limit.triggered ->
249-
message = "Too many messages per second"
250-
shutdown_response(socket, message)
251-
252-
len > @websocket_message_queue_length_limit ->
253-
message = "Websocket message queue length is too long"
254-
shutdown_response(socket, message)
255245

256-
true ->
257-
{:noreply, socket}
246+
if rate_counter.limit.triggered do
247+
message = "Too many messages per second"
248+
shutdown_response(socket, message)
249+
else
250+
{:noreply, socket}
258251
end
259252
end
260253

@@ -380,7 +373,6 @@ defmodule RealtimeWeb.RealtimeChannel do
380373
end
381374

382375
def handle_info(:sync_presence, socket), do: {:noreply, socket}
383-
384376
def handle_info(_, socket), do: {:noreply, socket}
385377

386378
@impl true

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.51.15",
7+
version: "2.51.16",
88
elixir: "~> 1.17.3",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,

test/integration/rt_channel_test.exs

Lines changed: 5 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2485,13 +2485,14 @@ defmodule Realtime.Integration.RtChannelTest do
24852485
end
24862486

24872487
test "websocket message queue length limit", %{tenant: tenant} do
2488-
change_tenant_configuration(tenant, :max_events_per_second, 100_000) |> dbg()
2488+
change_tenant_configuration(tenant, :max_events_per_second, 100_000)
24892489

24902490
on_exit(fn ->
2491-
change_tenant_configuration(tenant, :max_events_per_second, 100) |> dbg()
2491+
change_tenant_configuration(tenant, :max_events_per_second, 100)
24922492
end)
24932493

24942494
{socket, _} = get_connection(tenant)
2495+
Process.unlink(socket)
24952496

24962497
config = %{broadcast: %{self: true}, private: false}
24972498
channel = random_string()
@@ -2504,23 +2505,9 @@ defmodule Realtime.Integration.RtChannelTest do
25042505
spawn(fn -> WebsocketClient.send_event(socket, full_topic, "broadcast", %{"msg" => random_string()}) end)
25052506
end
25062507

2507-
assert_receive msg = %Message{
2508-
event: "system",
2509-
topic: ^full_topic,
2510-
payload: %{
2511-
"channel" => ^channel,
2512-
"extension" => "system",
2513-
"message" => "Websocket message queue length is too long",
2514-
"status" => "error"
2515-
}
2516-
},
2517-
500
2508+
# assert_receive %Message{event: "phx_close", topic: ^full_topic}, 500
25182509

2519-
assert_receive msg = %Message{
2520-
event: "phx_close",
2521-
topic: ^full_topic
2522-
},
2523-
500
2510+
refute Process.alive?(socket)
25242511
end
25252512

25262513
defp mode(%{mode: :distributed}) do

0 commit comments

Comments
 (0)