Skip to content

Commit 98d479d

Browse files
committed
move to erl sys mon
1 parent a884e0e commit 98d479d

File tree

6 files changed

+100
-59
lines changed

6 files changed

+100
-59
lines changed

lib/realtime/monitoring/erl_sys_mon.ex

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ defmodule Realtime.ErlSysMon do
2424
{:ok, []}
2525
end
2626

27-
def handle_info({:monitor, pid, _type, _meta} = msg, state) when is_pid(pid) do
28-
log_process_info(msg, pid)
27+
def handle_info({:monitor, pid, type, _meta} = msg, state) when is_pid(pid) do
28+
log_process_info(msg, pid, type)
2929
{:noreply, state}
3030
end
3131

@@ -34,7 +34,7 @@ defmodule Realtime.ErlSysMon do
3434
{:noreply, state}
3535
end
3636

37-
defp log_process_info(msg, pid) do
37+
defp log_process_info(msg, pid, type) do
3838
pid_info =
3939
pid
4040
|> Process.info(:dictionary)
@@ -47,6 +47,7 @@ defmodule Realtime.ErlSysMon do
4747
end
4848

4949
extra_info = Process.info(pid, [:registered_name, :message_queue_len, :total_heap_size])
50+
defensive_exit?(pid, pid_info, type)
5051

