Skip to content

Commit ba63e09

Browse files
committed
move to erl sys mon
1 parent a884e0e commit ba63e09

File tree

4 files changed

+26
-36
lines changed

4 files changed

+26
-36
lines changed

lib/realtime/monitoring/erl_sys_mon.ex

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ defmodule Realtime.ErlSysMon do
2424
{:ok, []}
2525
end
2626

27-
def handle_info({:monitor, pid, _type, _meta} = msg, state) when is_pid(pid) do
28-
log_process_info(msg, pid)
27+
def handle_info({:monitor, pid, type, _meta} = msg, state) when is_pid(pid) do
28+
log_process_info(msg, pid, type)
2929
{:noreply, state}
3030
end
3131

@@ -34,7 +34,7 @@ defmodule Realtime.ErlSysMon do
3434
{:noreply, state}
3535
end
3636

37-
defp log_process_info(msg, pid) do
37+
defp log_process_info(msg, pid, type) do
3838
pid_info =
3939
pid
4040
|> Process.info(:dictionary)
@@ -47,6 +47,7 @@ defmodule Realtime.ErlSysMon do
4747
end
4848

4949
extra_info = Process.info(pid, [:registered_name, :message_queue_len, :total_heap_size])
50+
defensive_exit?(pid, pid_info, type)
5051

5152
Logger.warning(
5253
"#{__MODULE__} message: " <>
@@ -56,4 +57,14 @@ defmodule Realtime.ErlSysMon do
5657
_ ->
5758
Logger.warning("#{__MODULE__} message: " <> inspect(msg))
5859
end
60+
61+
defp defensive_exit?(pid, pid_info, type) do
62+
{{:"$initial_call", {mod, _, _}}, _} = pid_info
63+
64+
# credo:disable-for-next-line
65+
cond do
66+
mod == Realtime.Integration.WebsocketClient && type == :long_message_queue -> Process.exit(pid, :kill)
67+
true -> :ok
68+
end
69+
end
5970
end

lib/realtime_web/channels/realtime_channel.ex

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ defmodule RealtimeWeb.RealtimeChannel do
2929
alias RealtimeWeb.RealtimeChannel.Tracker
3030

3131
@confirm_token_ms_interval :timer.minutes(5)
32+
3233
@impl true
3334
def join("realtime:", _params, socket) do
3435
log_error(socket, "TopicNameRequired", "You must provide a topic name")
@@ -237,24 +238,16 @@ defmodule RealtimeWeb.RealtimeChannel do
237238
{:noreply, socket}
238239
end
239240

240-
@websocket_message_queue_length_limit 1000
241241
def handle_info(:update_rate_counter, socket) do
242242
count(socket)
243243

244244
{:ok, rate_counter} = RateCounter.get(socket.assigns.rate_counter)
245-
{:message_queue_len, len} = Process.info(self(), :message_queue_len)
246-
247-
cond do
248-
rate_counter.limit.triggered ->
249-
message = "Too many messages per second"
250-
shutdown_response(socket, message)
251-
252-
len > @websocket_message_queue_length_limit ->
253-
message = "Websocket message queue length is too long"
254-
shutdown_response(socket, message)
255245

256-
true ->
257-
{:noreply, socket}
246+
if rate_counter.limit.triggered do
247+
message = "Too many messages per second"
248+
shutdown_response(socket, message)
249+
else
250+
{:noreply, socket}
258251
end
259252
end
260253

@@ -380,7 +373,6 @@ defmodule RealtimeWeb.RealtimeChannel do
380373
end
381374

382375
def handle_info(:sync_presence, socket), do: {:noreply, socket}
383-
384376
def handle_info(_, socket), do: {:noreply, socket}
385377

386378
@impl true

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.51.15",
7+
version: "2.51.16",
88
elixir: "~> 1.17.3",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,

test/integration/rt_channel_test.exs

Lines changed: 5 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2485,13 +2485,14 @@ defmodule Realtime.Integration.RtChannelTest do
24852485
end
24862486

24872487
test "websocket message queue length limit", %{tenant: tenant} do
2488-
change_tenant_configuration(tenant, :max_events_per_second, 100_000) |> dbg()
2488+
change_tenant_configuration(tenant, :max_events_per_second, 100_000)
24892489

24902490
on_exit(fn ->
2491-
change_tenant_configuration(tenant, :max_events_per_second, 100) |> dbg()
2491+
change_tenant_configuration(tenant, :max_events_per_second, 100)
24922492
end)
24932493

24942494
{socket, _} = get_connection(tenant)
2495+
Process.unlink(socket)
24952496

24962497
config = %{broadcast: %{self: true}, private: false}
24972498
channel = random_string()
@@ -2504,23 +2505,9 @@ defmodule Realtime.Integration.RtChannelTest do
25042505
spawn(fn -> WebsocketClient.send_event(socket, full_topic, "broadcast", %{"msg" => random_string()}) end)
25052506
end
25062507

2507-
assert_receive msg = %Message{
2508-
event: "system",
2509-
topic: ^full_topic,
2510-
payload: %{
2511-
"channel" => ^channel,
2512-
"extension" => "system",
2513-
"message" => "Websocket message queue length is too long",
2514-
"status" => "error"
2515-
}
2516-
},
2517-
500
2508+
# assert_receive %Message{event: "phx_close", topic: ^full_topic}, 500
25182509

2519-
assert_receive msg = %Message{
2520-
event: "phx_close",
2521-
topic: ^full_topic
2522-
},
2523-
500
2510+
refute Process.alive?(socket)
25242511
end
25252512

25262513
defp mode(%{mode: :distributed}) do

0 commit comments

Comments
 (0)