Skip to content

Commit 9daf263

Browse files
authored
feat: allow for gen_rpc TLS to be configured (#1507)
Either TCP or SSL server port can be exposed. Not both. The gen_rpc metrics had to be fixed to account for TLS
1 parent 4f8984d commit 9daf263

File tree

6 files changed

+140
-65
lines changed

6 files changed

+140
-65
lines changed

README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,11 @@ If you're using the default tenant, the URL is `ws://realtime-dev.localhost:4000
174174
| OTEL_TRACES_SAMPLER | string | Default to `parentbased_always_on` . More info [here](https://opentelemetry.io/docs/languages/erlang/sampling/#environment-variables) |
175175
| GEN_RPC_TCP_SERVER_PORT | number | Port served by `gen_rpc`. Must be secured just like the Erlang distribution port. Defaults to 5369 |
176176
| GEN_RPC_TCP_CLIENT_PORT | number | `gen_rpc` connects to another node using this port. Most of the time it should be the same as GEN_RPC_TCP_SERVER_PORT. Defaults to 5369 |
177+
| GEN_RPC_SSL_SERVER_PORT | number | Port served by `gen_rpc` secured with TLS. Must also define GEN_RPC_CERTFILE, GEN_RPC_KEYFILE and GEN_RPC_CACERTFILE. If this is defined then only TLS connections will be set-up. |
178+
| GEN_RPC_SSL_CLIENT_PORT | number | `gen_rpc` connects to another node using this port. Most of the time it should be the same as GEN_RPC_SSL_SERVER_PORT. Defaults to 6369 |
179+
| GEN_RPC_CERTFILE | string | Path to the public key in PEM format. Only needs to be provided if GEN_RPC_SSL_SERVER_PORT is defined |
180+
| GEN_RPC_KEYFILE | string | Path to the private key in PEM format. Only needs to be provided if GEN_RPC_SSL_SERVER_PORT is defined |
181+
| GEN_RPC_CACERTFILE | string | Path to the certificate authority public key in PEM format. Only needs to be provided if GEN_RPC_SSL_SERVER_PORT is defined |
177182
| GEN_RPC_CONNECT_TIMEOUT_IN_MS | number | `gen_rpc` client connect timeout in milliseconds. Defaults to 10000. |
178183
| GEN_RPC_SEND_TIMEOUT_IN_MS | number | `gen_rpc` client and server send timeout in milliseconds. Defaults to 10000. |
179184
| GEN_RPC_SOCKET_IP | string | Interface which `gen_rpc` will bind to. Defaults to "0.0.0.0" (ipv4) which means that all interfaces are going to expose the `gen_rpc` port. |

config/runtime.exs

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,11 +158,41 @@ end
158158
if config_env() != :test do
159159
gen_rpc_socket_ip = System.get_env("GEN_RPC_SOCKET_IP", "0.0.0.0") |> to_charlist()
160160

161+
gen_rpc_ssl_server_port = System.get_env("GEN_RPC_SSL_SERVER_PORT")
162+
163+
gen_rpc_ssl_server_port =
164+
if gen_rpc_ssl_server_port do
165+
String.to_integer(gen_rpc_ssl_server_port)
166+
end
167+
168+
gen_rpc_default_driver = if gen_rpc_ssl_server_port, do: :ssl, else: :tcp
169+
170+
if gen_rpc_default_driver == :ssl do
171+
gen_rpc_ssl_opts = [
172+
certfile: System.fetch_env!("GEN_RPC_CERTFILE"),
173+
keyfile: System.fetch_env!("GEN_RPC_KEYFILE"),
174+
cacertfile: System.fetch_env!("GEN_RPC_CACERTFILE")
175+
]
176+
177+
config :gen_rpc,
178+
ssl_server_port: gen_rpc_ssl_server_port,
179+
ssl_client_port: System.get_env("GEN_RPC_SSL_CLIENT_PORT", "6369") |> String.to_integer(),
180+
ssl_client_options: gen_rpc_ssl_opts,
181+
ssl_server_options: gen_rpc_ssl_opts,
182+
tcp_server_port: false,
183+
tcp_client_port: false
184+
else
185+
config :gen_rpc,
186+
ssl_server_port: false,
187+
ssl_client_port: false,
188+
tcp_server_port: System.get_env("GEN_RPC_TCP_SERVER_PORT", "5369") |> String.to_integer(),
189+
tcp_client_port: System.get_env("GEN_RPC_TCP_CLIENT_PORT", "5369") |> String.to_integer()
190+
end
191+
161192
case :inet.parse_address(gen_rpc_socket_ip) do
162193
{:ok, address} ->
163194
config :gen_rpc,
164-
tcp_server_port: System.get_env("GEN_RPC_TCP_SERVER_PORT", "5369") |> String.to_integer(),
165-
tcp_client_port: System.get_env("GEN_RPC_TCP_CLIENT_PORT", "5369") |> String.to_integer(),
195+
default_client_driver: gen_rpc_default_driver,
166196
connect_timeout: System.get_env("GEN_RPC_CONNECT_TIMEOUT_IN_MS", "10000") |> String.to_integer(),
167197
send_timeout: System.get_env("GEN_RPC_SEND_TIMEOUT_IN_MS", "10000") |> String.to_integer(),
168198
ipv6_only: System.get_env("GEN_RPC_IPV6_ONLY", "false") == "true",

lib/realtime/monitoring/gen_rpc_metrics.ex

Lines changed: 62 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -12,79 +12,58 @@ defmodule Realtime.GenRpcMetrics do
1212
{:ok, nodes_info} = :net_kernel.nodes_info()
1313
# Ignore "hidden" nodes (remote shell)
1414
nodes_info = Enum.filter(nodes_info, fn {_k, v} -> v[:type] == :normal end)
15+
gen_rpc_server_port = server_port()
16+
ip_address_node = ip_address_node(nodes_info)
1517

16-
# All TCP server sockets are managed by gen_rpc_acceptor_sup supervisor
17-
# All TCP client sockets are managed by gen_rpc_client_sup supervisor
18-
# For each node gen_rpc might have multiple TCP sockets
19-
20-
# For client processes we use the remote address (peername)
21-
client_port_addresses =
22-
port_addresses(:gen_rpc_client_sup)
23-
|> Enum.reduce(%{}, fn {address, port}, acc ->
24-
update_in(acc, [address], fn value -> [port | value || []] end)
18+
{client_ports, server_ports} =
19+
:erlang.ports()
20+
|> Stream.filter(fn port -> :erlang.port_info(port, :name) == {:name, ~c"tcp_inet"} end)
21+
|> Stream.map(&{:inet.peername(&1), :inet.sockname(&1), &1})
22+
|> Stream.filter(fn
23+
{{:ok, _peername}, {:ok, _sockname}, _port} -> true
24+
_ -> false
25+
end)
26+
|> Stream.map(fn {{:ok, {peername_ipaddress, peername_port}}, {:ok, {_, server_port}}, port} ->
27+
{ip_address_node[peername_ipaddress], peername_port, server_port, port}
2528
end)
29+
|> Stream.filter(fn
30+
{nil, _, _} ->
31+
false
2632

27-
# For server processes we use the ip address without the tcp port because it's randomly assigned
28-
server_port_addresses =
29-
port_addresses(:gen_rpc_acceptor_sup)
30-
|> Enum.reduce(%{}, fn {{ip_address, _tcp_port}, port}, acc ->
31-
update_in(acc, [ip_address], fn value -> [port | value || []] end)
33+
{node, peername_port, server_port, _port} ->
34+
{_, client_tcp_or_ssl_port} = :gen_rpc_helper.get_client_config_per_node(node)
35+
# Only keep Erlang ports that are either serving on the gen_rpc server tcp/ssl port or
36+
# connecting to other nodes using the expected client tcp/ssl port for that node
37+
peername_port == client_tcp_or_ssl_port or server_port == gen_rpc_server_port
38+
end)
39+
|> Enum.reduce({%{}, %{}}, fn {node, _peername_port, server_port, port}, {clients, servers} ->
40+
if server_port == gen_rpc_server_port do
41+
# This Erlang port is serving gen_rpc
42+
{clients, update_in(servers, [node], fn value -> [port | value || []] end)}
43+
else
44+
# This Erlang port is requesting gen_rpc
45+
{update_in(clients, [node], fn value -> [port | value || []] end), servers}
46+
end
3247
end)
3348

34-
Map.new(nodes_info, &info(&1, client_port_addresses, server_port_addresses))
49+
Map.new(nodes_info, &info(&1, client_ports, server_ports))
3550
else
3651
%{}
3752
end
3853
end
3954

40-
defp port_addresses(supervisor) do
41-
Supervisor.which_children(supervisor)
42-
|> Stream.flat_map(fn {_, pid, _, _} ->
43-
# We then grab the only linked port if available
44-
case Process.info(pid, :links) do
45-
{:links, links} ->
46-
links
47-
|> Enum.filter(&is_port/1)
48-
|> hd()
49-
|> List.wrap()
50-
51-
_ ->
52-
[]
53-
end
54-
end)
55-
|> Stream.map(&{:inet.peername(&1), &1})
56-
|> Stream.filter(fn
57-
{{:ok, _sockname}, _port} -> true
58-
_ -> false
59-
end)
60-
|> Stream.map(fn {{:ok, address}, port} -> {address, port} end)
61-
end
62-
63-
defp info({node, info}, client_port_addresses, server_port_addresses) do
64-
case info[:address] do
65-
net_address(address: address) when address != :undefined ->
66-
{node, info(node, client_port_addresses, server_port_addresses, address)}
67-
68-
_ ->
69-
{node, %{}}
70-
end
71-
end
72-
73-
defp info(node, client_port_addresses, server_port_addresses, {ip_address, _}) do
74-
{:tcp, client_tcp_port} = :gen_rpc_helper.get_client_config_per_node(node)
75-
76-
gen_rpc_client_ports = Map.get(client_port_addresses, {ip_address, client_tcp_port}, [])
77-
gen_rpc_server_ports = Map.get(server_port_addresses, ip_address, [])
78-
gen_rpc_ports = gen_rpc_client_ports ++ gen_rpc_server_ports
55+
defp info({node, _}, client_ports, server_ports) do
56+
gen_rpc_ports = Map.get(client_ports, node, []) ++ Map.get(server_ports, node, [])
7957

8058
if gen_rpc_ports != [] do
81-
%{
82-
inet_stats: inet_stats(gen_rpc_ports),
83-
queue_size: queue_size(gen_rpc_ports),
84-
connections: length(gen_rpc_ports)
85-
}
59+
{node,
60+
%{
61+
inet_stats: inet_stats(gen_rpc_ports),
62+
queue_size: queue_size(gen_rpc_ports),
63+
connections: length(gen_rpc_ports)
64+
}}
8665
else
87-
%{}
66+
{node, %{}}
8867
end
8968
end
9069

@@ -103,4 +82,27 @@ defmodule Realtime.GenRpcMetrics do
10382
acc + queue_size
10483
end)
10584
end
85+
86+
defp server_port() do
87+
if Application.fetch_env!(:gen_rpc, :default_client_driver) == :tcp do
88+
Application.fetch_env!(:gen_rpc, :tcp_server_port)
89+
else
90+
Application.fetch_env!(:gen_rpc, :ssl_server_port)
91+
end
92+
end
93+
94+
defp ip_address_node(nodes_info) do
95+
nodes_info
96+
|> Stream.map(fn {node, info} ->
97+
case info[:address] do
98+
net_address(address: {ip_address, _}) ->
99+
{ip_address, node}
100+
101+
_ ->
102+
{nil, node}
103+
end
104+
end)
105+
|> Stream.filter(fn {ip_address, _node} -> ip_address != nil end)
106+
|> Map.new()
107+
end
106108
end

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.43.2",
7+
version: "2.44.0",
88
elixir: "~> 1.17.3",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,

test/realtime/monitoring/distributed_metrics_test.exs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,5 +32,28 @@ defmodule Realtime.DistributedMetricsTest do
3232
}
3333
} = DistributedMetrics.info()
3434
end
35+
36+
test "metric matches on both sides", %{node: node} do
37+
# We need to generate some data first
38+
Realtime.Rpc.call(node, String, :to_integer, ["25"], key: 1)
39+
Realtime.Rpc.call(node, String, :to_integer, ["25"], key: 2)
40+
41+
local_metrics = DistributedMetrics.info()[node][:inet_stats]
42+
# Use gen_rpc to not use erl dist and change the result
43+
remote_metrics = :gen_rpc.call(node, DistributedMetrics, :info, [])[node()][:inet_stats]
44+
45+
# It's not going to 100% the same because erl dist sends pings and other things out of our control
46+
47+
assert local_metrics[:connections] == remote_metrics[:connections]
48+
49+
assert_in_delta(local_metrics[:send_avg], remote_metrics[:recv_avg], 5)
50+
assert_in_delta(local_metrics[:recv_avg], remote_metrics[:send_avg], 5)
51+
52+
assert_in_delta(local_metrics[:send_oct], remote_metrics[:recv_oct], 5)
53+
assert_in_delta(local_metrics[:recv_oct], remote_metrics[:send_oct], 5)
54+
55+
assert_in_delta(local_metrics[:send_max], remote_metrics[:recv_max], 5)
56+
assert_in_delta(local_metrics[:recv_max], remote_metrics[:send_max], 5)
57+
end
3558
end
3659
end

test/realtime/monitoring/gen_rpc_metrics_test.exs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,23 @@ defmodule Realtime.GenRpcMetricsTest do
4040
Realtime.GenRpc.call(node, String, :to_integer, ["25"], key: 1)
4141
Realtime.GenRpc.call(node, String, :to_integer, ["25"], key: 2)
4242

43-
local_metrics = GenRpcMetrics.info()[node]
44-
remote_metrics = :erpc.call(node, GenRpcMetrics, :info, [])[node()]
43+
:erpc.call(node, Realtime.GenRpc, :call, [node(), String, :to_integer, ["25"], [key: 1]])
44+
45+
local_metrics = GenRpcMetrics.info()[node][:inet_stats]
46+
remote_metrics = :erpc.call(node, GenRpcMetrics, :info, [])[node()][:inet_stats]
47+
48+
assert Map.keys(local_metrics) == [
49+
:send_pend,
50+
:recv_avg,
51+
:recv_cnt,
52+
:recv_dvi,
53+
:recv_max,
54+
:recv_oct,
55+
:send_avg,
56+
:send_cnt,
57+
:send_max,
58+
:send_oct
59+
]
4560

4661
assert local_metrics[:connections] == remote_metrics[:connections]
4762

0 commit comments

Comments
 (0)