5152
Logger.warning(
5253
"#{__MODULE__} message: " <>
@@ -56,4 +57,14 @@ defmodule Realtime.ErlSysMon do
5657
_ ->
5758
Logger.warning("#{__MODULE__} message: " <> inspect(msg))
5859
end
60+
61+
# Forcefully terminates a RealtimeChannel process that the system monitor
# reported for `:long_message_queue`; any other process or event is ignored.
#
# Arguments:
#   * `pid`      - the monitored process.
#   * `pid_info` - expected to look like `{{:"$initial_call", {mod, fun, arity}}, _}`.
#                  Anything else (for example a process with no `$initial_call`
#                  entry, or one that has already died) falls through to the
#                  catch-all clause instead of raising a `MatchError`.
#   * `type`     - the system monitor event type.
#
# Returns `true` (the result of `Process.exit/2`) when the process was killed
# and `:ok` otherwise. Despite the `?` suffix, the return value is not meant to
# be used as a predicate.
defp defensive_exit?(
       pid,
       {{:"$initial_call", {RealtimeWeb.RealtimeChannel, _, _}}, _},
       :long_message_queue
     ) do
  # `:kill` cannot be trapped, so the channel goes down even if its mailbox
  # is too backed up for it to process a regular exit signal.
  Process.exit(pid, :kill)
end

defp defensive_exit?(_pid, _pid_info, _type), do: :ok
5970
end

lib/realtime_web/channels/realtime_channel.ex

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ defmodule RealtimeWeb.RealtimeChannel do
2929
alias RealtimeWeb.RealtimeChannel.Tracker
3030

3131
@confirm_token_ms_interval :timer.minutes(5)
32+
3233
@impl true
3334
def join("realtime:", _params, socket) do
3435
log_error(socket, "TopicNameRequired", "You must provide a topic name")
@@ -237,24 +238,16 @@ defmodule RealtimeWeb.RealtimeChannel do
237238
{:noreply, socket}
238239
end
239240

240-
@websocket_message_queue_length_limit 1000
241241
def handle_info(:update_rate_counter, socket) do
242242
count(socket)
243243

244244
{:ok, rate_counter} = RateCounter.get(socket.assigns.rate_counter)
245-
{:message_queue_len, len} = Process.info(self(), :message_queue_len)
246-
247-
cond do
248-
rate_counter.limit.triggered ->
249-
message = "Too many messages per second"
250-
shutdown_response(socket, message)
251-
252-
len > @websocket_message_queue_length_limit ->
253-
message = "Websocket message queue length is too long"
254-
shutdown_response(socket, message)
255245

256-
true ->
257-
{:noreply, socket}
246+
if rate_counter.limit.triggered do
247+
message = "Too many messages per second"
248+
shutdown_response(socket, message)
249+
else
250+
{:noreply, socket}
258251
end
259252
end
260253

@@ -380,7 +373,6 @@ defmodule RealtimeWeb.RealtimeChannel do
380373
end
381374

382375
def handle_info(:sync_presence, socket), do: {:noreply, socket}
383-
384376
def handle_info(_, socket), do: {:noreply, socket}
385377

386378
@impl true

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.51.15",
7+
version: "2.51.16",
88
elixir: "~> 1.17.3",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,

test/integration/rt_channel_test.exs

Lines changed: 0 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -2484,45 +2484,6 @@ defmodule Realtime.Integration.RtChannelTest do
24842484
end
24852485
end
24862486

2487-
test "websocket message queue length limit", %{tenant: tenant} do
2488-
change_tenant_configuration(tenant, :max_events_per_second, 100_000) |> dbg()
2489-
2490-
on_exit(fn ->
2491-
change_tenant_configuration(tenant, :max_events_per_second, 100) |> dbg()
2492-
end)
2493-
2494-
{socket, _} = get_connection(tenant)
2495-
2496-
config = %{broadcast: %{self: true}, private: false}
2497-
channel = random_string()
2498-
full_topic = "realtime:#{channel}"
2499-
2500-
WebsocketClient.join(socket, full_topic, %{config: config})
2501-
assert_receive %Message{event: "phx_reply", payload: %{"status" => "ok"}, topic: ^full_topic}, 500
2502-
2503-
for _ <- 1..3000, Process.alive?(socket) do
2504-
spawn(fn -> WebsocketClient.send_event(socket, full_topic, "broadcast", %{"msg" => random_string()}) end)
2505-
end
2506-
2507-
assert_receive msg = %Message{
2508-
event: "system",
2509-
topic: ^full_topic,
2510-
payload: %{
2511-
"channel" => ^channel,
2512-
"extension" => "system",
2513-
"message" => "Websocket message queue length is too long",
2514-
"status" => "error"
2515-
}
2516-
},
2517-
500
2518-
2519-
assert_receive msg = %Message{
2520-
event: "phx_close",
2521-
topic: ^full_topic
2522-
},
2523-
500
2524-
end
2525-
25262487
defp mode(%{mode: :distributed}) do
25272488
tenant = Api.get_tenant_by_external_id("dev_tenant")
25282489

test/realtime/monitoring/erl_sys_mon_test.exs

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ defmodule Realtime.Monitoring.ErlSysMonTest do
1212
Task.async(fn ->
1313
Process.register(self(), TestProcess)
1414
Enum.map(1..1000, &send(self(), &1))
15-
# Wait for ErlSysMon to notice
15+
1616
Process.sleep(4000)
1717
end)
1818
|> Task.await()
@@ -26,4 +26,81 @@ defmodule Realtime.Monitoring.ErlSysMonTest do
2626
assert log =~ "total_heap_size: "
2727
end
2828
end
29+
30+
test "ErlSysMon kills RealtimeChannel process with long message queue" do
  start_supervised!({ErlSysMon, config: [long_message_queue: {0, 50}]})
  {:ok, channel_pid} = create_mock_realtime_channel()

  # Watch the process, then drop the link so the expected kill does not
  # propagate to the test process.
  monitor_ref = Process.monitor(channel_pid)
  Process.unlink(channel_pid)

  # Flood the mailbox far past the configured long_message_queue threshold.
  Enum.each(1..10_000, fn n -> send(channel_pid, {:test_message, "message_#{n}"}) end)

  assert_receive {:DOWN, ^monitor_ref, :process, ^channel_pid, :killed}, 5000
  refute Process.alive?(channel_pid)
end
40+
41+
test "ErlSysMon does not kill non-RealtimeChannel processes with long message queue" do
  start_supervised!({ErlSysMon, config: [{:long_message_queue, {0, 50}}]})
  {:ok, regular_pid} = create_regular_process()
  Process.unlink(regular_pid)
  ref = Process.monitor(regular_pid)
  for i <- 1..10_000, do: send(regular_pid, {:test_message, "message_#{i}"})

  # Give ErlSysMon time to react. Unlike a blind sleep, this fails as soon as
  # the process goes down and reports the unexpected :DOWN message.
  refute_receive {:DOWN, ^ref, :process, ^regular_pid, _reason}, 2000
  assert Process.alive?(regular_pid)

  # Clean up: the process would otherwise keep draining its mailbox long
  # after the test has finished.
  Process.exit(regular_pid, :kill)
  assert_receive {:DOWN, ^ref, :process, ^regular_pid, :killed}
end
54+
55+
test "ErlSysMon logs warning for RealtimeChannel long message queue" do
  start_supervised!({ErlSysMon, config: [long_message_queue: {0, 50}]})
  {:ok, channel_pid} = create_mock_realtime_channel()
  Process.unlink(channel_pid)

  # Overflow the mailbox, then wait long enough for the monitor message to be
  # handled and logged while log capture is still active.
  flood_and_wait = fn ->
    Enum.each(1..10_000, fn n -> send(channel_pid, {:test_message, "message_#{n}"}) end)
    Process.sleep(1000)
  end

  captured = capture_log(flood_and_wait)

  assert captured =~ "Realtime.ErlSysMon message:"
  assert captured =~ "RealtimeWeb.RealtimeChannel"
  assert captured =~ "long_message_queue"
end
70+
71+
# Spawns a linked process whose process dictionary advertises
# `RealtimeWeb.RealtimeChannel` as its `$initial_call` module.
# Returns `{:ok, pid}`.
defp create_mock_realtime_channel, do: spawn_slow_process(RealtimeWeb.RealtimeChannel)

# Spawns a linked process whose `$initial_call` names an unrelated module.
# Returns `{:ok, pid}`.
defp create_regular_process, do: spawn_slow_process(SomeOtherModule)

# Shared implementation for the two factories above: starts a linked process,
# records `{initial_call_module, :init, 1}` under `:"$initial_call"` in its
# process dictionary, and then runs the slow receive loop so that its mailbox
# fills up when flooded with messages.
defp spawn_slow_process(initial_call_module) do
  pid =
    spawn_link(fn ->
      Process.put(:"$initial_call", {initial_call_module, :init, 1})

      slow_message_processor()
    end)

  {:ok, pid}
end
92+
93+
# Receive loop that drains the mailbox slowly on purpose: each
# `{:test_message, _}` costs a 10 ms pause, any other message is discarded
# immediately, and `:stop` ends the loop with `:ok`.
defp slow_message_processor do
  next_step =
    receive do
      :stop ->
        :halt

      {:test_message, _content} ->
        Process.sleep(10)
        :continue

      _other ->
        :continue
    end

  if next_step == :continue, do: slow_message_processor(), else: :ok
end
29106
end

test/realtime_web/tenant_broadcaster_test.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
defmodule RealtimeWeb.TenantBroadcasterTest do
1+
defmodule RealtimeWeb.TenantBroadcasterTest do
22
# Usage of Clustered and changing Application env
33
use Realtime.DataCase, async: false
44

0 commit comments

Comments
 (0)