@@ -10,21 +10,34 @@ defmodule Realtime.GenRpcMetrics do
10
10
def info do
11
11
if :net_kernel . get_state ( ) [ :started ] != :no do
12
12
{ :ok , nodes_info } = :net_kernel . nodes_info ( )
13
+ # Ignore "hidden" nodes (remote shell)
14
+ nodes_info = Enum . filter ( nodes_info , fn { _k , v } -> v [ :type ] == :normal end )
15
+
13
16
# All TCP server sockets are managed by gen_rpc_acceptor_sup supervisor
14
17
# All TCP client sockets are managed by gen_rpc_client_sup supervisor
15
18
# For each node gen_rpc might have multiple TCP sockets
16
19
17
20
# For client processes we use the remote address (peername)
18
- client_port_addresses = port_addresses ( :gen_rpc_client_sup , & :inet . peername / 1 )
19
- # For server processes we use the local address (sockname)
20
- server_port_addresses = port_addresses ( :gen_rpc_acceptor_sup , & :inet . sockname / 1 )
21
+ client_port_addresses =
22
+ port_addresses ( :gen_rpc_client_sup )
23
+ |> Enum . reduce ( % { } , fn { address , port } , acc ->
24
+ update_in ( acc , [ address ] , fn value -> [ port | value || [ ] ] end )
25
+ end )
26
+
27
+ # For server processes we use the ip address without the tcp port because it's randomly assigned
28
+ server_port_addresses =
29
+ port_addresses ( :gen_rpc_acceptor_sup )
30
+ |> Enum . reduce ( % { } , fn { { ip_address , _tcp_port } , port } , acc ->
31
+ update_in ( acc , [ ip_address ] , fn value -> [ port | value || [ ] ] end )
32
+ end )
33
+
21
34
Map . new ( nodes_info , & info ( & 1 , client_port_addresses , server_port_addresses ) )
22
35
else
23
36
% { }
24
37
end
25
38
end
26
39
27
- defp port_addresses ( supervisor , address_fn ) do
40
+ defp port_addresses ( supervisor ) do
28
41
Supervisor . which_children ( supervisor )
29
42
|> Stream . flat_map ( fn { _ , pid , _ , _ } ->
30
43
# We then grab the only linked port if available
@@ -39,15 +52,12 @@ defmodule Realtime.GenRpcMetrics do
39
52
[ ]
40
53
end
41
54
end )
42
- |> Stream . map ( & { address_fn . ( & 1 ) , & 1 } )
55
+ |> Stream . map ( & { :inet . peername ( & 1 ) , & 1 } )
43
56
|> Stream . filter ( fn
44
57
{ { :ok , _sockname } , _port } -> true
45
58
_ -> false
46
59
end )
47
60
|> Stream . map ( fn { { :ok , address } , port } -> { address , port } end )
48
- |> Enum . reduce ( % { } , fn { address , port } , acc ->
49
- update_in ( acc , [ address ] , fn value -> [ port | value || [ ] ] end )
50
- end )
51
61
end
52
62
53
63
defp info ( { node , info } , client_port_addresses , server_port_addresses ) do
@@ -62,10 +72,9 @@ defmodule Realtime.GenRpcMetrics do
62
72
63
73
defp info ( node , client_port_addresses , server_port_addresses , { ip_address , _ } ) do
64
74
{ :tcp , client_tcp_port } = :gen_rpc_helper . get_client_config_per_node ( node )
65
- server_tcp_port = Application . fetch_env! ( :gen_rpc , :tcp_server_port )
66
75
67
76
gen_rpc_client_ports = Map . get ( client_port_addresses , { ip_address , client_tcp_port } , [ ] )
68
- gen_rpc_server_ports = Map . get ( server_port_addresses , { ip_address , server_tcp_port } , [ ] )
77
+ gen_rpc_server_ports = Map . get ( server_port_addresses , ip_address , [ ] )
69
78
gen_rpc_ports = gen_rpc_client_ports ++ gen_rpc_server_ports
70
79
71
80
if gen_rpc_ports != [ ] do
0 commit comments