@@ -12,7 +12,7 @@ defmodule Realtime.Monitoring.ErlSysMonTest do
12
12
Task . async ( fn ->
13
13
Process . register ( self ( ) , TestProcess )
14
14
Enum . map ( 1 .. 1000 , & send ( self ( ) , & 1 ) )
15
- # Wait for ErlSysMon to notice
15
+
16
16
Process . sleep ( 4000 )
17
17
end )
18
18
|> Task . await ( )
@@ -26,4 +26,81 @@ defmodule Realtime.Monitoring.ErlSysMonTest do
26
26
assert log =~ "total_heap_size: "
27
27
end
28
28
end
29
+
30
+ test "ErlSysMon kills RealtimeChannel process with long message queue" do
31
+ start_supervised! ( { ErlSysMon , config: [ { :long_message_queue , { 0 , 50 } } ] } )
32
+ { :ok , channel_pid } = create_mock_realtime_channel ( )
33
+ ref = Process . monitor ( channel_pid )
34
+ Process . unlink ( channel_pid )
35
+ for i <- 1 .. 10_000 , do: send ( channel_pid , { :test_message , "message_#{ i } " } )
36
+
37
+ assert_receive { :DOWN , ^ ref , :process , ^ channel_pid , :killed } , 5000
38
+ refute Process . alive? ( channel_pid )
39
+ end
40
+
41
+ test "ErlSysMon does not kill non-RealtimeChannel processes with long message queue" do
42
+ start_supervised! ( { ErlSysMon , config: [ { :long_message_queue , { 0 , 50 } } ] } )
43
+ { :ok , regular_pid } = create_regular_process ( )
44
+ Process . unlink ( regular_pid )
45
+ ref = Process . monitor ( regular_pid )
46
+ for i <- 1 .. 10_000 , do: send ( regular_pid , { :test_message , "message_#{ i } " } )
47
+ Process . sleep ( 2000 )
48
+
49
+ assert Process . alive? ( regular_pid )
50
+
51
+ Process . exit ( regular_pid , :kill )
52
+ assert_receive { :DOWN , ^ ref , :process , ^ regular_pid , :killed }
53
+ end
54
+
55
+ test "ErlSysMon logs warning for RealtimeChannel long message queue" do
56
+ start_supervised! ( { ErlSysMon , config: [ { :long_message_queue , { 0 , 50 } } ] } )
57
+ { :ok , channel_pid } = create_mock_realtime_channel ( )
58
+ Process . unlink ( channel_pid )
59
+
60
+ log =
61
+ capture_log ( fn ->
62
+ for i <- 1 .. 10_000 , do: send ( channel_pid , { :test_message , "message_#{ i } " } )
63
+ Process . sleep ( 1000 )
64
+ end )
65
+
66
+ assert log =~ "Realtime.ErlSysMon message:"
67
+ assert log =~ "RealtimeWeb.RealtimeChannel"
68
+ assert log =~ "long_message_queue"
69
+ end
70
+
71
+ defp create_mock_realtime_channel do
72
+ pid =
73
+ spawn_link ( fn ->
74
+ Process . put ( :"$initial_call" , { RealtimeWeb.RealtimeChannel , :init , 1 } )
75
+
76
+ slow_message_processor ( )
77
+ end )
78
+
79
+ { :ok , pid }
80
+ end
81
+
82
+ defp create_regular_process do
83
+ pid =
84
+ spawn_link ( fn ->
85
+ Process . put ( :"$initial_call" , { SomeOtherModule , :init , 1 } )
86
+
87
+ slow_message_processor ( )
88
+ end )
89
+
90
+ { :ok , pid }
91
+ end
92
+
93
+ defp slow_message_processor do
94
+ receive do
95
+ { :test_message , _content } ->
96
+ Process . sleep ( 10 )
97
+ slow_message_processor ( )
98
+
99
+ :stop ->
100
+ :ok
101
+
102
+ _other ->
103
+ slow_message_processor ( )
104
+ end
105
+ end
29
106
end
0 commit comments