diff --git a/lib/realtime/helpers.ex b/lib/realtime/helpers.ex
index 6c6209768..bac49b6b9 100644
--- a/lib/realtime/helpers.ex
+++ b/lib/realtime/helpers.ex
@@ -4,6 +4,9 @@ defmodule Realtime.Helpers do
   """
   require Logger
 
+  @doc """
+  Cancels a timer.
+  """
   @spec cancel_timer(reference() | nil) :: non_neg_integer() | false | :ok | nil
   def cancel_timer(nil), do: nil
   def cancel_timer(ref), do: Process.cancel_timer(ref)
diff --git a/lib/realtime/monitoring/erl_sys_mon.ex b/lib/realtime/monitoring/erl_sys_mon.ex
index 3278886d6..0884cd601 100644
--- a/lib/realtime/monitoring/erl_sys_mon.ex
+++ b/lib/realtime/monitoring/erl_sys_mon.ex
@@ -24,8 +24,8 @@ defmodule Realtime.ErlSysMon do
     {:ok, []}
   end
 
-  def handle_info({:monitor, pid, _type, _meta} = msg, state) when is_pid(pid) do
-    log_process_info(msg, pid)
+  def handle_info({:monitor, pid, type, _meta} = msg, state) when is_pid(pid) do
+    log_process_info(msg, pid, type)
     {:noreply, state}
   end
 
@@ -34,7 +34,7 @@ defmodule Realtime.ErlSysMon do
     {:noreply, state}
   end
 
-  defp log_process_info(msg, pid) do
+  defp log_process_info(msg, pid, type) do
     pid_info =
       pid
       |> Process.info(:dictionary)
@@ -47,6 +47,7 @@ defmodule Realtime.ErlSysMon do
       end
 
     extra_info = Process.info(pid, [:registered_name, :message_queue_len, :total_heap_size])
+    defensive_exit?(pid, pid_info, type)
 
     Logger.warning(
       "#{__MODULE__} message: " <>
@@ -56,4 +57,20 @@ defmodule Realtime.ErlSysMon do
     _ ->
       Logger.warning("#{__MODULE__} message: " <> inspect(msg))
   end
+
+  # Defensive kill switch: a process whose `:"$initial_call"` module is
+  # RealtimeWeb.RealtimeChannel and that was reported for :long_message_queue
+  # is killed; every other process or monitor event is left untouched.
+  # The clauses pattern-match instead of asserting on `pid_info`, so process
+  # info without an `:"$initial_call"` entry falls through to the second clause
+  # rather than raising a MatchError before log_process_info/3 has logged.
+  # Returns `true` when the kill signal was sent, `false` otherwise.
+  defp defensive_exit?(
+         pid,
+         {{:"$initial_call", {RealtimeWeb.RealtimeChannel, _fun, _arity}}, _rest},
+         :long_message_queue
+       ) do
+    Process.exit(pid, :kill)
+  end
+
+  defp defensive_exit?(_pid, _pid_info, _type), do: false
 end
diff --git a/mix.exs b/mix.exs
index bc5fccc8b..f586a504b 100644
--- a/mix.exs
+++ b/mix.exs
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
   def project do
     [
       app: :realtime,
-      version: "2.51.15",
+      version: "2.51.16",
       elixir: "~> 1.17.3",
       elixirc_paths: elixirc_paths(Mix.env()),
       start_permanent: Mix.env() == :prod,
diff --git a/test/realtime/monitoring/erl_sys_mon_test.exs b/test/realtime/monitoring/erl_sys_mon_test.exs
index e9c7b87b7..170465e40 100644
--- a/test/realtime/monitoring/erl_sys_mon_test.exs
+++ b/test/realtime/monitoring/erl_sys_mon_test.exs
@@ -12,7 +12,7 @@ defmodule Realtime.Monitoring.ErlSysMonTest do
           Task.async(fn ->
             Process.register(self(), TestProcess)
             Enum.map(1..1000, &send(self(), &1))
-            # Wait for ErlSysMon to notice
+
             Process.sleep(4000)
           end)
           |> Task.await()
@@ -26,4 +26,81 @@ defmodule Realtime.Monitoring.ErlSysMonTest do
       assert log =~ "total_heap_size: "
     end
   end
+
+  test "ErlSysMon kills RealtimeChannel process with long message queue" do
+    start_supervised!({ErlSysMon, config: [{:long_message_queue, {0, 50}}]})
+    {:ok, channel_pid} = create_mock_realtime_channel()
+    ref = Process.monitor(channel_pid)
+    Process.unlink(channel_pid)
+    for i <- 1..10_000, do: send(channel_pid, {:test_message, "message_#{i}"})
+
+    assert_receive {:DOWN, ^ref, :process, ^channel_pid, :killed}, 5000
+    refute Process.alive?(channel_pid)
+  end
+
+  test "ErlSysMon does not kill non-RealtimeChannel processes with long message queue" do
+    start_supervised!({ErlSysMon, config: [{:long_message_queue, {0, 50}}]})
+    {:ok, regular_pid} = create_regular_process()
+    Process.unlink(regular_pid)
+    ref = Process.monitor(regular_pid)
+    for i <- 1..10_000, do: send(regular_pid, {:test_message, "message_#{i}"})
+    Process.sleep(2000)
+
+    assert Process.alive?(regular_pid)
+
+    Process.exit(regular_pid, :kill)
+    assert_receive {:DOWN, ^ref, :process, ^regular_pid, :killed}
+  end
+
+  test "ErlSysMon logs warning for RealtimeChannel long message queue" do
+    start_supervised!({ErlSysMon, config: [{:long_message_queue, {0, 50}}]})
+    {:ok, channel_pid} = create_mock_realtime_channel()
+    Process.unlink(channel_pid)
+
+    log =
+      capture_log(fn ->
+        for i <- 1..10_000, do: send(channel_pid, {:test_message, "message_#{i}"})
+        Process.sleep(1000)
+      end)
+
+    assert log =~ "Realtime.ErlSysMon message:"
+    assert log =~ "RealtimeWeb.RealtimeChannel"
+    assert log =~ "long_message_queue"
+  end
+
+  defp create_mock_realtime_channel do
+    pid =
+      spawn_link(fn ->
+        Process.put(:"$initial_call", {RealtimeWeb.RealtimeChannel, :init, 1})
+
+        slow_message_processor()
+      end)
+
+    {:ok, pid}
+  end
+
+  defp create_regular_process do
+    pid =
+      spawn_link(fn ->
+        Process.put(:"$initial_call", {SomeOtherModule, :init, 1})
+
+        slow_message_processor()
+      end)
+
+    {:ok, pid}
+  end
+
+  defp slow_message_processor do
+    receive do
+      {:test_message, _content} ->
+        Process.sleep(10)
+        slow_message_processor()
+
+      :stop ->
+        :ok
+
+      _other ->
+        slow_message_processor()
+    end
+  end
 end